核心函数 runWorker
下面是简化的逻辑,注意:每个工作线程的run都执行下面的函数
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
w.lock();
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
w.unlock();
}
processWorkerExit(w, completedAbruptly);
}
· 从getTask()中获取任务
· 锁住 worker
· 执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
· 运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
· 执行afterExecute(task, thrown);
· 解锁 worker
· 如果获取到的任务为 null,关闭 worker
获取任务 getTask
线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。
private final BlockingQueue workQueue;
getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。
· 现有的线程数量超过最大线程数量
· 线程池处于STOP状态
· 线程池处于SHUTDOWN状态且工作队列为空
· 线程等待任务超时,且线程数量超过保留线程数量
核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。
timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
在以下两种情况下等待任务会超时:
· 允许核心线程等待超时,即allowCoreThreadTimeOut(true)
· 当前线程是普通线程,此时wc > corePoolSize
工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。
总结
ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。
Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecute和afterExecute方法来定制线程池任务执行过程。
使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。
拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。