关于多线程同步的初步教程--Barrier的设计及使用

发表于:2007-05-25来源:作者:点击数: 标签:线程--Barrier初步同步教程
Barrier是一个多线程编程中经常要用到的同步工具,尤其多用于大数据量的计算过程中的同步。本文以广为流程的DougLea的concurrent工具包的Barrier实现为例,进行一点探讨。在DougLea的concurrent工具包中,Barrier是一个接口,在concurrent包中提供了两个Barr

Barrier是一个多线程编程中经常要用到的同步工具,尤其多用于大数据量的计算过程中的同步。本文以广为流程的Doug Lea的concurrent工具包的Barrier实现为例,进行一点探讨。在Doug Lea的concurrent工具包中,Barrier是一个接口,在concurrent包中提供了两个Barrier的实现:CyclicBarrier和Rendezvous。下面是Barrier接口的定义:

  1. public interface Barrier {
  2.   /** 
  3.    * Return the number of parties that must meet per barrier
  4.    * point. The number of parties is always at least 1.
  5.    **/
  6.   public int parties();
  7.   /**
  8.    * Returns true if the barrier has been compromised
  9.    * by threads leaving the barrier before a synchronization
  10.    * point (normally due to interruption or timeout). 
  11.    * Barrier methods in implementation classes throw
  12.    * throw BrokenBarrierException upon detection of breakage.
  13.    * Implementations may also support some means
  14.    * to clear this status.
  15.    **/
  16.   public boolean broken();
  17. }


    Barrier接口中的方法非常简单,parties()返回所有需要在屏障处同步的线程数;broken()返回一个标志,指示释放是否已被破坏。Barrier接口中并没有提供加入屏障的方法,而是在c和Rendezvous的Barrier实现中提供的。你可以会疑问,为什么不在Barrier接口中提供这些方法呢?因为这些实现的差异迥异,以至于很难在这些实现中提炼出一个共用的方法签名。比如,对于CyclicBarrier加入屏障的方法是:barrier(), 

  1. // CyclicBarrier.java
  2.   public int barrier() throws InterruptedException, BrokenBarrierException {
  3.     return doBarrier(false, 0);
  4.   }
  5.   protected synchronized int doBarrier(boolean timed, long msecs) 
  6.     throws InterruptedException, TimeoutException, BrokenBarrierException  { 
  7.     
  8.     int index = --count_;
  9.     if (broken_) {
  10.       throw new BrokenBarrierException(index);
  11.     }
  12.     else if (Thread.interrupted()) {
  13.       broken_ = true;
  14.       notifyAll();
  15.       throw new InterruptedException();
  16.     }
  17.     else if (index == 0) {  // tripped
  18.       count_ = parties_;
  19.       ++resets_;
  20.       notifyAll();
  21.       try {
  22.         if (barrierCommand_ != null)
  23.           barrierCommand_.run();
  24.         return 0;
  25.       }
  26.       catch (RuntimeException ex) {
  27.         broken_ = true;
  28.         return 0;
  29.       }
  30.     }
  31.     else if (timed && msecs <= 0) {
  32.       broken_ = true;
  33.       notifyAll();
  34.       throw new TimeoutException(msecs);
  35.     }
  36.     else {                   // wait until next reset
  37.       int r = resets_;      
  38.       long startTime = (timed)? System.currentTimeMillis() : 0;
  39.       long waitTime = msecs;
  40.       for (;;) {
  41.         try {
  42.           wait(waitTime);
  43.         }
  44.         catch (InterruptedException ex) {
  45.           // Only claim that broken if interrupted before reset
  46.           if (resets_ == r) { 
  47.             broken_ = true;
  48.             notifyAll();
  49.             throw ex;
  50.           }
  51.           else {
  52.             Thread.currentThread().interrupt(); // propagate
  53.           }
  54.         }
  55.         if (broken_) 
  56.           throw new BrokenBarrierException(index);
  57.         else if (r != resets_)
  58.           return index;
  59.         else if (timed) {
  60.           waitTime = msecs - (System.currentTimeMillis() - startTime);
  61.           if  (waitTime <= 0) {
  62.             broken_ = true;
  63.             notifyAll();
  64.             throw new TimeoutException(msecs);
  65.           }
  66.         }
  67.       }
  68.     }
  69.   }


