一直搞不懂Java线程通信,这次终于明白了(2)

上一篇 / 下一篇  2023-04-03 14:58:38

  ReentrantLock结合Condition
  Condition是JDK1.5新增的接口,在java.util.concurrent.locks 包中,提供了类似的Object的监视器方法,与Lock配合可以实现等待/通知模式,方法作用在下方源码中已简单注释,想要查看详细说明,强烈建议看源码,通过翻译软件翻译一下就行!
  package java.util.concurrent.locks;
  import java.util.concurrent.TimeUnit;
  import java.util.Date;
  public interface Condition {
      //使当前线程在接到信号或被中断之前一直处于等待状态
      void await() throws InterruptedException;
      // 使当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
      void awaitUninterruptibly();
      // 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
      // 返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了
      long awaitNanos(long nanosTimeout) throws InterruptedException;
      // 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
      boolean await(long time, TimeUnit unit) throws InterruptedException;
      // 使当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false
      boolean awaitUntil(Date deadline) throws InterruptedException;
      // 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
      void signal();
      // 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁
      void signalAll();
  }
  在此我们通过经典的生产者消费者案例说一下Condition实现线程通信,多几种案例思维更宽阔,多样化理解对技术刺激更大
  案例:有一个快递点,可以接货和送货,最多存放5个包裹,再放就提示包裹已满,派件时包裹送完就不能再送,提示没有包裹,不能派送
  快递点:
  package com.stt.thread.communication;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.Condition;
  import java.util.concurrent.locks.ReentrantLock;
  /**
   * 快递点:
   * goodsNumber: 快递数量,默认为0,最多5个,保障原子性使用 AtomicInteger 
   * receiving() : 收货方法,累加货物数量,每次 + 1
   * dispatch() : 派送方法,递减数量,每次 - 1
   * 注意:因为使用 Condition 实现,Condition 需要通过 ReentrantLock 获取,
   *      所以可以使用 ReentrantLock实现同步就不需要 synchronized
   */
  public class ExpressPoint {
      // 快递数量,使用原子类
      private AtomicInteger goodsNumber = new AtomicInteger();
      // 锁对象
      private ReentrantLock lock = new ReentrantLock();
      // 创建线程通信对象
      private Condition condition = lock.newCondition();
      // 收货方法,使用Lock锁,就不需要synchronized同步了
      public void receiving() {
          // 上锁
          lock.lock();
          // 写try...finally,保障无论是否发生异常都可以解锁,避免死锁
          try {
              // 如果达到5个,就提示,并且等待
              while (goodsNumber.get() == 5) {
                  System.out.println("库房已满,已不能再接收!");
                  // 等待,有异常抛出
                  condition.await();
              }
              System.out.println(Thread.currentThread().getName() + "已收到编号:" + goodsNumber.incrementAndGet() + "的包裹");
              // 唤醒其他线程
              condition.signalAll();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          } finally {
              // 解锁
              lock.unlock();
          }
      }
      // 派送方法
      public void dispatch() {
          // 上锁
          lock.lock();
          try {
              // 等于0就不能再派送
              while (goodsNumber.get() == 0) {
                  System.out.println("没有包裹,不能派送!");
                  condition.await();
              }
              System.out.println(Thread.currentThread().getName() + "已送出编号:" + goodsNumber.get() + "的包裹");
              goodsNumber.decrementAndGet();
              condition.signalAll();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          } finally {
              // 解锁
              lock.unlock();
          }
      }
  }
  测试类:通过while死循环,不断接货和送货
  public class ExpressPointMain {
      public static void main(String[] args) {
          ExpressPoint expressPoint = new ExpressPoint();
          // 收货线程
          new Thread(() -> {
              while (true){
                  expressPoint.receiving();
              }
          },"收货员").start();
          // 送货线程
          new Thread(() -> {
              while (true){
                  expressPoint.dispatch();
              }
          },"送货员").start();
      }
  }
  运行结果:发现收货员线程和送货员线程交替执行,并且库存满和送完之后都有对应的提示
  总结:在Condition中,用await()替换wait(),用signal()替换 notify(),用signalAll()替换notifyAll(),对于我们以前使用传统的Object方法,Condition都能够给予实现
  Condition 精准唤醒
  不同的 Condition 可以用来等待和唤醒不同的线程,类似于上边我们说的等待池,但是Condition是通过队列实现等待和唤醒,Condition的await()方法,会使得当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()返回时,当前线程一定是获取了Condition相关联的锁。Condition实现方式在后边我们再分析
  上边调用await 和 signalAll方法是控制所有该Condition对象的线程,我们有两个线程分别为收货和送货,我们可以创建两个Condition对象来精准控制等待和唤醒收货和送货线程。
  package com.stt.thread.communication;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.Condition;
  import java.util.concurrent.locks.ReentrantLock;
  /**
  定义两个 Condition 对象,一个控制收货线程等待和唤醒,一个控制送货线程的等待和唤醒
   */
  public class ExpressPoint {
      // 快递数量,使用原子类
      private AtomicInteger goodsNumber = new AtomicInteger();
      // 锁对象
      private ReentrantLock lock = new ReentrantLock();
      // 创建线程通信对象
      private Condition receivingCondition = lock.newCondition();
      private Condition dispatchCondition = lock.newCondition();
      // 收货方法,使用Lock锁,就不需要synchronized同步了
      public void receiving() {
          // 上锁
          lock.lock();
          // 写try...finally,保障无论是否发生异常都可以解锁,避免死锁
          try {
              // 判断是否继续接货
              while (goodsNumber.get() == 5) {
                  System.out.println("库房已满,已不能再接收!");
                  // 让收货线程进入等待
                  receivingCondition.await();
              }
              System.out.println(Thread.currentThread().getName() + "已收到编号:" + goodsNumber.incrementAndGet() + "的包裹");
              // 仅仅唤醒送货线程
              dispatchCondition.signal();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          } finally {
              // 解锁
              lock.unlock();
          }
      }
      // 派送方法
      public void dispatch() {
          // 上锁
          lock.lock();
          try {
              // 判断是否继续送货
              while (goodsNumber.get() == 0) {
                  System.out.println("没有包裹,不能派送!");
                  // 送货线程等待
                  dispatchCondition.await();
              }
              System.out.println(Thread.currentThread().getName() + "已送出编号:" + goodsNumber.get() + "的包裹");
              goodsNumber.decrementAndGet();
              // 唤醒收货线程
              receivingCondition.signal();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          } finally {
              // 解锁
              lock.unlock();
          }
      }
  }
  运行结果:运行结果是一样的,只是仅仅会让对应的线程等待和唤醒。
  Condition实现分析
  等待队列
  Conditiont的等待队列是一个FIFO队列,队列的每个节点都是等待在Condition对象上的线程的引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await(),那么该线程就会释放锁,构成节点加入等待队列并进入等待状态。
  从下图可以看出来Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并更新尾节点即可。上述节点引用更新过程没有使用CAS机制,因为在调用await()的线程必定是获取了锁的线程,该过程由锁保证线程的安全。
  一个Lock(同步器)拥有一个同步队列和多个等待队列:
  如上边的例子:就是拥有receivingCondition 和 dispatchCondition两个等待队列
  private Condition receivingCondition = lock.newCondition();
  private Condition dispatchCondition = lock.newCondition();
  等待
  调用Condition的await()方法,会使得当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()返回时,当前线程一定是获取了Condition相关联的锁。
  线程触发await()这个过程可以看作是同步队列的首节点【当前线程肯定是成功获得了锁,才会执行await方法,因此一定是在同步队列的首节点】移动到了Condition的等待队列的尾节点,并释放同步状态进入等待状态,同时会唤醒同步队列的后继节点。
  唤醒
  调用signal():会唤醒再等待队列中的首节点,该节点也是到目前为止等待时间最长的节点
  调用signalAll():将等待队列中的所有节点全部唤醒,相当于将等待队列中的每一个节点都执行一次signal()
  CountDownLatch
  Java5之后在 java.util.concurrent 也就是【JUC】包中提供了很多并发编程的工具类,如 CountDownLatch 计数器是基于 AQS 框架实现的多个线程之间维护共享变量的类。
  使用场景
  可以通过 CountDownLatch 使当前线程阻塞,等待其他线程完成给定任务,比如,等待线程完成下载任务之后,提示用户下载完成;导游等待所有游客参观完之后去下一个景点等。
  使用介绍
  CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待n个点完成,这里就传入n。这里所说的n个点,可以是n个线程,也可以是1个线程里的n个执行步骤。CountDownLatch 构造函数如下:
  public CountDownLatch(int count) {
          if (count < 0) throw new IllegalArgumentException("count < 0");
          this.sync = new Sync(count);
   }
  计数器参数count必须大于等于0,等于0的时候,调用await方法时不会阻塞当前线程。
  当我们调用CountDownLatch的countDown()方法时,n就会减1,CountDownLatch的await()方法会阻塞当前线程,直到n变成零,继续执行。
  CountDownLatch 方法
  ·await():阻塞当前线程,直到计数器为零为止
  · await(long timeout, TimeUnit unit):await()的重载方法,可以指定阻塞时长
  · countDown():计数器减1,如果计数达到零,释放所有等待的线程
  · getCount(): 返回当前计数
  案例:比如开一把英雄联盟,需要10个人加载完成才会进入游戏,可以理解为10个线程运行完毕之后进入游戏页面
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Random;
  import java.util.concurrent.CountDownLatch;
  public class LoadingGame {
      public static void main(String[] args) {
          // 计数器
          CountDownLatch latch = new CountDownLatch(10);
          // 玩家数组
          String[] player = new String[10];
          // 随机数,用来加载进度条时线程睡眠使用,防止直接加载到100
          Random random = new Random();
          // 循环开启10个线程,即10个玩家
          for (int i = 0; i < 10; i++) {
              // 记录玩家在数组中的下标
              int index = i;
              new Thread(() -> {
                  // 循环进度条到100
                  for (int j = 0; j <= 100; j++) {
                      try {
                          // 每加载 1% 就随机睡眠一段时间
                          Thread.sleep(random.nextInt(100));
                      } catch (InterruptedException e) {
                          throw new RuntimeException(e);
                      }
                      // 修改指定玩家进度条
                      player[index] = j +"%";
                      // 输出当前所有的玩家进度
                      System.out.print("\r" + Arrays.toString(player));
                  }
                  // 每加载完一个玩家计数-1
                  latch.countDown();
              }).start();
          }
          try {
              // 阻塞当前线程【main线程】,等待十个玩家加载结束后唤醒
              latch.await();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
          System.out.println("\n"+"游戏开始");
      }
  }
  运行结果:发现主线程等待10个子线程加载到100时才执行。
  高频面试题——如何保证多个线程按顺序执行
  其实就是让线程按照指定的顺序一个一个执行,这里结合同一案例给大家介绍4种方法:
  案例:老师布置作业之后,学生开始写作业,学生写完作业老师批改,之后老师再将学生的作业情况记录下来,这个顺序不可错乱
  Thread的join方法
  public class HomeworkJoin {
      public static void main(String[] args) {
          // 布置作业线程
          Thread t1 = new Thread(() -> {
              System.out.println("......老师布置作业......");
          });
          // 学生写作业,需要等待老师布置完
          Thread t2 = new Thread(() -> {
              try {
                  // t1插入执行,也就是插队
                  t1.join();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("......学生写作业......");
          });
          // 学生写作业,需要等待老师布置完
          Thread t3 = new Thread(() -> {
              try {
                  // t2插队
                  t2.join();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("......老师检查作业......");
          });
          // 学生写作业,需要等待老师布置完
          Thread t4 = new Thread(() -> {
              try {
                  // t3插队
                  t3.join();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
              System.out.println("......老师记录作业情况......");
          });
          // 开启线程
          t1.start();
          t2.start();
          t3.start();
          t4.start();
          // t1线程插队
          try {
              t4.join();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
          System.out.println("......作业布置和检查结束......");
      }
  }
  运行结果:
  使用Condition(条件变量)
  我们可以使用Condition精确唤醒下一个需要执行的线程
  import java.util.concurrent.locks.Condition;
  import java.util.concurrent.locks.Lock;
  import java.util.concurrent.locks.ReentrantLock;
  public class HomeworkCondition {
      // 锁对象
      private static Lock lock = new ReentrantLock();
      // 阻塞队列
      private static Condition doWork = lock.newCondition();
      private static Condition checkWork = lock.newCondition();
      private static Condition recordWork = lock.newCondition();
      /**
       * 为什么要加这三个标识状态?
       * 如果没有状态标识,线程就无法正确唤醒,就一直处于等待状态
       */
      private static Boolean t1Run = false;
      private static Boolean t2Run = false;
      private static Boolean t3Run = false;
      public static void main(String[] args) {
          // 布置作业线程
          Thread t1 = new Thread(() -> {
              lock.lock();
              try {
                  System.out.println("......老师布置作业......");
                  // t1执行完毕
                  t1Run = true;
                  // 唤醒doWork等待队列中的第一个线程
                  doWork.signal();
              }finally {
                  lock.unlock();
              }
          });
          // 学生写作业,需要等待老师布置完
          Thread t2 = new Thread(() -> {
              lock.lock();
              try {
                  // 判断是否布置作业
                  if(!t1Run) {
                      // 还没布置作业,先不写作业,进入等待队列
                      doWork.await();
                  }
                  System.out.println("......学生写作业......");
                  t2Run = true;
                  // 唤醒checkWork等待队列第一个线程
                  checkWork.signal();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  lock.unlock();
              }
          });
          // 老师家查作业,需要学生写完
          Thread t3 = new Thread(() -> {
              lock.lock();
              try {
                  // 判断学生是否写完作业
                  if(!t2Run) {
                      // 没写完,先不检查,进入等待队列
                      checkWork.await();
                  }
                  System.out.println("......老师检查作业......");
                  t3Run = true;
                  // 唤醒recordWork等待队列第一个线程
                  recordWork.signal();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  lock.unlock();
              }
          });
          // 老师上传作业情况,需要检查完
          Thread t4 = new Thread(() -> {
              lock.lock();
              try {
                  if(!t3Run) {
                      recordWork.await();
                  }
                  System.out.println("......老师记录作业情况......");
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } finally {
                  lock.unlock();
              }
          });
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      }
  }
  使用CountDownLatch(倒计数)
  声明三个 CountDownLatch 计数器,初始只都为 1,每次执行上一部操作之后下一步操作的计数器 -1,当计数器值为0时就继续执行,否则就陷入等待
  import java.util.concurrent.CountDownLatch;
  public class HomeworkCountDownLatch {
      public static void main(String[] args) {
          // 创建三个计数器
          CountDownLatch doWork = new CountDownLatch(1);
          CountDownLatch checkWork = new CountDownLatch(1);
          CountDownLatch recordWork = new CountDownLatch(1);
          // 布置作业线程
          Thread t1 = new Thread(() -> {
              System.out.println("......老师布置作业......");
              // 布置作业之后,做作业计数器 -1
              doWork.countDown();
          });
          // 学生写作业,需要等待老师布置完
          Thread t2 = new Thread(() -> {
              try {
                  doWork.await();
                  System.out.println("......学生写作业......");
                  // 对 检查作业 -1
                  checkWork.countDown();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          });
          // 学生写作业,需要等待老师布置完
          Thread t3 = new Thread(() -> {
              try {
                  doWork.await();
                  System.out.println("......老师检查作业......");
                  // 对 录入作业情况 -1
                  recordWork.countDown();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          });
          // 学生写作业,需要等待老师布置完
          Thread t4 = new Thread(() -> {
              try {
                  recordWork.await();
                  System.out.println("......老师记录作业情况......");
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          });
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      }
  }
  使用CyclicBarrier(回环栅栏)
  CyclicBarrier可以实现让一组线程等待至某个状态之后再全部同时执行,【回环】是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用,可以把这个状态当做barrier,当调用await()方法之后,线程就处于barrier了。示例如下:
  import java.util.concurrent.BrokenBarrierException;
  import java.util.concurrent.CyclicBarrier;
  public class HomeworkCyclicBarrier {
      public static void main(String[] args) {
          CyclicBarrier doWork = new CyclicBarrier(2);
          CyclicBarrier checkWork = new CyclicBarrier(2);
          CyclicBarrier recordWork = new CyclicBarrier(2);
          // 布置作业线程
          Thread t1 = new Thread(() -> {
              try {
                  System.out.println("......老师布置作业......");
                  //放开栅栏1
                  doWork.await();
              } catch (BrokenBarrierException e) {
                  throw new RuntimeException(e);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          });
          // 学生写作业,需要等待老师布置完
          Thread t2 = new Thread(() -> {
              try {
                  //放开栅栏1
                  doWork.await();
                  System.out.println("......学生写作业......");
                  //放开栅栏2
                  checkWork.await();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } catch (BrokenBarrierException e) {
                  throw new RuntimeException(e);
              }
          });
          // 学生写作业,需要等待老师布置完
          Thread t3 = new Thread(() -> {
              try {
                  //放开栅栏2
                  checkWork.await();
                  System.out.println("......老师检查作业......");
                  //放开栅栏3
                  recordWork.await();
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } catch (BrokenBarrierException e) {
                  throw new RuntimeException(e);
              }
          });
          // 学生写作业,需要等待老师布置完
          Thread t4 = new Thread(() -> {
              try {
                  //放开栅栏3
                  recordWork.await();
                  System.out.println("......老师记录作业情况......");
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              } catch (BrokenBarrierException e) {
                  throw new RuntimeException(e);
              }
          });
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      }
  }
  此四种方法都可以实现同样的效果,当然你也可以使用Object的wait() 和 notify()/notifyAll()实现
  高频面试题——Thread.join()和CountDownLatch的区别
  Thread.join()是Thread类的一个方法,Thread.join()的实现是依靠Object的wait()和notifyAll()来完成的,而CountDownLatch是JUC包中的一个工具类
  当我们使用ExecutorService 【线程池】,就不能使用join,必须使用CountDownLatch,比如:
  ExecutorService service = Executors.newFixedThreadPool(5);
  final CountDownLatch latch = new CountDownLatch(5);
  for(int x = 0; x < 5; x++) {
      service.submit(new Runnable() {
          public void run() {
              // do something
              latch.countDown();
          }
      });
  }
  latch.await();
  调用join方法需要等待thread执行完毕才能继续向下执行,而CountDownLatch只需要检查计数器的值为零就可以继续向下执行,相比之下,CountDownLatch更加灵活一些,可以实现一些更加复杂的业务场景。
  为什么wait, notify和notifyAll这些方法在Object类中不在Thread类里面?
  Java提供的锁是对象级的而不是线程级的,线程为了进入临界区【也就是同步块内】,需要获得锁并等待锁可用,它们并不知道也不需要知道哪些线程持有锁,它们只需要知道当前资源是否被占用,是否可以获得锁,所以锁的持有状态应该由同步监视器来获取,而不是线程本身。
  如果Java不提供关键字来解决线程之间的通信,锁是对象级别,由于wait,notify,notifyAll都是锁级别的操作,每个对象都可以当做锁所以把他们定义在Object类中是最合适的。

TAG: 软件开发 Java java

 

评分:0

我来说两句

Open Toolbar