信号量
用于线程的同步,类似PV操作。当计数减少到0的时候acquire()会堵塞,并且直到有其他线程调用release()线程释放信号量或者线程被中断。
1 /** 2 * Semaphore 3 */ 4 public void semaphore() { 5 Semaphore sem = new Semaphore(1); 6 int nThread = 5; 7 for (int i = 0; i < nThread; i++) { 8 Thread t = new Thread() { 9 public void run() { 10 try { 11 sem.acquire(); 12 int random = (int) (5 * Math.random()); 13 sleep(random); 14 System.out.println(currentThread().getId()); 15 sem.release(); 16 } catch (InterruptedException e) { 17 e.printStackTrace(); 18 } 19 } 20 }; 21 t.start(); 22 } 23 } |
栅栏
与闭锁类似,线程会堵塞在await()方法,直到一定数量的线程到达或者线程被中断。在一定数量的线程到达后,栅栏打开,这批线程可以从堵塞中恢复,然后栅栏再次关闭。
1 /** 2 * 栅栏 3 * @return 4 * @throws InterruptedException 5 */ 6 public long cyclicBarrier() throws InterruptedException { 7 int nThreads = 25; 8 final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() { 9 @Override 10 public void run() { 11 System.out.println("Barrier pass..."); 12 } 13 }); 14 15 for (int i = 0; i < nThreads; i++) { 16 Thread t = new Thread() { 17 public void run() { 18 try { 19 barrier.await(); 20 sleep(1000); 21 } catch (BrokenBarrierException | InterruptedException e) { 22 e.printStackTrace(); 23 } 24 } 25 }; 26 t.start(); 27 } 28 } |
稍微高级一点的内容
CompletionService
作为Executor的包装,主要用来在执行多线程的时候获取返回的执行结果进行处理。至于ExecutorService线程池的种类和相关配置请参考JDK文档。
1 /** 2 * CompletionService 3 */ 4 public void completionService() { 5 int nThread = 5; 6 ExecutorService executor = Executors.newFixedThreadPool(5); 7 CompletionService completionService = new ExecutorCompletionService(executor); 8 9 for (int i = 0; i < nThread; i++) { 10 completionService.submit(new Callable<String>() { 11 @Override 12 public String call() throws Exception { 13 if (Thread.currentThread().isInterrupted()) { 14 return "interrupted"; 15 } 16 int st = (int)(Math.random() * 5000); 17 Thread.sleep(st); 18 return Thread.currentThread().getId() + ":" + st; 19 } 20 }); 21 } 22 23 for (int i = 0; i < nThread; i++) { 24 try { 25 Future<String> f = completionService.take(); 26 System.out.println("Round" + i); 27 System.out.println(f.get()); 28 } catch (InterruptedException | ExecutionException e) { 29 e.printStackTrace(); 30 } 31 32 } 33 } |
设定执行时间的任务
下面这个例子只是说明可以这样做,但是并不建议这样处理,因为在非调用线程取消一个线程是一个不太合理的处理方式,最好是让调用者取消,这样调用者还可以进行下一步的处理。 另外这种方式看起来并不优雅。
1 /** 2 * 取消: 即使任务不响应中断,限时运行的方法仍能够返回到他的调用者。在任务启动以后偶timedRun执行一个限时的join方法, 3 * 在join返回后,将检查是否有异常抛出,有的话再次抛出异常,由于Throwable在两个线程之间共享,所以设置为volatile 4 */ 5 public void timeRun(final Runnable r) throws Throwable { 6 ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(5); 7 class RethrowableTask implements Runnable { 8 private volatile Throwable t; 9 @Override 10 public void run() { 11 try { 12 r.run(); 13 } catch (Throwable t) { 14 this.t = t; 15 } 16 } 17 void rethrow() throws Throwable { 18 if (t != null) { 19 throw t; 20 } 21 } 22 } 23 24 RethrowableTask task = new RethrowableTask(); 25 final Thread taskThread = new Thread(task); 26 taskThread.start(); 27 cancelExec.schedule(() -> taskThread.interrupt(), 100000, TimeUnit.MILLISECONDS); 28 taskThread.join(10000); 29 task.rethrow(); 30 } |
下面是通过Future带时间的get()方法实现的,有时限的任务,可以对比一下,下面这个方式显然要优雅很多。
1 public void betterTimeRun(Runnable r) throws Throwable {
2 ExecutorService executorService = Executors.newFixedThreadPool(5);
3 Future<?> task = executorService.submit(r);
4 try {
5 task.get(10000, TimeUnit.MILLISECONDS);
6 } catch (TimeoutException e) {
7 // 在finally中被取消
8 } catch (ExecutionException e) {
9 throw e.getCause();
10 } finally {
11 task.cancel(true);
12 }
13 }
在try…catch…finally中取消任务。
UncaughtExcption的处理
Thread的run方法是不抛出非检查异常的,所以外部的try…catch也无法捕获,有时候这会导致一些问题,比如资源没有被释放。
但是我们可以通过Thread的实例方法setUncaughtExceptionHandler去为任何一个线程设置一个 UncaughtExceptionHandler。当然也可以调用Thread类的静态方法setUncaughtExceptionHandler去 为所有线程设置一个UncaughtExceptionHandler。接口示例如下。
1 class MyHandler implements Thread.UncaughtExceptionHandler {
2 /**
3 * 对于unchecked异常,可以实现UncaughtExceptionHandler接口,当一个线程由于未捕获异常而退出时,JVM会把这个事件告报给应用程序提供的
4 * UncaughtExceptionHandler,如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.error
5 */
6 @Override
7 public void uncaughtException(Thread t, Throwable e) {
8 // Do something...
9 }
10 }
如果没有设置自己的Handler,那么JVM的默认行为是将栈信息输出到System.error。
保存被取消的任务
如果需要在关闭线程池的时候保存被取消的任务,那么可以扩展AbstractExcecutorService,对被取消的任务进行纪录,以便下次继续处理。
1 /** 2 * 当关闭线程池时保存被取消的任务 3 */ 4 class TrackingExecutor extends AbstractExecutorService { 5 private final ExecutorService exec; 6 private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>()); 7 8 public TrackingExecutor() { 9 this.exec = Executors.newCachedThreadPool(); 10 } 11 12 public List<Runnable> getCancelledTasks() { 13 if (!exec.isTerminated()) { 14 throw new IllegalStateException("..."); 15 } 16 return new ArrayList<Runnable>(tasksCancelledAtShutdown); 17 } 18 19 @Override 20 public void execute(Runnable command) { 21 exec.execute(new Runnable() { 22 @Override 23 public void run() { 24 try { 25 command.run(); 26 } finally { 27 if (isShutdown() && Thread.currentThread().isInterrupted()) { 28 tasksCancelledAtShutdown.add(command); 29 } 30 } 31 } 32 }); 33 } 34 } |
可以看到,用一个同步的集合保存被取消的任务,任务的run方法在try的包围中,如果线程被中断或者关闭,那么将相应的任务加入 taskCancelledAtShutdown中,以上代码只覆写了exceute方法,其他方法可以委托给ExecutorService。