而Rendezvous中则是:rendezvous(Object x):

  1. // Rendezvous.java
  2.   public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {
  3.     return doRendezvous(x, false, 0);
  4.   }
  5.   protected Object doRendezvous(Object x, boolean timed, long msecs) 
  6.     throws InterruptedException, TimeoutException, BrokenBarrierException {
  7.     // rely on semaphore to throw interrupt on entry
  8.     long startTime;
  9.     if (timed) {
  10.       startTime = System.currentTimeMillis();
  11.       if (!entryGate_.attempt(msecs)) {
  12.         throw new TimeoutException(msecs);
  13.       }
  14.     }
  15.     else {
  16.       startTime = 0;
  17.       entryGate_.acquire();
  18.     }
  19.     synchronized(this) {
  20.       Object y = null;
  21.       int index =  entries_++;
  22.       slots_[index] = x;
  23.       try { 
  24.         // last one in runs function and releases
  25.         if (entries_ == parties_) {
  26.           departures_ = entries_;
  27.           notifyAll();
  28.           try {
  29.             if (!broken_ && rendezvousFunction_ != null)
  30.             rendezvousFunction_.rendezvousFunction(slots_);
  31.           }
  32.           catch (RuntimeException ex) {
  33.             broken_ = true;
  34.           }
  35.         }
  36.         else {
  37.           while (!broken_ && departures_ < 1) {
  38.             long timeLeft = 0;
  39.             if (timed) {
  40.               timeLeft = msecs - (System.currentTimeMillis() - startTime);
  41.               if (timeLeft <= 0) {
  42.                 broken_ = true;
  43.                 departures_ = entries_;
  44.                 notifyAll();
  45.                 throw new TimeoutException(msecs);
  46.               }
  47.             }
  48.             
  49.             try {
  50.               wait(timeLeft); 
  51.             }
  52.             catch (InterruptedException ex) { 
  53.               if (broken_ || departures_ > 0) { // interrupted after release
  54.                 Thread.currentThread().interrupt();
  55.                 break;
  56.               }
  57.               else {
  58.                 broken_ = true;
  59.                 departures_ = entries_;
  60.                 notifyAll();
  61.                 throw ex;
  62.               }
  63.             }
  64.           }
  65.         }
  66.       }
  67.       finally {
  68.         y = slots_[index];
  69.         
  70.         // Last one out cleans up and allows next set of threads in
  71.         if (--departures_ <= 0) {
  72.           for (int i = 0; i < slots_.length; ++i) slots_[i] = null;
  73.           entryGate_.release(entries_);
  74.           entries_ = 0;
  75.         }
  76.       }
  77.       // continue if no IE/TO throw
  78.       if (broken_)
  79.         throw new BrokenBarrierException(index);
  80.       else
  81.         return y;
  82.     }
  83.   }


既然这样,那提供一个共用的Barrier接口还有什么意义呢?Doug Lea也觉察出了这个问题。所以在即将在JDK1.5中作为标准并发工具包发布的java.util.concurrent中,就去除了Barrier接口。

最常用的Barrier实现是CyclicBarrier,下面是CyclicBarrier的一个简单使用实例,Rendezvous的使用实例可以参考concurrent包的API文档。

  1.  class Solver {
  2.     final int N;
  3.     final float[][] data;
  4.     final CyclicBarrier barrier;
  5.     
  6.     class Worker implements Runnable {
  7.        int myRow;
  8.        Worker(int row) { myRow = row; }
  9.        public void run() {
  10.           while (!done()) {
  11.              processRow(myRow);
  12.  
  13.              try {
  14.                barrier.barrier(); 
  15.              }
  16.              catch (InterruptedException ex) { return; }
  17.              catch (BrokenBarrierException ex) { return; }
  18.           }
  19.        }
  20.     }
  21.  
  22.     public Solver(float[][] matrix) {
  23.       data = matrix;
  24.       N = matrix.length;
  25.       barrier = new CyclicBarrier(N);
  26.       barrier.setBarrierCommand(new Runnable() {
  27.         public void run() { mergeRows(...); }
  28.       });
  29.       for (int i = 0; i < N; ++i) {
  30.         new Thread(new Worker(i)).start();
  31.       waitUntilDone();
  32.      }
  33.  }


原文转自:http://www.ltesting.net

评论列表(网友评论仅供网友表达个人看法,并不表明本站同意其观点或证实其描述)