一直搞不懂Java线程通信,这次终于明白了(2)
ReentrantLock结合Condition
Condition是JDK1.5新增的接口,在java.util.concurrent.locks 包中,提供了类似的Object的监视器方法,与Lock配合可以实现等待/通知模式,方法作用在下方源码中已简单注释,想要查看详细说明,强烈建议看源码,通过翻译软件翻译一下就行!
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;
public interface Condition {
//使当前线程在接到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;
// 使当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
void awaitUninterruptibly();
// 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
// 返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 使当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
void signal();
// 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁
void signalAll();
}
在此我们通过经典的生产者消费者案例说一下Condition实现线程通信,多几种案例思维更宽阔,多样化理解对技术刺激更大
案例:有一个快递点,可以接货和送货,最多存放5个包裹,再放就提示包裹已满,派件时包裹送完就不能再送,提示没有包裹,不能派送
快递点:
package com.stt.thread.communication;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 快递点:
* goodsNumber: 快递数量,默认为0,最多5个,保障原子性使用 AtomicInteger
* receiving() : 收货方法,累加货物数量,每次 + 1
* dispatch() : 派送方法,递减数量,每次 - 1
* 注意:因为使用 Condition 实现,Condition 需要通过 ReentrantLock 获取,
* 所以可以使用 ReentrantLock实现同步就不需要 synchronized
*/
public class ExpressPoint {
// 快递数量,使用原子类
private AtomicInteger goodsNumber = new AtomicInteger();
// 锁对象
private ReentrantLock lock = new ReentrantLock();
// 创建线程通信对象
private Condition condition = lock.newCondition();
// 收货方法,使用Lock锁,就不需要synchronized同步了
public void receiving() {
// 上锁
lock.lock();
// 写try...finally,保障无论是否发生异常都可以解锁,避免死锁
try {
// 如果达到5个,就提示,并且等待
while (goodsNumber.get() == 5) {
System.out.println("库房已满,已不能再接收!");
// 等待,有异常抛出
condition.await();
}
System.out.println(Thread.currentThread().getName() + "已收到编号:" + goodsNumber.incrementAndGet() + "的包裹");
// 唤醒其他线程
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 解锁
lock.unlock();
}
}
// 派送方法
public void dispatch() {
// 上锁
lock.lock();
try {
// 等于0就不能再派送
while (goodsNumber.get() == 0) {
System.out.println("没有包裹,不能派送!");
condition.await();
}
System.out.println(Thread.currentThread().getName() + "已送出编号:" + goodsNumber.get() + "的包裹");
goodsNumber.decrementAndGet();
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 解锁
lock.unlock();
}
}
}
测试类:通过while死循环,不断接货和送货
public class ExpressPointMain {
public static void main(String[] args) {
ExpressPoint expressPoint = new ExpressPoint();
// 收货线程
new Thread(() -> {
while (true){
expressPoint.receiving();
}
},"收货员").start();
// 送货线程
new Thread(() -> {
while (true){
expressPoint.dispatch();
}
},"送货员").start();
}
}
运行结果:发现收货员线程和送货员线程交替执行,并且库存满和送完之后都有对应的提示
总结:在Condition中,用await()替换wait(),用signal()替换 notify(),用signalAll()替换notifyAll(),对于我们以前使用传统的Object方法,Condition都能够给予实现
Condition 精准唤醒
不同的 Condition 可以用来等待和唤醒不同的线程,类似于上边我们说的等待池,但是Condition是通过队列实现等待和唤醒,Condition的await()方法,会使得当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()返回时,当前线程一定是获取了Condition相关联的锁。Condition实现方式在后边我们再分析
上边调用await 和 signalAll方法是控制所有该Condition对象的线程,我们有两个线程分别为收货和送货,我们可以创建两个Condition对象来精准控制等待和唤醒收货和送货线程。
package com.stt.thread.communication;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
定义两个 Condition 对象,一个控制收货线程等待和唤醒,一个控制送货线程的等待和唤醒
*/
public class ExpressPoint {
// 快递数量,使用原子类
private AtomicInteger goodsNumber = new AtomicInteger();
// 锁对象
private ReentrantLock lock = new ReentrantLock();
// 创建线程通信对象
private Condition receivingCondition = lock.newCondition();
private Condition dispatchCondition = lock.newCondition();
// 收货方法,使用Lock锁,就不需要synchronized同步了
public void receiving() {
// 上锁
lock.lock();
// 写try...finally,保障无论是否发生异常都可以解锁,避免死锁
try {
// 判断是否继续接货
while (goodsNumber.get() == 5) {
System.out.println("库房已满,已不能再接收!");
// 让收货线程进入等待
receivingCondition.await();
}
System.out.println(Thread.currentThread().getName() + "已收到编号:" + goodsNumber.incrementAndGet() + "的包裹");
// 仅仅唤醒送货线程
dispatchCondition.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 解锁
lock.unlock();
}
}
// 派送方法
public void dispatch() {
// 上锁
lock.lock();
try {
// 判断是否继续送货
while (goodsNumber.get() == 0) {
System.out.println("没有包裹,不能派送!");
// 送货线程等待
dispatchCondition.await();
}
System.out.println(Thread.currentThread().getName() + "已送出编号:" + goodsNumber.get() + "的包裹");
goodsNumber.decrementAndGet();
// 唤醒收货线程
receivingCondition.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 解锁
lock.unlock();
}
}
}
运行结果:运行结果是一样的,只是仅仅会让对应的线程等待和唤醒。
Condition实现分析
等待队列
Conditiont的等待队列是一个FIFO队列,队列的每个节点都是等待在Condition对象上的线程的引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await(),那么该线程就会释放锁,构成节点加入等待队列并进入等待状态。
从下图可以看出来Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并更新尾节点即可。上述节点引用更新过程没有使用CAS机制,因为在调用await()的线程必定是获取了锁的线程,该过程由锁保证线程的安全。
一个Lock(同步器)拥有一个同步队列和多个等待队列:
如上边的例子:就是拥有receivingCondition 和 dispatchCondition两个等待队列
private Condition receivingCondition = lock.newCondition();
private Condition dispatchCondition = lock.newCondition();
等待
调用Condition的await()方法,会使得当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()返回时,当前线程一定是获取了Condition相关联的锁。
线程触发await()这个过程可以看作是同步队列的首节点【当前线程肯定是成功获得了锁,才会执行await方法,因此一定是在同步队列的首节点】移动到了Condition的等待队列的尾节点,并释放同步状态进入等待状态,同时会唤醒同步队列的后继节点。
唤醒
调用signal():会唤醒再等待队列中的首节点,该节点也是到目前为止等待时间最长的节点
调用signalAll():将等待队列中的所有节点全部唤醒,相当于将等待队列中的每一个节点都执行一次signal()
CountDownLatch
Java5之后在 java.util.concurrent 也就是【JUC】包中提供了很多并发编程的工具类,如 CountDownLatch 计数器是基于 AQS 框架实现的多个线程之间维护共享变量的类。
使用场景
可以通过 CountDownLatch 使当前线程阻塞,等待其他线程完成给定任务,比如,等待线程完成下载任务之后,提示用户下载完成;导游等待所有游客参观完之后去下一个景点等。
使用介绍
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待n个点完成,这里就传入n。这里所说的n个点,可以是n个线程,也可以是1个线程里的n个执行步骤。CountDownLatch 构造函数如下:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
计数器参数count必须大于等于0,等于0的时候,调用await方法时不会阻塞当前线程。
当我们调用CountDownLatch的countDown()方法时,n就会减1,CountDownLatch的await()方法会阻塞当前线程,直到n变成零,继续执行。
CountDownLatch 方法
·await():阻塞当前线程,直到计数器为零为止
· await(long timeout, TimeUnit unit):await()的重载方法,可以指定阻塞时长
· countDown():计数器减1,如果计数达到零,释放所有等待的线程
· getCount(): 返回当前计数
案例:比如开一把英雄联盟,需要10个人加载完成才会进入游戏,可以理解为10个线程运行完毕之后进入游戏页面
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class LoadingGame {
public static void main(String[] args) {
// 计数器
CountDownLatch latch = new CountDownLatch(10);
// 玩家数组
String[] player = new String[10];
// 随机数,用来加载进度条时线程睡眠使用,防止直接加载到100
Random random = new Random();
// 循环开启10个线程,即10个玩家
for (int i = 0; i < 10; i++) {
// 记录玩家在数组中的下标
int index = i;
new Thread(() -> {
// 循环进度条到100
for (int j = 0; j <= 100; j++) {
try {
// 每加载 1% 就随机睡眠一段时间
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 修改指定玩家进度条
player[index] = j +"%";
// 输出当前所有的玩家进度
System.out.print("\r" + Arrays.toString(player));
}
// 每加载完一个玩家计数-1
latch.countDown();
}).start();
}
try {
// 阻塞当前线程【main线程】,等待十个玩家加载结束后唤醒
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("\n"+"游戏开始");
}
}
运行结果:发现主线程等待10个子线程加载到100时才执行。
高频面试题——如何保证多个线程按顺序执行
其实就是让线程按照指定的顺序一个一个执行,这里结合同一案例给大家介绍4种方法:
案例:老师布置作业之后,学生开始写作业,学生写完作业老师批改,之后老师再将学生的作业情况记录下来,这个顺序不可错乱
Thread的join方法
public class HomeworkJoin {
public static void main(String[] args) {
// 布置作业线程
Thread t1 = new Thread(() -> {
System.out.println("......老师布置作业......");
});
// 学生写作业,需要等待老师布置完
Thread t2 = new Thread(() -> {
try {
// t1插入执行,也就是插队
t1.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("......学生写作业......");
});
// 学生写作业,需要等待老师布置完
Thread t3 = new Thread(() -> {
try {
// t2插队
t2.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("......老师检查作业......");
});
// 学生写作业,需要等待老师布置完
Thread t4 = new Thread(() -> {
try {
// t3插队
t3.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("......老师记录作业情况......");
});
// 开启线程
t1.start();
t2.start();
t3.start();
t4.start();
// t1线程插队
try {
t4.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("......作业布置和检查结束......");
}
}
运行结果:
使用Condition(条件变量)
我们可以使用Condition精确唤醒下一个需要执行的线程
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class HomeworkCondition {
// 锁对象
private static Lock lock = new ReentrantLock();
// 阻塞队列
private static Condition doWork = lock.newCondition();
private static Condition checkWork = lock.newCondition();
private static Condition recordWork = lock.newCondition();
/**
* 为什么要加这三个标识状态?
* 如果没有状态标识,线程就无法正确唤醒,就一直处于等待状态
*/
private static Boolean t1Run = false;
private static Boolean t2Run = false;
private static Boolean t3Run = false;
public static void main(String[] args) {
// 布置作业线程
Thread t1 = new Thread(() -> {
lock.lock();
try {
System.out.println("......老师布置作业......");
// t1执行完毕
t1Run = true;
// 唤醒doWork等待队列中的第一个线程
doWork.signal();
}finally {
lock.unlock();
}
});
// 学生写作业,需要等待老师布置完
Thread t2 = new Thread(() -> {
lock.lock();
try {
// 判断是否布置作业
if(!t1Run) {
// 还没布置作业,先不写作业,进入等待队列
doWork.await();
}
System.out.println("......学生写作业......");
t2Run = true;
// 唤醒checkWork等待队列第一个线程
checkWork.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
});
// 老师家查作业,需要学生写完
Thread t3 = new Thread(() -> {
lock.lock();
try {
// 判断学生是否写完作业
if(!t2Run) {
// 没写完,先不检查,进入等待队列
checkWork.await();
}
System.out.println("......老师检查作业......");
t3Run = true;
// 唤醒recordWork等待队列第一个线程
recordWork.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
});
// 老师上传作业情况,需要检查完
Thread t4 = new Thread(() -> {
lock.lock();
try {
if(!t3Run) {
recordWork.await();
}
System.out.println("......老师记录作业情况......");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
});
t1.start();
t2.start();
t3.start();
t4.start();
}
}
使用CountDownLatch(倒计数)
声明三个 CountDownLatch 计数器,初始只都为 1,每次执行上一部操作之后下一步操作的计数器 -1,当计数器值为0时就继续执行,否则就陷入等待
import java.util.concurrent.CountDownLatch;
public class HomeworkCountDownLatch {
public static void main(String[] args) {
// 创建三个计数器
CountDownLatch doWork = new CountDownLatch(1);
CountDownLatch checkWork = new CountDownLatch(1);
CountDownLatch recordWork = new CountDownLatch(1);
// 布置作业线程
Thread t1 = new Thread(() -> {
System.out.println("......老师布置作业......");
// 布置作业之后,做作业计数器 -1
doWork.countDown();
});
// 学生写作业,需要等待老师布置完
Thread t2 = new Thread(() -> {
try {
doWork.await();
System.out.println("......学生写作业......");
// 对 检查作业 -1
checkWork.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 学生写作业,需要等待老师布置完
Thread t3 = new Thread(() -> {
try {
doWork.await();
System.out.println("......老师检查作业......");
// 对 录入作业情况 -1
recordWork.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 学生写作业,需要等待老师布置完
Thread t4 = new Thread(() -> {
try {
recordWork.await();
System.out.println("......老师记录作业情况......");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
t3.start();
t4.start();
}
}
使用CyclicBarrier(回环栅栏)
CyclicBarrier可以实现让一组线程等待至某个状态之后再全部同时执行,【回环】是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用,可以把这个状态当做barrier,当调用await()方法之后,线程就处于barrier了。示例如下:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class HomeworkCyclicBarrier {
public static void main(String[] args) {
CyclicBarrier doWork = new CyclicBarrier(2);
CyclicBarrier checkWork = new CyclicBarrier(2);
CyclicBarrier recordWork = new CyclicBarrier(2);
// 布置作业线程
Thread t1 = new Thread(() -> {
try {
System.out.println("......老师布置作业......");
//放开栅栏1
doWork.await();
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 学生写作业,需要等待老师布置完
Thread t2 = new Thread(() -> {
try {
//放开栅栏1
doWork.await();
System.out.println("......学生写作业......");
//放开栅栏2
checkWork.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
// 学生写作业,需要等待老师布置完
Thread t3 = new Thread(() -> {
try {
//放开栅栏2
checkWork.await();
System.out.println("......老师检查作业......");
//放开栅栏3
recordWork.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
// 学生写作业,需要等待老师布置完
Thread t4 = new Thread(() -> {
try {
//放开栅栏3
recordWork.await();
System.out.println("......老师记录作业情况......");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
t3.start();
t4.start();
}
}
此四种方法都可以实现同样的效果,当然你也可以使用Object的wait() 和 notify()/notifyAll()实现
高频面试题——Thread.join()和CountDownLatch的区别
Thread.join()是Thread类的一个方法,Thread.join()的实现是依靠Object的wait()和notifyAll()来完成的,而CountDownLatch是JUC包中的一个工具类
当我们使用ExecutorService 【线程池】,就不能使用join,必须使用CountDownLatch,比如:
ExecutorService service = Executors.newFixedThreadPool(5);
final CountDownLatch latch = new CountDownLatch(5);
for(int x = 0; x < 5; x++) {
service.submit(new Runnable() {
public void run() {
// do something
latch.countDown();
}
});
}
latch.await();
调用join方法需要等待thread执行完毕才能继续向下执行,而CountDownLatch只需要检查计数器的值为零就可以继续向下执行,相比之下,CountDownLatch更加灵活一些,可以实现一些更加复杂的业务场景。
为什么wait, notify和notifyAll这些方法在Object类中不在Thread类里面?
Java提供的锁是对象级的而不是线程级的,线程为了进入临界区【也就是同步块内】,需要获得锁并等待锁可用,它们并不知道也不需要知道哪些线程持有锁,它们只需要知道当前资源是否被占用,是否可以获得锁,所以锁的持有状态应该由同步监视器来获取,而不是线程本身。
如果Java不提供关键字来解决线程之间的通信,锁是对象级别,由于wait,notify,notifyAll都是锁级别的操作,每个对象都可以当做锁所以把他们定义在Object类中是最合适的。
相关阅读:
- Java正则表达式的使用 (liqianqian1116, 2023-4-14)
- 轻松完成异步任务,一文搞懂Python Celery (liqianqian1116, 2023-4-03)
- 跟我学编程:Java 虚拟机之指令重排序 (liqianqian1116, 2023-3-31)
- 让Python代码飞起来,高手必用十个VSCode插件 (liqianqian1116, 2023-4-04)
- 一直搞不懂Java线程通信,这次终于明白了(1) (liqianqian1116, 2023-4-03)
标题搜索
我的存档
数据统计
- 访问量: 253249
- 日志数: 959
- 建立时间: 2020-08-11
- 更新时间: 2024-04-26