基于Java阻塞队列的搜索实例

上一篇 / 下一篇  2012-06-18 09:30:21 / 个人分类:Java

队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

3pX{Xl FHk0  下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。51Testing软件测试网8k{L/z`7wm t F

51Testing软件测试网)b\hwn CdYv8C

  java.util.concurrent 包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、 PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。 ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通 常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。51Testing软件测试网!RA k.Hgp.T`Q$~I8p;J.K

P?*c"F-H,k+`0  生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。51Testing软件测试网o;T:icwj5v

;m&m mxs3CK0   我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来 在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取 到这个虚拟对象时,就将其放回并终止。51Testing软件测试网4Rf6DKy(_[

51Testing软件测试网&K.L^e7fhv

  注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。

;X N%u YIf+^0
  1. import java.io.*;  
  2. import java.util.*;  
  3. import java.util.concurrent.*;  
  4. 51Testing软件测试网)f |A {b\\ g:L#Y
  5. public class BlockingQueueTest  
  6. {  
  7.    public static void main(String[] args)  
  8.    {  
  9.       Scanner in = new Scanner(System.in);  
  10.       System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");  
  11.       String directory = in.nextLine();  
  12.       System.out.print("Enter keyword (e.g. volatile): ");  
  13.       String keyword = in.nextLine();  
  14. 51Testing软件测试网(AJ)XXl
  15.       final int FILE_QUEUE_SIZE = 10;  
  16.       final int SEARCH_THREADS = 100;  
  17. 51Testing软件测试网dW(lU*]t8z!e6NS"od
  18.       BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);  

  19. |*n*z/}.w0
  20.       FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));  
  21.       new Thread(enumerator).start();  
  22.       for (int i = 1; i <= SEARCH_THREADS; i++)  
  23.          new Thread(new SearchTask(queue, keyword)).start();  
  24.    }  
  25. }  

  26. v4^n&XDT w'O6I)a g0
  27. /** 
  28.  * This task enumerates all files in a directory and its subdirectories. 
  29.  */ 
  30. class FileEnumerationTask implements Runnable  
  31. {  
  32.    /** 
  33.     * Constructs a FileEnumerationTask. 
  34.     * @param queue the blocking queue to which the enumerated files are added 
  35.     * @param startingDirectory the directory in which to start the enumeration 
  36.     */ 
  37.    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)  
  38.    {  
  39.       this.queue = queue;  
  40.       this.startingDirectory = startingDirectory;  
  41.    }  
  42. 51Testing软件测试网#JAH4|5h8]'_9g%\^
  43.    public void run()  
  44.    {  
  45.       try 
  46.       {  
  47.          enumerate(startingDirectory);  
  48.          queue.put(DUMMY);  
  49.       }  
  50.       catch (InterruptedException e)  
  51.       {  
  52.       }  
  53.    }  
  54. 51Testing软件测试网UL^x;AUW
  55.    /** 
  56.     * Recursively enumerates all files in a given directory and its subdirectories 
  57.     * @param directory the directory in which to start 
  58.     */ 
  59.    public void enumerate(File directory) throws InterruptedException  
  60.    {  
  61.       File[] files = directory.listFiles();  
  62.       for (File file : files)  
  63.       {  
  64.          if (file.isDirectory()) enumerate(file);  
  65.          else queue.put(file);  
  66.       }  
  67.    }  

  68. 4X"O B'S'@2q4R3E0
  69.    public static File DUMMY = new File("");  

  70. %v"h:? eSi ^'U0
  71.    private BlockingQueue<File> queue;  
  72.    private File startingDirectory;  
  73. }  
  74. 51Testing软件测试网 f6J0a%P1\y~B
  75. /** 
  76.  * This task searches files for a given keyword. 
  77.  */ 
  78. class SearchTask implements Runnable  
  79. {  
  80.    /** 
  81.     * Constructs a SearchTask. 
  82.     * @param queue the queue from which to take files 
  83.     * @param keyword the keyword to look for 
  84.     */ 
  85.    public SearchTask(BlockingQueue<File> queue, String keyword)  
  86.    {  
  87.       this.queue = queue;  
  88.       this.keyword = keyword;  
  89.    }  
  90. 51Testing软件测试网%Ly-g_b4k-QJ tJu
  91.    public void run()  
  92.    {  
  93.       try 
  94.       {  
  95.          boolean done = false;  
  96.          while (!done)  
  97.          {  
  98.             File file = queue.take();  
  99.             if (file == FileEnumerationTask.DUMMY)  
  100.             {  
  101.                queue.put(file);  
  102.                done = true;  
  103.             }  
  104.             else search(file);              
  105.          }  
  106.       }  
  107.       catch (IOException e)  
  108.       {  
  109.          e.printStackTrace();  
  110.       }  
  111.       catch (InterruptedException e)  
  112.       {  
  113.       }        
  114.    }  
  115. 51Testing软件测试网$O&s6OklBv;zc*[A
  116.    /** 
  117.     * Searches a file for a given keyword and prints all matching lines. 
  118.     * @param file the file to search 
  119.     */ 
  120.    public void search(File file) throws IOException  
  121.    {  
  122.       Scanner in = new Scanner(new FileInputStream(file));  
  123.       int lineNumber = 0;  
  124.       while (in.hasNextLine())  
  125.       {  
  126.          lineNumber++;  
  127.          String line = in.nextLine().trim();  
  128.          if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);  
  129.       }  
  130.       in.close();  
  131.    }  

  132. Y5?S/E/d[-[\TH0
  133.    private BlockingQueue<File> queue;  
  134.    private String keyword;  
  135. }

TAG:

 

评分:0

我来说两句

Open Toolbar