`

队列阻塞浅析

    博客分类:
  • java
阅读更多

    这几天所做的项目中涉及到了队列阻塞机制,通过研究整理如下。在这里和大家分享。

       队列以一种先进先出的方式。如果你向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

  下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。
 
  java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。
 
  生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。
 
  我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。
 
 在这个程序中,我们使用队列数据结构作为一种同步机制。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest
{
   public static void main(String[] args)
   {
      Scanner in = new Scanner(System.in);
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
      String directory = in.nextLine();
      System.out.print("Enter keyword (e.g. volatile): ");
      String keyword = in.nextLine();

      final int FILE_QUEUE_SIZE = 10;
      final int SEARCH_THREADS = 100;

      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
      new Thread(enumerator).start();
      for (int i = 1; i <= SEARCH_THREADS; i++)
         new Thread(new SearchTask(queue, keyword)).start();
   }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable
{
   /**
    * Constructs a FileEnumerationTask.
    * @param queue the blocking queue to which the enumerated files are added
    * @param startingDirectory the directory in which to start the enumeration
    */
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
   {
      this.queue = queue;
      this.startingDirectory = startingDirectory;
   }

   public void run()
   {
      try
      {
         enumerate(startingDirectory);
         queue.put(DUMMY);
      }
      catch (InterruptedException e)
      {
      }
   }

   /**
    * Recursively enumerates all files in a given directory and its subdirectories
    * @param directory the directory in which to start
    */
   public void enumerate(File directory) throws InterruptedException
   {
      File[] files = directory.listFiles();
      for (File file : files)
      {
         if (file.isDirectory()) enumerate(file);
         else queue.put(file);
      }
   }

   public static File DUMMY = new File("");

   private BlockingQueue<File> queue;
   private File startingDirectory;
}

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable
{
   /**
    * Constructs a SearchTask.
    * @param queue the queue from which to take files
    * @param keyword the keyword to look for
    */
   public SearchTask(BlockingQueue<File> queue, String keyword)
   {
      this.queue = queue;
      this.keyword = keyword;
   }

   public void run()
   {
      try
      {
         boolean done = false;
         while (!done)
         {
            File file = queue.take();
            if (file == FileEnumerationTask.DUMMY)
            {
               queue.put(file);
               done = true;
            }
            else search(file);            
         }
      }
      catch (IOException e)
      {
         e.printStackTrace();
      }
      catch (InterruptedException e)
      {
      }      
   }

   /**
    * Searches a file for a given keyword and prints all matching lines.
    * @param file the file to search
    */
   public void search(File file) throws IOException
   {
      Scanner in = new Scanner(new FileInputStream(file));
      int lineNumber = 0;
      while (in.hasNextLine())
      {
         lineNumber++;
         String line = in.nextLine().trim();
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);
      }
      in.close();
   }

   private BlockingQueue<File> queue;
   private String keyword;
}
 
31
11
分享到:
评论
14 楼 yutaozxy 2012-06-17  
while (!filesQueue.getEndSignal())  { 
       File file = queue.take(); 
       if (file == FileEnumerationTask.DUMMY) 
       { 
           filesQueue.setEndSignal(true); // queue.put(file);
       } 
       else search(file);             
}

这么写会有问题吧,如果有2个线程一个刚好从queue.take()最后一个元素而另一个线程刚好停到queue.take()那么第2个线程永远不会结束!
13 楼 w445097062 2012-06-17  
不错,可惜最近的项目接触不到这些
12 楼 railway 2012-06-16  
rogantian 写道
darrendu 写道
不错,尤其你的英文注释写的地道,searchTask中,File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 

         为什么又放入队列呢?


确保其它搜索线程也能拿到虚拟包,从而结束自己!


因为Queue是阻塞式,较难判断到底遍历完,DUMMY是一种巧妙的方式,用来标示搜索完毕,为了能够让其他线程也能最终结束,所以有了搜到DUMMY的线程在结束自己的时候同时queue.put(file),让别的线程也结束:
if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            }
不过我觉得还可以有种方式,就是加入一个信号量,这需要设计一个新类,记为FilesQueue,FilesQueue中包含ArrayBlockingQueue和信号量endSignal,
public class FilesQueue {
   private ArrayBlockingQueue queue;
   private volatile boolean endSignal = false;

   //get,set
   //...
}

故SearchTask#run方法中大概是这样的:
....
while (!filesQueue.getEndSignal())  { 
       File file = queue.take(); 
       if (file == FileEnumerationTask.DUMMY) 
       { 
           filesQueue.setEndSignal(true); // queue.put(file);
       } 
       else search(file);             
}
....
11 楼 来这里学java 2012-06-15  
无法输入中文路径,试了多种编码都不行,求赐教。。
10 楼 rogantian 2012-06-15  
darrendu 写道
不错,尤其你的英文注释写的地道,searchTask中,File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 

         为什么又放入队列呢?


确保其它搜索线程也能拿到虚拟包,从而结束自己!
9 楼 darrendu 2012-06-15  
不错,尤其你的英文注释写的地道,searchTask中,File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 

         为什么又放入队列呢?

8 楼 pktangshao 2012-06-15  
总有一些心理变态的程序员 无论人家的文章写的多好.多认真.就是踩你没个说
7 楼 mn_1127 2012-06-15  
最近我也在学习多线程方面的知识! 写的不错……  
6 楼 季铵盐 2012-06-14  
虽然没有看,像这样的文章很不错 ;就应该有这样的专研精神!赞一个
5 楼 javawebsoa 2012-06-14  
谢谢,学习了
4 楼 aijuans 2012-06-14  
3 楼 hae 2012-06-14  
最近我也在看多线程,可是不是很清楚用途,
2 楼 那家渔村 2012-06-14  
呵呵,真巧。昨晚我也正看《Core Java》的多线程那章。阻塞队列像线程之间通信的管道。
1 楼 dingbuoyi 2012-06-14  
写的不错 学习一下

相关推荐

Global site tag (gtag.js) - Google Analytics