先看看 BlockingQueue 接口的文档说明:
1、add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出 IllegalStateException 异常;
2、offer:添加元素到队列里,添加成功返回true,添加失败返回false;
3、put:添加元素到队列里,如果容量满了会阻塞直到容量不满;
4、poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;
5、remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;
6、take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。
先看一个简单的 ArrayBlockingQueue , ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm) 。
ArrayBlockingQueue
ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。
属性如下:
/** The queued items item的集合 */ final Object[] items; /** items index for next take, poll, peek or remove 拿数据的索引 */ int takeIndex; /** items index for next put, offer, or add 放数据的索引 */ int putIndex; /** Number of elements in the queue 队列元素的个数 */ int count; /** Main lock guarding all access 可重入的锁 */ final ReentrantLock lock; /** Condition for waiting takes 条件对象 */ private final Condition notEmpty; /** Condition for waiting puts 条件对象 */ private final Condition notFull; |
先看一下add方法:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } offer方法: public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } |
我们可以看到,如果满了返回false,如果没有满调用insert。整个方法是通过可重入锁来锁住的,并且最终释放。接着看一下 insert 方法:
private void insert(E x) {
items[putIndex] = x; // 元素添加到数组里
putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
++count; // 元素个数+1
notEmpty.signal(); // 使用条件对象notEmpty通知
}
这里 insert 被调用的时候就会唤醒 notEmpty 上等待的线程进行 take 操作。
再看一下 put 方法:
public void put(E e) throws InterruptedException { checkNotNull(e); // 不允许元素为空 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程 try { while (count == items.length) // 如果队列满了,阻塞当前线程,while用来防止假唤醒 notFull.await(); // 线程阻塞并被挂起,同时释放锁 insert(e); // 调用insert方法 } finally { lock.unlock(); // 释放锁,让其他线程可以调用put方法 } } |
add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。
继续看删除数据的相关操作,先看一下poll:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程 try { return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法 } finally { lock.unlock(); // 释放锁,让其他线程可以调用poll方法 } } |
看看这个 extract 方法(jdk源码的作者的起名水平真的非常高,代码素质好):
private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素 items[takeIndex] = null; // 对应取索引上的数据清空 takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0 --count; // 元素个数-1 notFull.signal(); // 使用条件对象notFull通知,原理同上面的insert中 return x; // 返回元素 } |
看一下 take 方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程 try { while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里 notEmpty.await(); // 线程阻塞并被挂起,同时释放锁 return extract(); // 调用extract方法 } finally { lock.unlock(); // 释放锁,让其他线程可以调用take方法 } } |
再看一下 remove 方法:
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程 try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素 if (o.equals(items[i])) { // 两个对象相等的话 removeAt(i); // 调用removeAt方法 return true; // 删除成功,返回true } } return false; // 删除成功,返回false } finally { lock.unlock(); // 释放锁,让其他线程可以调用remove方法 } } |
再看一下 removeAt 方法,这个方法反而比较有价值:
private void removeAt(int i) { final Object[] items = this.items; if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可 items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值 for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; // 元素个数-1 notFull.signal(); // 使用条件对象notFull通知 } |