面试官:Semaphore在项目中有使用过吗?

上一篇 / 下一篇  2022-07-13 11:54:06

  Semaphore
  它就是我们之前在讲源码的时候提到的信号量,下面看下它的构造函数。
  public Semaphore(int permits) {
          sync = new NonfairSync(permits);
  }
  public Semaphore(int permits, boolean fair) {
          sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }
  从构造函数可以看出,它可以传入指定数量的资源和指定公平和非公平锁,公平和非公平就不多阐述了。
  我们重点关注的是acquire()和release(), 这两个方法字面意思很好理解, Semaphore往往用于资源有限的场景,比如我们需要限制某个操作的线程数量。下面通过例子感受一下。
  public class SemaphoreTest {
      public static final class Task implements Runnable {
          private int num;
          private Semaphore semaphore;
          public Task(int num, Semaphore semaphore) {
              this.num = num;
              this.semaphore = semaphore;
          }
          @Override
          public void run() {
              try {
                  // 获取
                  semaphore.acquire();
                  System.out.println(String.format("num: %d, 剩余%d个资源, 还有%d个线程在等待", num, semaphore.availablePermits(), semaphore.getQueueLength()));
                  System.out.println(System.currentTimeMillis());
                  Thread.sleep(3000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  // 释放
                  System.out.println("释放资源");
                  semaphore.release();
              }
          }
      }
      public static void main(String[] args) {
          Semaphore semaphore = new Semaphore(2);
          IntStream.range(0, 20).forEach(i -> new Thread(new Task(i, semaphore)).start());
      }
  }
  实际输出:
  num: 1, 剩余0个资源, 还有0个线程在等待
  1657591518171
  num: 0, 剩余1个资源, 还有0个线程在等待
  1657591518172
  释放资源
  ....
  释放资源
  num: 18, 剩余0个资源, 还有1个线程在等待
  1657591545235
  num: 19, 剩余0个资源, 还有0个线程在等待
  1657591545236
  释放资源
  释放资源
  进程已结束,退出代码0
  源码剖析
  我们重点看下acquire()源码实现。
  从这个信号量获取一个许可,阻塞直到有一个可用,或者线程被中断。获得一个许可,如果一个可用并立即返回,将可用许可的数量减少一个。
  public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
  }
  重点是这个sync。
  // 首先它继承 AbstractQueuedSynchronizer 这个大家肯定不陌生了 就是AQS
  abstract static class Sync extends AbstractQueuedSynchronizer {
      private static final long serialVersionUID = 1192457210091910933L;
      // 初始化的时候会写入一个状态
      Sync(int permits) {
          setState(permits);
      }
      // 获取当前的状态
      final int getPermits() {
          return getState();
      }
      // 非公平方式获取信号量
      final int nonfairTryAcquireShared(int acquires) {
          for (;;) {
              // 当前可获取的
              int available = getState();
              // 计算剩余数量
              int remaining = available - acquires;
              // 如果剩余数量大于0 就是进行cas修改
              if (remaining < 0 ||
                  compareAndSetState(available, remaining))
                  return remaining;
          }
      }
      // 释放信号量
      protected final boolean tryReleaseShared(int releases) {
          for (;;) {
              int current = getState();
              // 释放后剩余的数量
              int next = current + releases;
              // 如果超出就
              if (next < current) // overflow
                  throw new Error("Maximum permit count exceeded");
              if (compareAndSetState(current, next))
                  return true;
          }
      }
      final void reducePermits(int reductions) {
          for (;;) {
              int current = getState();
              int next = current - reductions;
              // 超出最大限量抛异常
              if (next > current) // underflow
                  throw new Error("Permit count underflow");
              if (compareAndSetState(current, next))
                  return;
          }
      }
      final int drainPermits() {
          for (;;) {
              int current = getState();
              if (current == 0 || compareAndSetState(current, 0))
                  return current;
          }
      }
  }
  在构造函数中FairSync和NonfairSync他们都继承Sync。
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  默认情况下非公平的Semaphore会去调用Sync的nonfairTryAcquireShared。
  static final class NonfairSync extends Sync {
          private static final long serialVersionUID = -2694183684443567898L;
          NonfairSync(int permits) {
              super(permits);
          }
          protected int tryAcquireShared(int acquires) {
              return nonfairTryAcquireShared(acquires);
          }
      }
  公平的Semaphore内部实现了tryAcquireShared()。
  static final class FairSync extends Sync {
      private static final long serialVersionUID = 2014338818796000944L;
      FairSync(int permits) {
          super(permits);
      }
      protected int tryAcquireShared(int acquires) {
          for (;;) {
              if (hasQueuedPredecessors())
                  return -1;
              int available = getState();
              int remaining = available - acquires;
              if (remaining < 0 ||
                  compareAndSetState(available, remaining))
                  return remaining;
          }
      }
  }
  下面我们再回过头看下acquire(), 内部方法acquireSharedInterruptibly是AQS的内部方法。
  public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
  }
  如果线程中断,直接抛异常, 如果没拿到资源就进入排队机制。
  public final void acquireSharedInterruptibly(int arg)
          throws InterruptedException {
      if (Thread.interrupted())
          throw new InterruptedException();
          // 可获取的资源数小于0进入排队 这里的实现在子类,就是上边提到的
      if (tryAcquireShared(arg) < 0)
          doAcquireSharedInterruptibly(arg);
  }
  重点看下这个doAcquireSharedInterruptibly()。
  private void doAcquireSharedInterruptibly(int arg)
      throws InterruptedException {
      // 以共享模式加入到阻塞队列 之前讲源码的时候都讲过     
      final Node node = addWaiter(Node.SHARED);
      // 默认失败
      boolean failed = true;
      try {
          for (;;) {
              // 获取前置节点
              final Node p = node.predecessor();
              // 如果上一个节点就是头部节点 再次尝试获取 (原因是头部节点可能释放资源了)
              if (p == head) {
                  int r = tryAcquireShared(arg);
                  // 如果获取到了 并且还有剩余资源
                  if (r >= 0) {
                      // 1. 将当前节点设置为头部节点
                      // 2. 判断后续节点是否是共享等待节点
                      // 3. 唤醒后续的节点
                      setHeadAndPropagate(node, r);
                      p.next = null; // help GC
                      failed = false;
                      return;
                  }
              }
              // 这一步主要是检查未能获取到资源的节点状态
              // 如果线程需要阻塞返回true
              // parkAndCheckInterrupt 如果线程中断了 抛出异常
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  throw new InterruptedException();
          }
      } finally {
          // 如果失败 取消正在进行的获取
          if (failed)
              cancelAcquire(node);
      }
  }
  shouldParkAfterFailedAcquire()的细节我们也来看下,可能有的同学不大清楚。
  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;
          // 只要前置节点释放锁,就会通知标识为SIGNAL(-1)状态的后续节点的线程
          // 如果前置节点为SIGNAL,只需要等待其他前置节点的线程被释放,
          if (ws == Node.SIGNAL)
              return true;
          // 这里的判断指的是取消状态, 如果取消了就讲这个节点移除掉
          if (ws > 0) {
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;
          } else {
              // cas 更新
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
      }
  这里的SIGNAL一类的常量,大家可以自行到源码查看,这也是细节地方。发现这段代码主要的作用就是检查节点状态,对后续节点做一些操作,这里并没有阻塞操作,下面我们看下parkAndCheckInterrupt()。
  private final boolean parkAndCheckInterrupt() {
          LockSupport.park(this);
          return Thread.interrupted();
      }
  这里我们可以看到加了锁,所以阻塞发生在这。那么释放锁在哪呢?其实在release阶段。
  private void unparkSuccessor(Node node) {
          int ws = node.waitStatus;
          if (ws < 0)
              compareAndSetWaitStatus(node, ws, 0);
          Node s = node.next;
          if (s == null || s.waitStatus > 0) {
              s = null;
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)
                      s = t;
          }
          if (s != null)
              LockSupport.unpark(s.thread);
      }
  可以看到在unparkSuccessor中进行了锁的释放,这个过程发生在释放阶段。
  release()相对简单一些,大家可以自己对着源码看下,实现有些类似。
  结束语
  其实本节带大家看源码,主要是想给大家讲下共享锁的知识,Semaphore其实就是使用了共享锁。另外AQS这个类很值得大家好好研究一下,你会发现很多的好用的类都是基于它实现。

TAG: Java Semaphore

 

评分:0

我来说两句

Open Toolbar