    Mutex是互斥体,广泛地应用在多线程编程中。本文以广为流程的Doug Lea的concurrent工具包的Mutex实现为例,进行一点探讨。在Doug Lea的concurrent工具包中,Mutex实现了Sync接口,该接口是concurrent工具包中所有锁(lock)、门(gate)和条件变量(condition)的公共接口,Sync的实现类主要有:Mutex、Semaphore及其子类、Latch、CountDown、ReentrantLock等。这也体现了面向抽象编程的思想,使我们可以在不改变代码或者改变少量代码的情况下,选择使用Sync的不同实现。下面是Sync接口的定义:

  1. public interface Sync {
  2.   public void acquire() throws java/lang/InterruptedException.java.html" target="_blank">InterruptedException//获取许可
  3.   public boolean attempt(long msecs) throws InterruptedException//尝试获取许可
  4.   public void release(); //释放许可
  5. }

通过使用Sync可以替代Java synchronized关键字,并提供更加灵活的同步控制。当然,并不是说concurrent工具包是和Java synchronized独立的技术,其实concurrent工具包也是在synchronized的基础上搭建的,从下面对Mutex源码的解析即可以看到这一点。synchronized关键字仅在方法内或者代码块内有效,而使用Sync却可以跨越方法甚至通过在对象之间传递,跨越对象进行同步。这是Sync及concurrent工具包比直接使用synchronized更加强大的地方。

  1. class X {
  2.    Sync gate;
  3.    // ...
  4.    public void m() { 
  5.      try {
  6.        gate.acquire();  // block until condition holds
  7.        try {
  8.          // ... method body
  9.        }
  10.        finally {
  11.          gate.release();
  12.        }
  13.      }
  14.      catch (InterruptedException ex) {
  15.        // ... evasive action
  16.      }
  17.    }
  18. }

Mutex是一个非重入的互斥锁。Mutex广泛地用在需要跨越方法的before/after类型的同步环境中。下面是Doug Lea的concurrent工具包中的Mutex的实现。

  1. public class Mutex implements Sync  {
  2.   /** The lock status **/
  3.   protected boolean inuse_ = false;
  4.   public void acquire() throws InterruptedException {
  5.     if (Thread.interrupted()) throw new InterruptedException(); //(1)
  6.     synchronized(this) {
  7.       try {
  8.         while (inuse_) wait(); 
  9.         inuse_ = true;
  10.       }
  11.       catch (InterruptedException ex) { //(2)
  12.         notify();
  13.         throw ex;
  14.       }
  15.     }
  16.   }
  17.   public synchronized void release()  {
  18.     inuse_ = false;
  19.     notify(); 
  20.   }
  21.   public boolean attempt(long msecs) throws InterruptedException {
  22.     if (Thread.interrupted()) throw new InterruptedException();
  23.     synchronized(this) {
  24.       if (!inuse_) {
  25.         inuse_ = true;
  26.         return true;
  27.       }
  28.       else if (msecs <= 0)
  29.         return false;
  30.       else {
  31.         long waitTime = msecs;
  32.         long start = System.currentTimeMillis();
  33.         try {
  34.           for (;;) { 
  35.             wait(waitTime);
  36.             if (!inuse_) {
  37.               inuse_ = true;
  38.               return true;
  39.             }
  40.             else {
  41.               waitTime = msecs - (System.currentTimeMillis() - start);
  42.               if (waitTime <= 0) // (3)
  43.                 return false;
  44.             }
  45.           }
  46.         }
  47.         catch (InterruptedException ex) {
  48.           notify();
  49.           throw ex;
  50.         }
  51.       }
  52.     }  
  53.   }
  54. }


  1. class Node { 
  2.    Object item; 
  3.    Node next; 
  4.    Mutex lock = new Mutex(); // 每一个节点都持有一个锁
  5.    Node(Object x, Node n) { item = x; next = n; }
  6.  }
  7.  class List {
  8.     protected Node head; // 指向列表的头
  9.     // 使用Java的synchronized保护head域
  10.     //  (我们当然可以使用Mutex,但是这儿似乎没有这样做的必要
  11.     protected synchronized Node getHead() { return head; }
  12.     boolean search(Object x) throws InterruptedException {
  13.       Node p = getHead();
  14.       if (p == nullreturn false;
  15.       //  (这儿可以更加紧凑,但是为了演示的清楚,各种情况都分别进行处理)
  16.       p.lock.acquire();                  // Prime loop by acquiring first lock.
  17.                                      //    (If the acquire fails due to
  18.                                      //    interrupt, the method will throw
  19.                                      //    InterruptedException now,
  20.                                      //    so there is no need for any
  21.                                      //    further cleanup.)
  22.       for (;;) {
  23.         if (x.equals(p.item)) {
  24.           p.lock.release();          // 释放当前节点的锁
  25.           return true;
  26.         }
  27.         else {
  28.           Node nextp = p.next;
  29.           if (nextp == null) {
  30.             p.lock.release();       // 释放最后持有的锁
  31.             return false;
  32.           }
  33.           else {
  34.             try {
  35.               nextp.lock.acquire(); // 在释放当前锁之前获取下一个节点的锁
  36.             }
  37.             catch (InterruptedException ex) {
  38.               p.lock.release();    // 如果获取失败,也释放当前的锁
  39.               throw ex;
  40.             }
  41.             p.lock.release();      // 释放上个节点的锁,现在已经持有新的锁了
  42.             p = nextp;
  43.           }
  44.         }
  45.       }
  46.     }
  47.     synchronized void add(Object x) { // 使用synchronized保护head域
  48.       head = new Node(x, head);
  49.     }
  50.     // ...  other similar traversal and update methods ...
  51.  }

