基于Java阻塞队列的搜索实例

发表于:2012-6-15 09:23

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:frogong    来源:51Testing软件测试网采编

  队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

  下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。

  java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。

  生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。

  我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。

  注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。

  1. import java.io.*;  
  2. import java.util.*;  
  3. import java.util.concurrent.*;  
  4. public class BlockingQueueTest  
  5. {  
  6.    public static void main(String[] args)  
  7.    {  
  8.       Scanner in = new Scanner(System.in);  
  9.       System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");  
  10.       String directory = in.nextLine();  
  11.       System.out.print("Enter keyword (e.g. volatile): ");  
  12.       String keyword = in.nextLine();  
  13.       final int FILE_QUEUE_SIZE = 10;  
  14.       final int SEARCH_THREADS = 100;  
  15.       BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);  
  16.       FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));  
  17.       new Thread(enumerator).start();  
  18.       for (int i = 1; i <= SEARCH_THREADS; i++)  
  19.          new Thread(new SearchTask(queue, keyword)).start();  
  20.    }  
  21. }  
  22. /** 
  23.  * This task enumerates all files in a directory and its subdirectories. 
  24.  */ 
  25. class FileEnumerationTask implements Runnable  
  26. {  
  27.    /** 
  28.     * Constructs a FileEnumerationTask. 
  29.     * @param queue the blocking queue to which the enumerated files are added 
  30.     * @param startingDirectory the directory in which to start the enumeration 
  31.     */ 
  32.    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)  
  33.    {  
  34.       this.queue = queue;  
  35.       this.startingDirectory = startingDirectory;  
  36.    }  
  37.    public void run()  
  38.    {  
  39.       try 
  40.       {  
  41.          enumerate(startingDirectory);  
  42.          queue.put(DUMMY);  
  43.       }  
  44.       catch (InterruptedException e)  
  45.       {  
  46.       }  
  47.    }  
  48.    /** 
  49.     * Recursively enumerates all files in a given directory and its subdirectories 
  50.     * @param directory the directory in which to start 
  51.     */ 
  52.    public void enumerate(File directory) throws InterruptedException  
  53.    {  
  54.       File[] files = directory.listFiles();  
  55.       for (File file : files)  
  56.       {  
  57.          if (file.isDirectory()) enumerate(file);  
  58.          else queue.put(file);  
  59.       }  
  60.    }  
  61.    public static File DUMMY = new File("");  
  62.    private BlockingQueue<File> queue;  
  63.    private File startingDirectory;  
  64. }  
  65. /** 
  66.  * This task searches files for a given keyword. 
  67.  */ 
  68. class SearchTask implements Runnable  
  69. {  
  70.    /** 
  71.     * Constructs a SearchTask. 
  72.     * @param queue the queue from which to take files 
  73.     * @param keyword the keyword to look for 
  74.     */ 
  75.    public SearchTask(BlockingQueue<File> queue, String keyword)  
  76.    {  
  77.       this.queue = queue;  
  78.       this.keyword = keyword;  
  79.    }  
  80.    public void run()  
  81.    {  
  82.       try 
  83.       {  
  84.          boolean done = false;  
  85.          while (!done)  
  86.          {  
  87.             File file = queue.take();  
  88.             if (file == FileEnumerationTask.DUMMY)  
  89.             {  
  90.                queue.put(file);  
  91.                done = true;  
  92.             }  
  93.             else search(file);              
  94.          }  
  95.       }  
  96.       catch (IOException e)  
  97.       {  
  98.          e.printStackTrace();  
  99.       }  
  100.       catch (InterruptedException e)  
  101.       {  
  102.       }        
  103.    }  
  104.    /** 
  105.     * Searches a file for a given keyword and prints all matching lines. 
  106.     * @param file the file to search 
  107.     */ 
  108.    public void search(File file) throws IOException  
  109.    {  
  110.       Scanner in = new Scanner(new FileInputStream(file));  
  111.       int lineNumber = 0;  
  112.       while (in.hasNextLine())  
  113.       {  
  114.          lineNumber++;  
  115.          String line = in.nextLine().trim();  
  116.          if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);  
  117.       }  
  118.       in.close();  
  119.    }  
  120.    private BlockingQueue<File> queue;  
  121.    private String keyword;  
  122. }

《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号