Java并发编程二三事

发表于:2016-9-01 11:22

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:Katsura    来源:51Testing软件测试网采编

分享:
  信号量
  用于线程的同步,类似PV操作。当计数减少到0的时候acquire()会堵塞,并且直到有其他线程调用release()线程释放信号量或者线程被中断。
1   /**
2      * Semaphore
3      */
4     public void semaphore() {
5         Semaphore sem = new Semaphore(1);
6         int nThread = 5;
7         for (int i = 0; i < nThread; i++) {
8             Thread t = new Thread() {
9                 public void run() {
10                     try {
11                         sem.acquire();
12                         int random = (int) (5 * Math.random());
13                         sleep(random);
14                         System.out.println(currentThread().getId());
15                         sem.release();
16                     } catch (InterruptedException e) {
17                         e.printStackTrace();
18                     }
19                 }
20             };
21             t.start();
22         }
23     }
  栅栏
  与闭锁类似,线程会堵塞在await()方法,直到一定数量的线程到达或者线程被中断。在一定数量的线程到达后,栅栏打开,这批线程可以从堵塞中恢复,然后栅栏再次关闭。
1   /**
2      * 栅栏
3      * @return
4      * @throws InterruptedException
5      */
6     public long cyclicBarrier() throws InterruptedException {
7         int nThreads = 25;
8         final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
9             @Override
10             public void run() {
11                 System.out.println("Barrier pass...");
12             }
13         });
14
15         for (int i = 0; i < nThreads; i++) {
16             Thread t = new Thread() {
17                 public void run() {
18                     try {
19                         barrier.await();
20                         sleep(1000);
21                     } catch (BrokenBarrierException | InterruptedException e) {
22                         e.printStackTrace();
23                     }
24                 }
25             };
26             t.start();
27         }
28     }
  稍微高级一点的内容
  CompletionService
  作为Executor的包装,主要用来在执行多线程的时候获取返回的执行结果进行处理。至于ExecutorService线程池的种类和相关配置请参考JDK文档。
1   /**
2      * CompletionService
3      */
4     public void completionService() {
5         int nThread = 5;
6         ExecutorService executor = Executors.newFixedThreadPool(5);
7         CompletionService  completionService = new ExecutorCompletionService(executor);
8
9         for (int i = 0; i < nThread; i++) {
10             completionService.submit(new Callable<String>() {
11                 @Override
12                 public String call() throws Exception {
13                     if (Thread.currentThread().isInterrupted()) {
14                         return "interrupted";
15                     }
16                     int st = (int)(Math.random() * 5000);
17                     Thread.sleep(st);
18                     return Thread.currentThread().getId() + ":" + st;
19                 }
20             });
21         }
22
23         for (int i = 0; i < nThread; i++) {
24             try {
25                 Future<String> f = completionService.take();
26                 System.out.println("Round" + i);
27                 System.out.println(f.get());
28             } catch (InterruptedException | ExecutionException e) {
29                 e.printStackTrace();
30             }
31
32         }
33     }
  设定执行时间的任务
  下面这个例子只是说明可以这样做,但是并不建议这样处理,因为在非调用线程取消一个线程是一个不太合理的处理方式,最好是让调用者取消,这样调用者还可以进行下一步的处理。 另外这种方式看起来并不优雅。
1   /**
2      * 取消: 即使任务不响应中断,限时运行的方法仍能够返回到他的调用者。在任务启动以后偶timedRun执行一个限时的join方法,
3      * 在join返回后,将检查是否有异常抛出,有的话再次抛出异常,由于Throwable在两个线程之间共享,所以设置为volatile
4      */
5     public void timeRun(final Runnable r) throws Throwable {
6         ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(5);
7         class  RethrowableTask implements Runnable {
8             private volatile Throwable t;
9             @Override
10             public void run() {
11                 try {
12                     r.run();
13                 } catch (Throwable t) {
14                     this.t = t;
15                 }
16             }
17             void rethrow() throws Throwable {
18                 if (t != null) {
19                     throw t;
20                 }
21             }
22         }
23
24         RethrowableTask task = new RethrowableTask();
25         final Thread taskThread = new Thread(task);
26         taskThread.start();
27         cancelExec.schedule(() -> taskThread.interrupt(), 100000, TimeUnit.MILLISECONDS);
28         taskThread.join(10000);
29         task.rethrow();
30     }
  下面是通过Future带时间的get()方法实现的,有时限的任务,可以对比一下,下面这个方式显然要优雅很多。
  1   public void betterTimeRun(Runnable r) throws Throwable {
  2         ExecutorService executorService = Executors.newFixedThreadPool(5);
  3         Future<?> task = executorService.submit(r);
  4         try {
  5             task.get(10000, TimeUnit.MILLISECONDS);
  6         } catch (TimeoutException e) {
  7             // 在finally中被取消
  8         } catch (ExecutionException e) {
  9             throw e.getCause();
  10         } finally {
  11             task.cancel(true);
  12         }
  13     }
  在try…catch…finally中取消任务。
  UncaughtExcption的处理
  Thread的run方法是不抛出非检查异常的,所以外部的try…catch也无法捕获,有时候这会导致一些问题,比如资源没有被释放。
  但是我们可以通过Thread的实例方法setUncaughtExceptionHandler去为任何一个线程设置一个 UncaughtExceptionHandler。当然也可以调用Thread类的静态方法setUncaughtExceptionHandler去 为所有线程设置一个UncaughtExceptionHandler。接口示例如下。
  1 class MyHandler implements Thread.UncaughtExceptionHandler {
  2         /**
  3          * 对于unchecked异常,可以实现UncaughtExceptionHandler接口,当一个线程由于未捕获异常而退出时,JVM会把这个事件告报给应用程序提供的
  4          * UncaughtExceptionHandler,如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.error
  5          */
  6         @Override
  7         public void uncaughtException(Thread t, Throwable e) {
  8             // Do something...
  9         }
  10     }
  如果没有设置自己的Handler,那么JVM的默认行为是将栈信息输出到System.error。
  保存被取消的任务
  如果需要在关闭线程池的时候保存被取消的任务,那么可以扩展AbstractExcecutorService,对被取消的任务进行纪录,以便下次继续处理。
1   /**
2      * 当关闭线程池时保存被取消的任务
3      */
4     class TrackingExecutor extends AbstractExecutorService {
5         private final ExecutorService exec;
6         private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());
7
8         public TrackingExecutor() {
9             this.exec = Executors.newCachedThreadPool();
10         }
11
12         public List<Runnable> getCancelledTasks() {
13             if (!exec.isTerminated()) {
14                 throw new IllegalStateException("...");
15             }
16             return new ArrayList<Runnable>(tasksCancelledAtShutdown);
17         }
18
19         @Override
20         public void execute(Runnable command) {
21             exec.execute(new Runnable() {
22                 @Override
23                 public void run() {
24                     try {
25                         command.run();
26                     } finally {
27                         if (isShutdown() && Thread.currentThread().isInterrupted()) {
28                             tasksCancelledAtShutdown.add(command);
29                         }
30                     }
31                 }
32             });
33         }
34     }
  可以看到,用一个同步的集合保存被取消的任务,任务的run方法在try的包围中,如果线程被中断或者关闭,那么将相应的任务加入 taskCancelledAtShutdown中,以上代码只覆写了exceute方法,其他方法可以委托给ExecutorService。
32/3<123>
春暖花开更文季,点击参与还有惊喜礼品~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计

法律顾问:上海漕溪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2023
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号