二、显式条件队列
可以利用wait,notify,notifyAll实现自定义同步的过程(详见我的系列的多线程设计模式)。一般利用这种方式将内置锁与内置条件队列统一在一个类中,即一个类即是内置锁,又充当了条件队列了。凡是对于该类的wait都处于这个类的专有wait set中,这里将wait set称为条件队列。
条件队列指的就是它使得一组线程集合能够通过某种方式来等待特定的条件成真。这里的条件队列里存放的是一个个正在等待条件成真的线程。
所有等待的线程都必须要等待某种条件成真才进而执行。对于这种条件称作为条件谓词(或警戒条件)。条件谓词是依赖于许多个状态变量的,对于这些状态变量必须要由一个锁来保护,在每次测试条件谓词的时候必须先获取该锁,则就要求了锁对象和条件队列对象必须是同一个对象。
状态依赖的标准形式:这里和多线程设计模式中的Guarded Suspension Pattern一样的形式。
<pre class="brush:java;toolbar:false;"> public synchronized void concreteWork() { while(!condition) { try{ wait();//此时当前线程就进入了该this对象的条件队列中等候 }catch(InterruptedException e) { } } //具体的工作,当条件谓词成功后执行的 } //必须要保证在调用wait之前和从wait唤醒后都要对于条件谓词测试 |
状态通知的形式:
每当在等待一个条件的时候,一定要保证在条件谓词变成真的时候通过某种方式发出通知。一般就是利用Object的notifyAll和notify来实现。一般采用notifyAll来实现,虽然可能会导致上下文切换等性能。但是这种能保证所有的线程都能进行公平竞争,防止信号丢失。
一般多个线程可能会在同一个对象的同一个条件队列上等待多个不同的添加谓词。使用notify可能会唤醒本不该唤醒的线程。譬如,当线程A在条件队列中等待条件谓词PA,线程B在条件队列中等待条件谓词PB,若线程C执行了操作,使得PB变成真,此时利用notify,可能会唤醒线程A,A被唤醒发现PA还是不为真,则继续等待,此时线程B则以为PB没有真,就会彻底的丢失了信号,从而再次等待下去
使用notify的情况仅仅同时满足“所有等待线程都是等待相同的条件谓词”和“每次最多只能唤醒一个线程来执行”。
ReentrantLock是Lock的子类,它是一种广义的内置锁,Condition则是一种广义的内置条件队列。由于多个线程可能会在同一个条件队列中等待不同的条件谓词,有时候为了方便的进行管理,可以针对不同的条件谓词设置不同的条件队列来等待。
一般一个Condition和一个Lock关联在一起,就是如同一个内置锁和内置条件队列关联在同一个对象中。一般创建Condition,就是在需要关联的Lock上调用Lock.newCondition方法。一个Lock则可以对应多个Condition对象。在Condition中,与Object中wait,notify,notifyAll所对应的方法分别为await,signal,signalAll。
显式的Condition和内置条件队列的选择:
一般还是选择内置条件队列,当需要使用公平队列或在每个锁上对应多个等待线程集的时候,就可以运用Condition了
代码实例:一个利用Condition和Lock来实现的有界缓存,里面有两种条件队列
package whut.usersynchrnoized; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; //利用Condition和Lock来实现有界缓存 public class ConditionBoundedBuffer<T> { private final Lock lock = new ReentrantLock(); private final Condition full = lock.newCondition(); private final Condition empty = lock.newCondition(); private final T[] items; private int tail, head, count; public ConditionBoundedBuffer(int size) { items = (T[]) new Object[size]; } // 阻塞直到notfull public void put(T x) throws InterruptedException { lock.lock(); try { while (count == items.length) full.await();// 满了,则要在full条件队列中等待 items[tail] = x; if (++tail == items.length) tail = 0; ++count; empty.signal();// 每次唤醒一个线程 } finally { lock.unlock(); } } // 阻塞直到notEmpty public T take() throws InterruptedException { lock.lock(); try { while (count == 0) empty.await();// 空了,则要在empty条件队列中等待 T x = items[tail]; items[tail] = null; if (++head == items.length) head = 0; --count; full.signal();// 每次唤醒一个线程 return x; } finally { lock.unlock(); } } } |