在FreeBSD5.0中,有很多类型的锁:互斥体(struct mtx)、共享/独占锁(struct sx)、lockmgr锁(struct lock)、条件变量(struct cv)和信号量(struct sema)。本文将探讨这些锁机制的含义、应用和实现。另外,有一种文件锁(struct lockf)用于文件字段的保护,嵌在inode结构中使用,是属于另外的范畴,这里不做说明,有兴趣可以参考flock(2)和VOP_ADVLOCK(9)。 1 基本锁机制 struct lock_class { const char *lc_name; u_int lc_flags; }; 本文讨论的锁机制,任何一种类型都直接/间接的包含该结构。每一个锁都有一个类(lock_class),该类描述了某个锁类(lc_name)和基本特性(lc_flags)。 其中,lc_flags的宏定义:LC_SLEEPLOCK是普通锁;LC_SPINLOCK是自旋锁;LC_SLEEPABLE说明可以在对该锁加锁的情况下,休眠;LC_RECURSABLE说明该锁是否支持递归;LC_UPGRADABLE说明该锁支持从共享锁变为独占锁,抑或相反。 目前,系统支持如下的lock_class: struct lock_class lock_class_mtx_sleep = { "sleep mutex", LC_SLEEPLOCK | LC_RECURSABLE }; struct lock_class lock_class_mtx_spin = { "spin mutex", LC_SPINLOCK | LC_RECURSABLE }; struct lock_class lock_class_sx = { "sx", LC_SLEEPLOCK | LC_SLEEPABLE | LC_RECURSABLE | LC_UPGRADABLE }; 前两个说明了互斥体支持的两个类(MTX_DEF、MTX_SPIN);后一个说明了共享/独占锁的类。 struct lock_object { struct lock_class *lo_class; const char *lo_name; /* Individual lock name. */ const char *lo_type; /* General lock type. */ u_int lo_flags; TAILQ_ENTRY(lock_object) lo_list; /* List of all locks in system. */ struct witness *lo_witness; }; 用于存储一个具体锁的数据结构,lo_name是单个具体锁的名字;如果在锁初始化时指定了类型描述的字符串,则赋给lo_type,否则lo_type的值和lo_name一样。 成员lo_flags的值,及其说明如下: #define LO_CLASSFLAGS 0x0000ffff /* Class specific flags. */ #define LO_INITIALIZED 0x00010000 /* Lock has been initialized. */ #define LO_WITNESS 0x00020000 /* Should witness monitor this lock. */ #define LO_QUIET 0x00040000 /* Don't log locking operations. */ #define LO_RECURSABLE 0x00080000 /* Lock may recurse. */ #define LO_SLEEPABLE 0x00100000 /* Lock may be held while sleeping. */ #define LO_UPGRADABLE 0x00200000 /* Lock may be upgraded/downgraded. */ #define LO_DUPOK 0x00400000 /* Don't check for duplicate acquires */ 余下两个成员,lo_list用于加入全局all_locks链表中,lo_witness用于指向witness数据结构。这两个成员的作用是用于witness模块,跟踪锁的获得、释放。这个部分需要参考witness(9)。 本节介绍了FreeBSD锁机制的最基本的结构和意义,下面介绍各种锁机制的基本含义。 2互斥体(Mutex) FreeBSD5.0进程同步控制的基本和主要方法是Mutex。在设计时,主要有如下考虑: Ø 申请和释放一个无争议的互斥体应该尽可能的简单。 Ø 互斥体的数据结构应该有足够的信息和存储空间来支持优先级的继承。 Ø 一个进程应该可以递归地使用某一个互斥体(比如最常用的全局互斥体Giant)。 根据Mutex阻塞时,是否做上下文切换,可以分为两类:MTX_DEF(做切换)和MTX_SPIN(不做切换)。 MTX_DEF:当内核线程不能获得锁时,该线程会放弃CPU资源;采用这样类型的锁,不必考虑产生死锁的情况。 MTX_SPIN:当内核线程不能获得锁时,该线程不会放弃CPU资源,它会自旋,等待直到请求的锁被其它CPU释放。这样做,可能会产生死锁的情况,因此,我们需要在使用这种类型的锁屏蔽本地CPU的所有中断。 关于互斥体的其他类型可以和以上两种类型并存,它们说明了互斥体的其它属性。 MTX_RECURSE:具有该标志的锁,可以被递归使用。 MTX_QUIET:对该锁不做任何日志操作。 MTX_NOWITNESS:通知witness,不必跟踪该锁。 MTX_DUPOK:当该锁被复制时,witness不必写日志消息。 2.1 互斥体数据结构 struct mtx { struct lock_object mtx_object; /* Common lock properties. */ volatile uintptr_t mtx_lock; /* Owner and flags. */ volatile u_int mtx_recurse; /* Number of recursive holds. */ TAILQ_HEAD(, thread) mtx_blocked; /* Threads blocked on us. */ LIST_ENTRY(mtx) mtx_contested; /* Next contested mtx. */ /* Fields used only if MUTEX_PROFILING is configured: */ u_int64_t mtx_acqtime; const char *mtx_filename; int mtx_lineno; }; 成员mtx_object的意义和前面介绍lock_object结构所说的一样,其lo_class只能是lock_class_mtx_sleep或lock_class_mtx_spin。 成员mtx_lock是对互斥体锁状态的额外说明,其值如下: #define MTX_RECURSED 0x00000001 /* lock recursed (for MTX_DEF only) */ #define MTX_CONTESTED 0x00000002 /* lock contested (for MTX_DEF only) */ #define MTX_UNOWNED 0x00000004 /* Cookie for free mutex */ 成员mtx_recurse是统计递归调用该锁次数的计数器。 成员mtx_blocked是阻塞于该锁的线程链表的头指针。 成员mtx_contested作为一个实体,链接在thread结构中td_contested指向的该线程的竞争互斥体链表中。 余下的三个成员用于Mutex调试。 2.2 Mutex接口函数 函数void mtx_init(struct mtx *mutex, const char *name, const char *type, int opts): 在互斥体锁能够被使用之前,该锁mutex必须通过mtx_init初始化;入参name和type都是说明性的字符串,用于调试和witness;opts指明了该锁的类型和特性。 函数void mtx_lock_flags(struct mtx *mutex, int flags); 该函数是为目前正在运行的内核线程A申请一个相互排斥的MTX_DEF类型锁,如果正在运行的另一个内核线程B持有该锁,则线程A会放弃CPU资源,直到该锁能被再次使用,通常在等待该锁,线程A会sleep。函数mtx_lock()是其一个特例。 函数void mtx_lock_spin_flags(struct mtx *mutex, int flags); 该函数是为目前正在运行的内核线程A申请一个相互排斥的MTX_SPIN类型锁,如果正在运行的另一个内核线程B持有该锁,则线程A会自旋直到该锁能够被再次使用,在线程A自旋及获得该锁的过程中,对于线程A而言,所有的中断应该屏蔽。函数mtx_lock_spin()是其一个特例。 函数int mtx_trylock_flags(struct mtx *mutex, int flags): 该函数试图获得一个互斥体mutex,如果不能,则立即返回0,如果可以,则获得该锁,并返回非零值。函数mtx_trylock()是其一个特例。 函数void mtx_unlock_flags(struct mtx *mutex, int flags): 该函数释放其持有的相互排斥的MTX_DEF类型锁,如果有一个高优先级的线程正在等待该锁,则释放该锁的线程释放CPU资源以便高优先级的线程能够马上获得该锁,并执行。只有在释放该锁的线程处于临界区时,是例外。函数mtx_unlock()是其一个特例。 函数void mtx_unlock_spin_flags(struct mtx *mutex, int flags): 该函数释放其持有的相互排斥的MTX_ SPIN类型锁,恢复在获得该锁前的中断状态。函数mtx_unlock_spin()是其一个特例。 函数void mtx_destroy(struct mtx *mutex): 该函数用于销毁一个互斥体锁,并释放或重写该锁的所有数据。一个没有初始化(mtx_init)的锁是不应该被销毁的。如果该锁只有一个持有统计数时,是允许被销毁的。当该锁处于递归调用中,或是有另外的进程阻塞于该锁时,该锁是不允许被销毁的。 宏int mtx_initialized(struct mtx *mutex):如果该锁已经初始化了,返回非零,否则返回零。宏int mtx_owned(struct mtx *mutex):如果当前线程持有该锁,返回非零,否则返回零。宏int mtx_recursed(struct mtx *mutex):如果该锁正在被递归使用,则返回非零,否则返回零。 前面,我们提到的mtx_xxx_flag函数系列,其flags入参说明相关锁作是否需要log日志(MTX_QUIET);相应的mtx_xxx函数系列,是不须log日志。 另外,函数void mtx_assert(struct mtx *mutex, int what):是用于诊断的,于互斥体锁机制的原理关系并不紧密,因此,不作过多论述。 宏MTX_SYSINIT是对互斥体锁初始化的一个封装。 2.3 系统对互斥体子系统以及互斥体池的初始化 我们以i386体系为例,在内核加载的很早的时候(在与平台相关的初始化阶段,产生局部描述符表(LDT)之前),互斥体锁子系统就已经初始化了(在init386()函数中调用mutex_init()函数实现)。 /* Setup thread0 so that mutexes work. */ LIST_INIT(&thread0.td_contested); /* Initialize mutexes. */ mtx_init(&Giant, "Giant", NULL, MTX_DEF | MTX_RECURSE); mtx_init(&sched_lock, "sched lock", NULL, MTX_SPIN | MTX_RECURSE); mtx_init(&proc0.p_mtx, "process lock", NULL, MTX_DEF | MTX_DUPOK); mtx_lock(&Giant); 初始化第一个线程的竞争互斥体链表。 初始化Giant全局锁,支持递归调用的MTX_DEF类型。锁Giant被广泛地用在内核的几乎每个角落,保护内核一般性代码。 初始化sched_lock全局锁,支持递归调用的MTX_SPIN类型。锁sched_lock用于保护内核调度队列。 以上两个锁都是可递归的,这里,需要补充说明一下可递归锁的意义,一个非递归锁加锁后,是不能够被再次加锁;而当递归锁遇到这样的情况,首先检查加锁的持有者,如果是当前进程,那么对递归计数器加1,如果不是当前进程,则按非递归锁那样处理;当解锁一个递归锁时,递归计数器减1,当计数器为0时,正真解锁(和非递归锁调用mtx_unlock_xxx()一样)。 初始化proc0变量的内部锁p_mtx,该锁用于保护对proc结构的操作。 最后对Giant加锁。为什么必需在这里就对Giant加锁?因为Giant必须先于其他任何互斥体先获得锁。即:在持有其他锁的情况下,是不可能以非递归的方式获得Giant(即第一次调用mtx_lock(&Giant));在持有Giant的情况下,时可以获得其他互斥体;在持有其他锁的情况下,是可以以递归方式获得Giant。 在内核加载的稍后阶段,会初始化互斥体池(在mi_startup()函数中,通过加载SI_SUB_MTX_POOL子模块实现)。 初始化一个长度为MTX_POOL_SIZE(128)的互斥锁数组mtx_pool_ary。互斥锁池主要用在lock和sx中,参考lockinit(9)和sx(9)。以后涉及到一般性锁和共享/独占锁时,再讨论其用法。 2.4 互斥体加锁/解锁的过程 互斥体锁的初始化和销毁函数非常简单,这里就不做论述了,有兴趣的读者可以参考源码的实现。本节主要论述互斥体锁的加锁/解锁的过程。 MTX_DEF: 获得/释放互斥体锁(MTX_DEF)的原子操作: 获得互斥体锁是通过_obtain_lock宏(展开后是atomic_cmpset_int()嵌入汇编的内联函数)实现的,其思想是:用mtx的成员mtx_lock与MTX_UNOWNED比较,如果相同,则说明没有加锁,则将当前线程(curthread)的地址传给mtx_lock,并返回非零;否则,说明该锁已经被加锁,不做任何操作,返回零。 释放互斥体锁是通过_release_lock宏(展开后是atomic_cmpset_int()嵌入汇编的内联函数)实现的,其思想是:用mtx的成员mtx_lock与当前线程(curthread)比较,如果相同,则说明该锁被当前线程加锁,则将MTX_UNOWNED传给mtx_lock,并返回非零;否则,说明该锁不是被当前线程加锁,不做任何操作,返回零。 如果能正常加锁/解锁,则无需执行后面的函数。我们现在考虑不能加锁/解锁的情况,根据MTX_DEF的特性,当前线程应该试图sleep,放弃其CPU资源,这是分别通过函数_mtx_lock_sleep/_mtx_unlock_sleep实现的。 函数_mtx_lock_sleep():(省略了和锁操作日志、witness相关代码,以线程A为调用者) if ((m->mtx_lock & MTX_FLAGMASK) == (uintptr_t)td) { m->mtx_recurse++; atomic_set_ptr(&m->mtx_lock, MTX_RECURSED); return; } 如果该锁支持递归,而且该锁的目前持有者是当前线程,则递归计数器加1,直接返回。 while (!_obtain_lock(m, td)) { … } 试图获得锁,如果不行则休息一定时间,再次试图获得锁;不停循环,直至获得该锁。我们接下来分析while语句里的代码。 mtx_lock_spin(&sched_lock); if ((v = m->mtx_lock) == MTX_UNOWNED) { mtx_unlock_spin(&sched_lock); ia32_pause(); continue; } 开始准备调度其他线程运行,这需要在sched_lock锁的保护下进行。在对sched_lock加锁的过程中,有可能该锁m已经被解锁了,因此,再次通过_obtain_lock()试图获得该锁。其中ia32_pause()是i386体系的一个暂停操作语句。之所以将mtx_lock记录v值,是因为在随后的操作中,可能mtx_lock会被改变(由调度产生)。 if (v == MTX_CONTESTED) { td1 = TAILQ_FIRST(&m->mtx_blocked); m->mtx_lock = (uintptr_t)td | MTX_CONTESTED; if (td1->td_priority < td->td_priority) td->td_priority = td1->td_priority; mtx_unlock_spin(&sched_lock); return; } 当v的值为MTX_CONTESTED,说明还有其他线程B、C阻塞在该锁上,则将mtx_lock设置为线程A所持有,并改变线程A的优先级,完成线程调度,返回。 if ((v & MTX_CONTESTED) == 0 && !atomic_cmpset_ptr(&m->mtx_lock, (void *)v, (void *)(v | MTX_CONTESTED))) { mtx_unlock_spin(&sched_lock); ia32_pause(); continue; } 如果v值还没有设置MTX_CONTESTED,并且v值和mtx_lock相同,说明该线程正阻塞于该锁,应该设置为MTX_CONTESTED的状态,继续执行if以外的语句。如果v值已经设置了MTX_CONTESTED,则继续执行if以外语句。如果v值还没设置MTX_CONTESTED,并且mtx_lock和v值不相同,暂停操作,再次试图获得该锁。 if (TAILQ_EMPTY(&m->mtx_blocked)) { td1 = mtx_owner(m); LIST_INSERT_HEAD(&td1->td_contested, m, mtx_contested); TAILQ_INSERT_TAIL(&m->mtx_blocked, td, td_lockq); } else { TAILQ_FOREACH(td1, &m->mtx_blocked, td_lockq) if (td1->td_priority > td->td_priority) break; if (td1) TAILQ_INSERT_BEFORE(td1, td, td_lockq); else TAILQ_INSERT_TAIL(&m->mtx_blocked, td, td_lockq); } 当代码执行到这里,说明该锁被其他线程持有,将该线程A插入该m的阻塞队列mtx_blocked中。 d->td_blocked = m; td->td_lockname = m->mtx_object.lo_name; TD_SET_LOCK(td); propagate_priority(td); td->td_proc->p_stats->p_ru.ru_nvcsw++; mi_switch(); mtx_unlock_spin(&sched_lock); 最后,完成一些线程相关的设置,然后调用mi_switch()切换线程。 函数_mtx_unlock_sleep(): if (mtx_recursed(m)) { if (--(m->mtx_recurse) == 0) atomic_clear_ptr(&m->mtx_lock, MTX_RECURSED); return; } 如果该锁m是支持递归的,则递归计数器减1,如果计数器为零,则,清除mtx_lock的设置,返回。 mtx_lock_spin(&sched_lock); td1 = TAILQ_FIRST(&m->mtx_blocked); TAILQ_REMOVE(&m->mtx_blocked, td1, td_lockq); 选出阻塞于该锁的第一个线程A,并将其移出mtx_blocked队列。 if (TAILQ_EMPTY(&m->mtx_blocked)) { LIST_REMOVE(m, mtx_contested); _release_lock_quick(m); } else atomic_store_rel_ptr(&m->mtx_lock, (void *)MTX_CONTESTED); 如果mtx_blocked队列为空,则将mtx_lock设置为MTX_UNOWNED,否则设置为MTX_CONTESTED。 pri = PRI_MAX; LIST_FOREACH(m1, &td->td_contested, mtx_contested) { int cp = TAILQ_FIRST(&m1->mtx_blocked)->td_priority; if (cp < pri) pri = cp; } if (pri > td->td_base_pri) pri = td->td_base_pri; td->td_priority = pri; 遍历当前线程B的所有竞争锁,找出一个优先级最高的(td_priority最小),并将该值赋给pri变量。并调整线程B的活动优先级。 td1->td_blocked = NULL; TD_CLR_LOCK(td1); if (!TD_CAN_RUN(td1)) { mtx_unlock_spin(&sched_lock); return; } setrunqueue(td1); 如果线程A可以运行,则加入可运行线程队列,等待分时调度。如果不可运行,说明线程A还在等待别的资源。 if (td->td_critnest == 1 && td1->td_priority < pri) { td->td_proc->p_stats->p_ru.ru_nivcsw++; mi_switch(); } mtx_unlock_spin(&sched_lock); 如果线程B进入临界区,并且线程A的优先级高于线程B,则通过mi_switch()做上下文切换。 MTX_SPIN: 对MTX_SPIN类型的锁加锁,展开mtx_lock_spin()得到: critical_enter(); if (!_obtain_lock((mp), (tid))) { if ((mp)->mtx_lock == (uintptr_t)(tid)) (mp)->mtx_recurse++; else _mtx_lock_spin((mp), (opts), (file), (line)); } 假设该函数的调用者是线程A,通过critical_enter()函数,对当前线程A的td_critnest成员加1,说明线程A进入临界区。如果能获得该锁,则成功返回。如果不能:如果该锁的持有者是线程A,则将递归计数器加1,如果不是,则调用_mtx_lock_spin函数处理。下面讨论该函数。 该函数是一个for(;循环主体,我们讨论for语句里的内容,即自旋锁的自旋主体。 if (_obtain_lock(m, curthread)) break; critical_exit(); 试图过得该锁,如果可以,停止自旋。如果不能,继续自旋。通过critical_exit()函数退出临界区,这样允许我们在自旋的时候,如果有中断产生,中断能够有机会处理。 while (m->mtx_lock != MTX_UNOWNED) { if (i++ < 10000000) { ia32_pause(); continue; } if (i < 60000000) DELAY(1); else panic("spin lock %s held by %p for > 5 seconds", m->mtx_object.lo_name, (void *)m->mtx_lock); ia32_pause(); } critical_enter(); 自旋,如果,自旋时间大于5秒,则认为出错。如果mtx_lock的值为MTX_UNOWNED,则说明该锁已被释放。因此,可以进入临界区,再次试图获得该锁。 对MTX_SPIN类型的锁加锁,展开mtx_unlock_spin()得到: if (mtx_recursed((mp))) (mp)->mtx_recurse--; else _release_lock_quick((mp)); critical_exit(); 如果该锁递归值不为0,则递归计数器减1。如果为零,则将该锁的mtx_lock设置为MTX_UNOWNED。最后退出临界区。 3 条件变量(Condition variables) 条件变量构建于互斥体之上,和互斥体联合使用,等待条件发生。我们可以理解为基于锁的msleep机制(参考:msleep(9))。 3.1 条件变量数据结构 struct cv { struct cv_waitq cv_waitq; struct mtx *cv_mtx; const char *cv_description; }; 成员cv_waitq是阻塞于该条件变量的线程链表表头;成员cv_mtx应该指向通过函数系列cv_*wait*()传入的参数(mtx类型指针),目前只有在定义了INVARIANTS的时候有意义;成员cv_description是该条件变量的说明性文字。 3.2 条件变量的接口函数 函数void cv_init(struct cv *cvp, const char *desc): 通过该函数初始化条件变量cvp,其说明性文字是通过desc指针获得。 TAILQ_INIT(&cvp->cv_waitq); cvp->cv_mtx = NULL; cvp->cv_description = desc; 这段初始化的代码十分简单。 函数void cv_destroy(struct cv *cvp): 通过该函数销毁一个条件变量cvp。 KASSERT(cv_waitq_empty(cvp), ("%s: cv_waitq non-empty", __func__)); 通过宏cv_waitq_empty判断阻塞于该条件变量的线程队列cv_waitq是否为空。如果不为空则出错。 当线程等待条件变量发生时,阻塞等待;当条件变量满足后,通过信号通知。用于阻塞等待的函数系列:cv_wait()、cv_wait_sig()、cv_timedwait()和cv_timedwait_sig()。 函数void cv_wait(struct cv *cvp, struct mtx *mp): 该函数用于等待一个条件变量。当等待条件变量时,将当前线程放置在该条件变量的等待队列(cv_waitq)中,然后挂起该线程。 mtx_lock_spin(&sched_lock); CV_WAIT_VALIDATE(cvp, mp); DROP_GIANT(); mtx_unlock(mp); cv_waitq_add(cvp, td); cv_switch(td); mtx_unlock_spin(&sched_lock); PICKUP_GIANT(); mtx_lock(mp); 这段代码简单的说,就是在调度自旋锁的保护下,将当前线程挂起,并做上下文切换。宏CV_WAIT_VALIDATE的意义:如果该条件变量cvp目前没有阻塞线程,则将成员cv_mtx指向互斥体mp;如果有阻塞线程,而且其cv_mtx指向的互斥体和mp不一样,则异常出错。 通过cv_waitq_add()函数,将td线程加入cvp的阻塞队列中,并完成td的相关设置。通过cv_switch()函数完成td的相关工作,并调用mi_switch()做上下文切换。之所以要对mp解锁,是给阻塞于该mp的其他线程运行的机会。在等待到条件变量,并且轮到自己的时间片,则善后处理,并对mp再次加锁。 宏DROP_GIANT和PICKUP_GIANT是有关Giant互斥体操作的一对宏,必须联合使用。这对宏的作用是:在该对宏的代码中,存在主动上下文切换,因此,需要先把Giant解锁,在处理完中间代码后,再将Giant恢复到DROP_GIANT宏之前的状态。其宏展开如下: do { int _giantcnt; for (_giantcnt = 0; mtx_owned(&Giant); _giantcnt++) mtx_unlock(&Giant) 我们中间代码,关于这对宏,我省略了witness和断言的相关代码,这样更清晰些。 while (_giantcnt--) mtx_lock(&Giant); } while (0) 函数int cv_wait_sig(struct cv *cvp, struct mtx *mp): 该函数同样用于等待条件变量,当等待条件变量时,允许信号的中断。给出其主要代码,和cv_wait()函数相同的部分就不讨论了。 mtx_lock_spin(&sched_lock); CV_WAIT_VALIDATE(cvp, mp); DROP_GIANT(); mtx_unlock(mp); cv_waitq_add(cvp, td); sig = cv_switch_catch(td); mtx_unlock_spin(&sched_lock); PROC_LOCK(p); if (sig == 0) sig = cursig(td); /* XXXKSE */ if (sig != 0) { if (SIGISMEMBER(p->p_sigacts->ps_sigintr, sig)) rval = EINTR; else rval = ERESTART; } PROC_UNLOCK(p); if (p->p_flag & P_WEXIT) rval = EINTR; PICKUP_GIANT(); mtx_lock(mp); 在这里,是通过cv_switch_catch()调用,决定是否需要上下文切换,在该函数里,信号中断是允许的。在宏PROC_LOCK(进程结构互斥体保护机制)和PROC_UNLOCK的保护下,会再给一次机会给当前线程处理信号中断。信号值会作为cv_wait_sig()函数的返回值,由调用者决定如何处理。 另外两个函数:函数int cv_timedwait(struct cv *cvp, struct mtx *mp, int timo)和函数int cv_timedwait_sig(struct cv *cvp, struct mtx *mp, int timo)和前面两个函数功能分别对应,只不过加入了等待的最大时间(timo/hz秒)的限制。 接下来,我们讨论当条件变量满足时,信号通知的相关内容,涉及函数cv_signal()和cv_broadcast()。 函数void cv_signal(struct cv *cvp): mtx_lock_spin(&sched_lock); if (!TAILQ_EMPTY(&cvp->cv_waitq)) { CV_SIGNAL_VALIDATE(cvp); cv_wakeup(cvp); } mtx_unlock_spin(&sched_lock); 如果阻塞于该条件变量的线程队列为空,则不用做任何事。如果不为空,通过cv_wakeup()函数唤醒线程队列的第一个线程。由于该函数的处理主体是cv_wakeup()函数,下面讨论它的实现。 td = TAILQ_FIRST(&cvp->cv_waitq); cv_waitq_remove(td); TD_CLR_SLEEPING(td); setrunnable(td); 得到阻塞于该条件变量的第一个线程,将其移出阻塞队列,并修改其状态值,通过调用函数setrunnable(),将该线程加入线程的运行队列中,等待调度。 函数 void cv_broadcast(struct cv *cvp): mtx_lock_spin(&sched_lock); CV_SIGNAL_VALIDATE(cvp); while (!TAILQ_EMPTY(&cvp->cv_waitq)) cv_wakeup(cvp); mtx_unlock_spin(&sched_lock); 该函数唤醒阻塞于该条件变量的所有线程。 在前面,我们讨论当线程阻塞于某个条件变量,涉及到两个上下文切换函数的调用:cv_switch()和cv_switch_catch()。这两个函数主要涉及内核调度和线程状态转换等知识点,和锁关系不大,这里就讨论了。 4共享/独占锁(Share/exclusive locks) 共享/独占锁(sx)是非常有效的读写锁。之所以称之为sx是因为考虑以后添加额外的功能,不仅仅是读写功能,但是目前仅用于读写控制。由于sx锁机制的花费比互斥体高许多,因此必须谨慎使用。 4.1 共享/独占锁的数据结构 struct sx { struct lock_object sx_object; /* Common lock properties. */ struct mtx *sx_lock; /* General protection lock. */ int sx_cnt; /* -1: xlock, > 0: slock count. */ struct cv sx_shrd_cv; /* slock waiters. */ int sx_shrd_wcnt; /* Number of slock waiters. */ struct cv sx_excl_cv; /* xlock waiters. */ int sx_excl_wcnt; /* Number of xlock waiters. */ struct thread *sx_xholder; /* Thread presently holding xlock. */ }; 成员sx_object说明该共享/独占锁的一般性属性,正如我们第一节提及的,其lock_class是lock_class_sx。成员sx_lock是互斥体锁指针,用于保护对该共享/独占锁成员的操作。成员sx_cnt是锁的统计,初始化状态是0,如果是-1,则说明该锁是独占锁;如果是大于0的值,则说明该锁是共享锁,并统计了其引用的次数。成员sx_shrd_cv是用于共享锁的条件变量控制,相应的成员sx_shrd_wcnt记录了等待该共享锁的线程数目。成员sx_excl_cv是用于独占锁的条件变量控制,相应的成员sx_excl_wcnt记录了等待该独占锁的线程数目。成员sx_xholder指向了当前支持有该独占锁的线程。 4.2 共享/独占锁的接口函数 函数void sx_init(struct sx *sx, const char *description): lock = &sx->sx_object; bzero(sx, sizeof(*sx)); lock->lo_class = &lock_class_sx; lock->lo_type = lock->lo_name = description; lock->lo_flags = LO_WITNESS | LO_RECURSABLE | LO_SLEEPABLE | LO_UPGRADABLE; sx->sx_lock = mtx_pool_find(sx); sx->sx_cnt = 0; cv_init(&sx->sx_shrd_cv, description); sx->sx_shrd_wcnt = 0; cv_init(&sx->sx_excl_cv, description); sx->sx_excl_wcnt = 0; sx->sx_xholder = NULL; 共享/独占锁是通过该函数初始化的,这段代码非常清晰,唯一需要说明的是,该结构的内部互斥体sx_lock是从互斥体池中获得,参考第二节。 函数void sx_destroy(struct sx *sx): sx->sx_lock = NULL; cv_destroy(&sx->sx_shrd_cv); cv_destroy(&sx->sx_excl_cv); 通过该函数销毁一个共享/独占锁。因为sx_lock是从互斥体池中获得,所以只需断掉链接,就可以了。 共享锁的管理主要由三个函数组成:sx_slock()、sx_try_slock()和sx_sunlock()。 函数void sx_slock(struct sx *sx): mtx_lock(sx->sx_lock); while (sx->sx_cnt < 0) { sx->sx_shrd_wcnt++; cv_wait(&sx->sx_shrd_cv, sx->sx_lock); sx->sx_shrd_wcnt--; } sx->sx_cnt++; mtx_unlock(sx->sx_lock); 该函数申请一个共享锁sx,在互斥体sx_lock的保护下,如果sx_cnt小于0,说明该sx正被独占,因此共享申请等待计数器sx_shrd_wcnt加1,调用cv_wait函数,等待共享锁的条件变量满足,当条件满足后,共享申请等待计数器sx_shrd_wcnt减1。获得共享锁后,共享锁持有者计数器sx_cnt加1。 函数int sx_try_slock(struct sx *sx): mtx_lock(sx->sx_lock); if (sx->sx_cnt >= 0) { sx->sx_cnt++; mtx_unlock(sx->sx_lock); return (1); } else { return (0); } 该函数试图获得共享锁,如果可以则将sx_cnt计数器加1,返回非零值,表示成功;如果不能,则说明该锁正被独占,直接返回0,表示失败。 函数void sx_sunlock(struct sx *sx): mtx_lock(sx->sx_lock); sx->sx_cnt--; if (sx->sx_excl_wcnt > 0) { if (sx->sx_cnt == 0) cv_signal(&sx->sx_excl_cv); } else if (sx->sx_shrd_wcnt > 0) cv_broadcast(&sx->sx_shrd_cv); mtx_unlock(sx->sx_lock); 该函数释放共享锁,首先将共享持有者计数器sx_cnt减1;如果sx_excl_wcnt大于0,说明现在有申请独占锁的线程阻塞,如果sx_cnt为0,说明目前该锁完全自由,即:没有被共享占有,则向独占等待条件变量sx_excl_cv发一个信号,通知它可以试图申请独占锁。在没有独占锁申请的情况下,如果有共享锁申请等待,即sx_shrd_wcnt大于0,则向共享等待条件变量sx_shrd_cv发一个广播信号,唤醒所有等待在该条件变量的线程。 独占锁的管理主要由三个函数组成:sx_xlock()、sx_try_xlock()和sx_xunlock()。 函数void sx_xlock(struct sx *sx): mtx_lock(sx->sx_lock); while (sx->sx_cnt != 0) { sx->sx_excl_wcnt++; cv_wait(&sx->sx_excl_cv, sx->sx_lock); sx->sx_excl_wcnt--; } sx->sx_cnt--; sx->sx_xholder = curthread; mtx_unlock(sx->sx_lock); 该函数申请一个独占锁,在互斥体sx_lock的保护下,如果sx_cnt不等于0,则说明该锁正被独占或者共享占有,因此,将独占等待计数器sx_excl_wcnt加1,调用cv_wait()函数等待条件变量sx_excl_cv满足条件。当条件满足后,独占等待计数器sx_excl_wcnt减1,sx_cnt计数器减1,说明现在被独占,将sx_xholder成员指向当前申请独占锁的线程地址。 函数int sx_try_xlock(struct sx *sx): mtx_lock(sx->sx_lock); if (sx->sx_cnt == 0) { sx->sx_cnt--; sx->sx_xholder = curthread; mtx_unlock(sx->sx_lock); return (1); } else { mtx_unlock(sx->sx_lock); return (0); } 该函数试图获得独占锁,如果可以则将sx_cnt计数器为0,则说明该锁是自由的,可以申请独占锁,返回非零值,表示成功;如果不能,则说明该锁正被独占或是共享,直接返回0,表示失败。 函数void sx_xunlock(struct sx *sx): mtx_lock(sx->sx_lock); sx->sx_cnt++; sx->sx_xholder = NULL; if (sx->sx_shrd_wcnt > 0) cv_broadcast(&sx->sx_shrd_cv); else if (sx->sx_excl_wcnt > 0) cv_signal(&sx->sx_excl_cv); mtx_unlock(sx->sx_lock); 该函数释放独占锁,将sx_cnt加1,恢复到自由状态(sx_cnt为0),断开sx_xholder的指针链接。如果有共享锁申请等待队列(sx_shrd_wcnt大于0),则发一个广播信号,唤醒所有等待在条件变量sx_shrd_cv上的线程;如果没有共享锁申请,则检查是否有其他独占锁申请(sx_excl_wcnt大于0),如果有,则发一个信号,唤醒这个等待队列的第一个线程。 另外,共享/独占锁支持相互的转换,即共享锁可以转换为独占锁,独占锁也可以转换为共享锁。这是通过函数sx_try_upgrade()和sx_downgrade()实现的。 函数int sx_try_upgrade(struct sx *sx): mtx_lock(sx->sx_lock); if (sx->sx_cnt == 1) { sx->sx_cnt = -1; sx->sx_xholder = curthread; mtx_unlock(sx->sx_lock); return (1); } else { mtx_unlock(sx->sx_lock); return (0); } 该函数实现试图将共享锁转换成独占锁。在sx_lock互斥体的保护下,如果当前有且仅有一个共享锁,则可以转换,转换成功后,返回1,表示成功;否则返回0,表示失败。 函数void sx_downgrade(struct sx *sx): mtx_lock(sx->sx_lock); sx->sx_cnt = 1; sx->sx_xholder = NULL; if (sx->sx_shrd_wcnt > 0) cv_broadcast(&sx->sx_shrd_cv); mtx_unlock(sx->sx_lock); 该函数实现将独占锁转换成共享锁。因为在独占的情况下,是不可能有共享的情况存在。因此,直接将sx_cnt变为1,说明现在有一个共享锁,断开独占锁的线程链接。如果还有其他共享锁申请,由于该锁原来是独占而被阻塞,因此,需要发一个广播信号,通知阻塞线程队列,现在可以共享。 5 信号灯(semaphore) 信号灯统计机制为访问一组资源(资源池)提供了一种同步机制。不同于互斥体,信号灯并没有持有者的概念,因此,信号灯常用于一个线程需要一个资源,而另一个线程需要释放该资源的情况。记录资源(释放资源provider,该资源的信号灯加1)总是成功,而等待资源(申请资源consumer,该资源信号灯减1)只有在信号灯大于0的情况才能成功。信号灯的管理比互斥体复杂许多,因此花费更大,所以其效率不高。 5.1 信号灯数据结构 struct sema { struct mtx sema_mtx; /* General protection lock. */ struct cv sema_cv; /* Waiters. */ int sema_waiters; /* Number of waiters. */ int sema_value; /* Semaphore value. */ }; 成员sema_mtx用于保护对sema结构的操作。成员sema_cv是等待信号灯的条件变量,成员sema_waiters统计等待信号灯的线程数目。成员sema_value记录信号灯的值。 5.2 信号灯的函数 函数void sema_init(struct sema *sema, int value, const char *description): bzero(sema, sizeof(*sema)); mtx_init(&sema->sema_mtx, description, "sema backing lock", MTX_DEF | MTX_NOWITNESS | MTX_QUIET); cv_init(&sema->sema_cv, description); sema->sema_value = value; 该函数用于初始化一个信号灯。初始化该sema的内部互斥体sema_mtx和条件变量sema_cv。而信号灯计数器sema_value的值则是通过入参value获得。 函数void sema_destroy(struct sema *sema): mtx_destroy(&sema->sema_mtx); cv_destroy(&sema->sema_cv); 该函数用于销毁一个信号灯。该函数包含销毁互斥体sema_mtx和条件变量sema_cv。 函数void sema_post(struct sema *sema): mtx_lock(&sema->sema_mtx); sema->sema_value++; if (sema->sema_waiters && sema->sema_value > 0) cv_signal(&sema->sema_cv); mtx_unlock(&sema->sema_mtx); 该函数用于增加信号灯计数器sema_value的值,如果目前有其他线程阻塞于该信号灯,并且该信号灯计数器大于0。则发一个信号,唤醒等待该信号灯队列的第一个线程。 函数void sema_wait(struct sema *sema): mtx_lock(&sema->sema_mtx); while (sema->sema_value == 0) { sema->sema_waiters++; cv_wait(&sema->sema_cv, &sema->sema_mtx); sema->sema_waiters--; } sema->sema_value--; mtx_unlock(&sema->sema_mtx); 该函数申请得到一个信号灯。如果该信号灯的计数器sema_value为0,则该信号灯代表的资源不可再申请,则将该线程加入sema_cv条件变量的等待队列中,同时将该信号灯的等待计数器加1。当得到该信号灯后,将等待计数器sema_waiters减1,同时将该信号灯计数器sema_value减1,说明该信号灯代表的资源已经被申请了一次。 函数 int sema_timedwait(struct sema *sema, int timo): mtx_lock(&sema->sema_mtx); for (timed_out = 0; sema->sema_value == 0 && timed_out == 0 { sema->sema_waiters++; timed_out = cv_timedwait(&sema->sema_cv, &sema->sema_mtx, timo); sema->sema_waiters--; } if (sema->sema_value > 0) { sema->sema_value--; ret = 1; } else { ret = 0; } mtx_unlock(&sema->sema_mtx); 该函数和上一个函数功能一样,只不过它有等待时间的限定。如果成功获得该信号灯,则返回1。如果不成功,则返回0。 函数int sema_trywait(struct sema *sema): mtx_lock(&sema->sema_mtx); if (sema->sema_value > 0) { sema->sema_value--; ret = 1; } else { ret = 0; } mtx_unlock(&sema->sema_mtx); 该函数试图获得一个信号灯,如果不能,立即返回0,能则返回1。 函数int sema_value(struct sema *sema): mtx_lock(&sema->sema_mtx); ret = sema->sema_value; mtx_unlock(&sema->sema_mtx); 该函数用于获得信号灯的计数器值。因为信号灯的值随时有可能变化,因此,得到sema_value的值必须在sema_mtx的保护下。 6 lockmgr锁机制(lock) lockmgr锁提供了多重共享锁机制,支持共享锁向独占锁的转变功能。其等待lockmgr锁可用的过程,采用msleep机制(参考 msleep(9)) - 基于事件的进程阻塞机制。因此,lockmgr锁实现所谓的加锁/解锁,是通过lockmgr锁的上行(upgrade)/下行(downgrade)操作,这是前面提及的锁机制的策略变化。和前面锁机制不同的另一个方面是,其调用者是进程。 6.1 lockmgr锁的数据结构 我们略过DEBUG相关的调试成员。 struct lock { struct mtx *lk_interlock; /* lock on remaining fields */ u_int lk_flags; /* see below */ int lk_sharecount; /* # of aclearcase/" target="_blank" >ccepted shared locks */ int lk_waitcount; /* # of processes sleeping for lock */ short lk_exclusivecount; /* # of recursive exclusive locks */ short lk_prio; /* priority at which to sleep */ const char *lk_wmesg; /* resource sleeping (for tsleep) */ int lk_timo; /* maximum sleep time (for tsleep) */ pid_t lk_lockholder; /* pid of exclusive lock holder */ struct lock *lk_newlock; /* lock taking over this lock */ } 成员lk_interlock用户保护lock结构内部成员的操作,我们稍后结合lockmgr锁的初始化再讨论。 成员lk_flags在初始化的时候代表了该锁的类型,在通过lockmgr管理该锁时,也可以作为入参表示其期望的操作。 #define LK_TYPE_MASK 0x0000000f /* type of lock sought */ #define LK_SHARED 0x00000001 /* shared lock */ #define LK_EXCLUSIVE 0x00000002 /* exclusive lock */ #define LK_UPGRADE 0x00000003 /* shared-to-exclusive upgrade */ #define LK_EXCLUPGRADE 0x00000004 /* first shared-to-exclusive upgrade */ #define LK_DOWNGRADE 0x00000005 /* exclusive-to-shared downgrade */ #define LK_RELEASE 0x00000006 /* release any type of lock */ #define LK_DRAIN 0x00000007 /* wait for all lock activity to end */ #define LK_EXCLOTHER 0x00000008 /* other process holds lock */ LK_SHARED:一个进程获得一个共享lockmgr锁,如果该进程独占持有该lockmgr锁,则需要下行处理,使其变为共享锁。 LK_EXCLUSIVE:当lockmgr锁被独占时,是不允许共享申请的。如果现在有共享存在,则需要将其上行处理为独占。通常仅有一个独占状态存在,但是,一个持有独占lockmgr锁的进程,在其设置了LK_CANRECURSE标志的情况下,可以申请额外的独占锁。 LK_UPGRADE:一个持有共享状态的lockmgr锁可以通过上行处理,将共享态变为独占态。我们需要在意的是,在上行处理的过程中,其它进程也同样有机会得到独占机会。 LK_EXCLUPGRADE:和LK_UPGRADE意义一样,只不过在上行过程中,其它进程不可能获得独占机会。 LK_DOWNGRADE:一个独占持有lockmgr锁的进程,可以通过下行处理,将其转换为共享状态。如果该进程持有递归独占锁,那么所有的独占状态都需要下行处理。 LK_RELEASE:释放一个lockmgr锁实体。 LK_DRAIN:这用于等待该锁所有活动状态结束,并标识为退役,一般用在释放锁之前。 LK_EXCLOTHER:一般用在函数lockstatus()返回里,说明该锁正被其他进程独占。 关于这些标识,我们在后面结合函数更详细的说明。 成员lk_sharecount统计该锁已被接受的共享状态的数目。 成员lk_waitcount是阻塞于该锁的进程数目。 成员lk_exclusivecount说明了递归独占该锁的数目。 成员lk_prio,如果某个进程不能获得该锁,而不得不sleep,那么它应该sleep在什么优先级的队列上,是由该成员指出的。 成员lk_wmesg是sleep的说明性文字。 成员lk_timo说明了该进程最大的sleep时间。 成员lk_lockholder,如果该锁被某个进程独占,该成员说明了该进程ID。 成员lk_newlock,如果该锁被其他锁托管,则该成员指向托管锁的地址。 6.2 lockmgr锁的函数接口 函数void lockinit(struct lock *lkp, int prio, const char *wmesg, int timo, int flags): 初始化一个lockmgr锁。 if (lock_mtx_valid == 0) { mtx_init(&lock_mtx, "lockmgr", NULL, MTX_DEF); lock_mtx_valid = 1; } 这段代码的目的是初始化全局互斥体lock_mtx。通常,该段代码永远不会调用。因为,在系统启动时,通过宏SYSINIT(lmgrinit, SI_SUB_LOCK, SI_ORDER_FIRST, lockmgr_init, NULL)注册lock子系统时,在lockmgr_init()函数中已经初始化了lock_mtx,并且将lock_mtx_valid设置为1了。 if (mtx_pool_valid) { mtx_lock(&lock_mtx); lkp->lk_interlock = mtx_pool_alloc(); mtx_unlock(&lock_mtx); } else { lkp->lk_interlock = &lock_mtx; } 通过这段代码,我们可以明白lock_mtx的作用,如果系统支持互斥体池,则lock_mtx用于保护从互斥体池中获得互斥体A的过程,由互斥体A来保护该lockmgr锁的操作;如果系统不支持互斥体,由lock_mtx充当lockmgr锁的内部成员操作的保护机制。 lkp->lk_flags = (flags & LK_EXTFLG_MASK); lkp->lk_sharecount = 0; lkp->lk_waitcount = 0; lkp->lk_exclusivecount = 0; lkp->lk_prio = prio; lkp->lk_wmesg = wmesg; lkp->lk_timo = timo; lkp->lk_lockholder = LK_NOPROC; lkp->lk_newlock = NULL; 初始化lock结构的其他成员,lk_flags 、lk_prio、lk_wmesg和lk_timo由入参指定。 函数void lockdestroy(struct lock *lkp): 该函数用于销毁一个lockmgr锁,但是目前没有任何操作。 函数int lockcount(struct lock *lkp): mtx_lock(lkp->lk_interlock); count = lkp->lk_exclusivecount + lkp->lk_sharecount; mtx_unlock(lkp->lk_interlock); 该函数用于统计目前共享和独占持有该锁的数目。 函数int lockstatus(struct lock *lkp, struct thread *td): mtx_lock(lkp->lk_interlock); if (lkp->lk_exclusivecount != 0) { if (td == NULL || lkp->lk_lockholder == td->td_proc->p_pid) lock_type = LK_EXCLUSIVE; else lock_type = LK_EXCLOTHER; } else if (lkp->lk_sharecount != 0) lock_type = LK_SHARED; mtx_unlock(lkp->lk_interlock); 该函数用于得到目前锁的状态。如果存在递归调用独占状态,则检查入参td是否为空,如果为空,或是不为空,并且该线程的进程ID和lk_lockholder一样,则说明该锁被当前进程占有;否则该锁被其它进程独占。如果不存在独占,如果共享统计不为0,则说明该锁被共享占有。如果什么都不是,则说明该锁是自由的,返回0。 函数int lockmgr(struct lock *lkp, u_int flags, struct mtx *interlkp, struct thread *td): 该函数是lockmgr锁里最关键、最复杂的函数,是lock机制管理的核心。 error = 0; if (td == NULL) pid = LK_KERNPROC; else pid = td->td_proc->p_pid; mtx_lock(lkp->lk_interlock); if (flags & LK_INTERLOCK) { mtx_unlock(interlkp); } 这段代码根据td的入参,设定pid的值,根据输入的flags是否含有LK_INTERLOCK值,决定是否需要输入的互斥体interlkp的保护。如果指明了LK_INTERLOCK,则说明就用lk_interlock保护就够了。 extflags = (flags | lkp->lk_flags) & LK_EXTFLG_MASK; 该语句用于分离lock的额外标志。这里有必要介绍一下lock的额外标志: #define LK_NOWAIT 0x00000010 /* do not sleep to await lock */ #define LK_SLEEPFAIL 0x00000020 /* sleep, then return failure */ #define LK_CANRECURSE 0x00000040 /* allow recursive exclusive lock */ #define LK_REENABLE 0x00000080 /* lock is be reenabled after drain */ #define LK_NOPAUSE 0x01000000 /* no spinloop */ #define LK_TIMELOCK 0x02000000 /* use lk_timo, else no timeout */ LK_NOWAIT、LK_SLEEPFAIL和LK_CANRECURSE通常实在lockinit()时设置的。当一个lockmgr锁已经设置了LK_DRAIN 后,说明该锁准备释放,但是可以通过LK_REENABLE,重新使用该锁。LK_NOPAUSE说明该锁不支持自旋等待。LK_TIMELOCK说明该锁支持等待时间的设定,具体时间是通过lk_timo获得。 函数lockmgr()完成预处理后,主要是通过一个很大的switch (flags & LK_TYPE_MASK)语句来处理不同的管理要求。下面我们根据入参flags含有的标识来说明: LK_SHARED: if (lkp->lk_lockholder != pid) { lockflags = LK_HAVE_EXCL; mtx_lock_spin(&sched_lock); if (td != NULL && !(td->td_flags & TDF_DEADLKTREAT)) lockflags |= LK_WANT_EXCL | LK_WANT_UPGRADE; mtx_unlock_spin(&sched_lock); error = acquire(&lkp, extflags, lockflags); if (error) break; sharelock(lkp, 1); break; } sharelock(lkp, 1); LK_DOWNGRADE: sharelock(lkp, lkp->lk_exclusivecount); lkp->lk_exclusivecount = 0; lkp->lk_flags &= ~LK_HAVE_EXCL; lkp->lk_lockholder = LK_NOPROC; if (lkp->lk_waitcount) wakeup((void *)lkp); break; 这段代码由两个部分组成,如果是LK_SHARED的标识,则需要执行这正段代码;如果是LK_DOWNGRADE,则只用执行后半段。 我们考虑LK_SHARED的情况,函数sharelock(struct lock *lkp, int incr)就是将LK_SHARE_NONZERO赋给该lkp的lk_flags成员,并将lk_sharecount加incr。 在什么情况下,lkp->lk_lockholder != pid不成立呢? 只有在该锁lkp正被当前进程独占的情况下,lkp->lk_lockholder才等于pid。这时,根据LK_SHARED的意义,我们不仅需要调用增加这次共享请求的计数(sharelock函数),而且需要下行处理,故而需要执行LK_DOWNGRADE的功能。将所有独占态改变为共享态,并试图唤醒所有阻塞于该lkp的所有进程。 当上述判断成立,则有两种情况:A、该锁本就是共享态;B、该锁被其他进程独占。无论是哪种情况,都涉及到lockmgr锁的内部函数acquire(),因此,有必要在这里插入对该函数的分析。 函数static int acquire(struct lock **lkpp, int extflags, int wanted): if ((extflags & LK_NOWAIT) && (lkp->lk_flags & wanted)) { return EBUSY; } if (((lkp->lk_flags | extflags) & LK_NOPAUSE) == 0) { error = apause(lkp, wanted); if (error == 0) return 0; } 如果该锁的额外标识了不必sleep来等待获得该锁(LK_NOWAIT),并且该锁lk_flags标志含有想申请的值(wanted),则直接返回EBUSY,申请失败。 如果所有的标志都没有含有LK_NOPAUSE(不支持自旋等待),则调用apause()函数在解锁(lk_interlock)的情况下等待一段时间直到lk_flags不再含有wanted的标志,成功返回0,失败返回1。 我们考虑情况A,在这里就会成功返回。再考虑在lockmgr()函数里,会调用sharelock()完成LK_SHARED的设置。 函数acquire()余下的部分是等待该锁可用(lk_flags不再含有wanted的标志)。所有的等待操作在splhigh的保护下进行(参考 splhigh(9))。下面考虑等待主体部分。 lkp->lk_flags |= LK_WAIT_NONZERO; lkp->lk_waitcount++; error = msleep(lkp, lkp->lk_interlock, lkp->lk_prio, lkp->lk_wmesg, ((extflags & LK_TIMELOCK) ? lkp->lk_timo : 0)); 添加LK_WAIT_NONZERO标志到lk_flags中,等待计数器lk_waitcount加1,调用msleep等待lkp满足条件。 if (lkp->lk_waitcount == 1) { lkp->lk_flags &= ~LK_WAIT_NONZERO; lkp->lk_waitcount = 0; } else { lkp->lk_waitcount--; } if (error) { splx(s); return error; } 当条件满足后,如果lk_waitcount为1,清除LK_WAIT_NONZERO标志,lk_waitcount计为0。如果lk_waitcount不为1,则只是lk_waitcount减1(有可能别的进程也在等待)。如果出错,则返回出错值。 if (extflags & LK_SLEEPFAIL) { splx(s); return ENOLCK; } 如果设置了LK_SLEEPFAIL,则说明只要sleep了,就算出错,返回ENOLOCK。 if (lkp->lk_newlock != NULL) { mtx_lock(lkp->lk_newlock->lk_interlock); mtx_unlock(lkp->lk_interlock); if (lkp->lk_waitcount == 0) wakeup((void *)(&lkp->lk_newlock)); *lkpp = lkp = lkp->lk_newlock; } 如果有其他锁托管了该锁,则做转换。 现在,我们已经了解了acquire()函数的功能,考虑情况B,则根据acquire()的返回,如果没出错,则说明,该锁的独占已经释放,可以共享占有,同样调用sharelock()函数实现共享。我们接下来分析lockmgr()函数switch中的其它情况。 根据LK_EXCLUPGRADE和LK_UPGRADE的区别,就是前者在上行处理时,多了独占的考虑。因此有一段预处理,如下: LK_EXCLUPGRADE: if (lkp->lk_flags & LK_WANT_UPGRADE) { shareunlock(lkp, 1); error = EBUSY; break; } 首先判断lk_flags是否含有LK_WANT_UPGRADE标识,如果含有说明有其他进程试图上行处理该锁,那么调用shareunlock()函数释放该锁,说明不能该进程不能LK_EXCLUPGRADE,返回EBUSY。如果该进程可以独占上行处理,余下的处理和一般上行处理一致。 LK_UPGRADE: if ((lkp->lk_lockholder == pid) || (lkp->lk_sharecount <= 0)) panic("lockmgr: upgrade exclusive lock"); shareunlock(lkp, 1); 如果lk_lockholder等于pid,说明该锁被独占,或者是lk_sharecount小于0,说明该锁没有被共享占有,则说明出错。不出错,说明当前至少有一个共享锁,为了上行处理,先对该锁的共享占有解锁一次。 if ((extflags & LK_NOWAIT) && ((lkp->lk_flags & LK_WANT_UPGRADE) || lkp->lk_sharecount > 1)) { error = EBUSY; break; } 如果额外标志包含LK_NOWAIT(不支持sleep等待该锁),并且lk_flags标志包含LK_WANT_UPGRADE(说明有其他进程正在上行处理)或者lk_sharecount大于1(说明不止一个共享状态),那么认为不能上行处理,返回EBUSY。 if ((lkp->lk_flags & LK_WANT_UPGRADE) == 0) { lkp->lk_flags |= LK_WANT_UPGRADE; error = acquire(&lkp, extflags, LK_SHARE_NONZERO); lkp->lk_flags &= ~LK_WANT_UPGRADE; if (error) break; lkp->lk_flags |= LK_HAVE_EXCL; lkp->lk_lockholder = pid; if (lkp->lk_exclusivecount != 0) panic("lockmgr: non-zero exclusive count"); lkp->lk_exclusivecount = 1; break; } 如果lk_falgs不包含LK_WANT_UPGRADE,则目前没有其他进程上行,本进程可以设置LK_WANT_UPGRADE,说明本进程正在上行处理。然后,调用acquire()函数等待该锁可用,成功等待后,设置LK_HAVE_EXCL(说明获得独占态的锁),设置lk_lockholder。最后判断lk_exclusivecount是否为0,不为0,则出现异常错误。为0,则完全成功返回。 就LK_EXCLUPGRADE而言,如果能获得,执行到这里,就可以成功。 if ( (lkp->lk_flags & (LK_SHARE_NONZERO|LK_WAIT_NONZERO)) == LK_WAIT_NONZERO) wakeup((void *)lkp); 程序走到这一步,说明别的进程B在上行处理,通过该语句判断说明,本进程是最后一个持有共享态的该锁,由于在前面已经解锁了,因此可以唤醒进程B继续上行处理。 就LK_UPGRADE操作而言,如果执行到这一步,说明别的进程正在上行处理。无论别的进程在做什么,LK_UPGRADE余下的操作和LK_EXCLUSIVE操作并无二样。 LK_EXCLUSIVE: if (lkp->lk_lockholder == pid && pid != LK_KERNPROC) { if ((extflags & (LK_NOWAIT | LK_CANRECURSE)) == 0) panic("lockmgr: locking against myself"); if ((extflags & LK_CANRECURSE) != 0) { lkp->lk_exclusivecount++; break; } } 如果该锁已经被本进程独占持有,而且该锁支持LK_CANRECURSE,只须将lk_exclusivecount计数器加1。 if ((extflags & LK_NOWAIT) && (lkp->lk_flags & (LK_HAVE_EXCL | LK_WANT_EXCL | LK_WANT_UPGRADE | LK_SHARE_NONZERO))) { error = EBUSY; break; } 如果该进程不支持等待该锁(LK_NOWAIT),并且该锁已经获得独占态、或正在上行处理、或共享态存在,则返回EBUSY,不能获得。 error = acquire(&lkp, extflags, (LK_HAVE_EXCL | LK_WANT_EXCL)); if (error) break; lkp->lk_flags |= LK_WANT_EXCL; error = acquire(&lkp, extflags, LK_WANT_UPGRADE | LK_SHARE_NONZERO); lkp->lk_flags &= ~LK_WANT_EXCL; if (error) break; lkp->lk_flags |= LK_HAVE_EXCL; lkp->lk_lockholder = pid; if (lkp->lk_exclusivecount != 0) panic("lockmgr: non-zero exclusive count"); lkp->lk_exclusivecount = 1; break; 这段代码,第一次调用acquire()函数,等待其他进程独占该锁使用完成;第二次调用acquire()函数是等待共享使用锁的进程完成。当这些处理完后,说明独占获得该锁。 LK_RELEASE: if (lkp->lk_exclusivecount != 0) { if (lkp->lk_lockholder != pid && lkp->lk_lockholder != LK_KERNPROC) { panic("lockmgr: pid %d, not %s %d unlocking", pid, "exclusive lock holder", lkp->lk_lockholder); } if (lkp->lk_exclusivecount == 1) { lkp->lk_flags &= ~LK_HAVE_EXCL; lkp->lk_lockholder = LK_NOPROC; lkp->lk_exclusivecount = 0; } else { lkp->lk_exclusivecount--; } } else if (lkp->lk_flags & LK_SHARE_NONZERO) shareunlock(lkp, 1); if (lkp->lk_flags & LK_WAIT_NONZERO) wakeup((void *)lkp); break; 首先考虑该锁独占计数器lk_exclusivecount不为0(说明该锁被独占使用,不可能还有共享态存在),如果lk_lockholder不等于pid,而且也不等于LK_KERNPROC,这是lockmgr锁规则不允许的,出错。如果该计数器lk_exclusivecount为1,无论该锁是否支持递归调用,都是最后一个独占态,应该将该锁状态还原为初始态。如果该计数器lk_exclusivecount不为0,说明该锁被本进程递归调用,而且还有其他独占态存在,只须将lk_exclusivecount减1就可以了。 考虑该锁被共享使用(LK_SHARE_NONZERO),调用shareunlock()函数处理就可以了。 如果该锁的lk_flags 标志含有LK_WAIT_NONZERO,说明有其他进程正阻塞于该锁,试图唤醒这些进程。处理完成。 LK_DRAIN: if (lkp->lk_lockholder == pid) panic("lockmgr: draining against myself"); error = acquiredrain(lkp, extflags); if (error) break; lkp->lk_flags |= LK_DRAINING | LK_HAVE_EXCL; lkp->lk_lockholder = pid; lkp->lk_exclusivecount = 1; break; 如果lk_lockholder等于pid,说明该锁正被本进程独占持有,而本进程不能等待本进程该锁独占活动状态结束,所以出错。 调用acquiredrain()函数等待该锁所有状态的操作完成。如果成功,则善后处理。 如果lockmgr的操作不是以上需求,则出错(switch语句的default处理)。 if ((lkp->lk_flags & LK_WAITDRAIN) && (lkp->lk_flags & (LK_HAVE_EXCL | LK_WANT_EXCL | LK_WANT_UPGRADE | LK_SHARE_NONZERO | LK_WAIT_NONZERO)) == 0) { lkp->lk_flags &= ~LK_WAITDRAIN; wakeup((void *)&lkp->lk_flags); } mtx_unlock(lkp->lk_interlock); 最后这段代码是上述操作成功完成后,最后处理部分。如果lk_flags标志含有LK_WAITDRAIN(说明有其他进程B等待结束锁的活动状态),并且所有的活动状态确实结束了,则唤醒进程B。 至此,lockmgr()函数的讨论结束。 函数void lockmgr_printinfo(lkp): if (lkp->lk_sharecount) printf(" lock type %s: SHARED (count %d)", lkp->lk_wmesg, lkp->lk_sharecount); else if (lkp->lk_flags & LK_HAVE_EXCL) printf(" lock type %s: EXCL (count %d) by pid %d", lkp->lk_wmesg, lkp->lk_exclusivecount, lkp->lk_lockholder); if (lkp->lk_waitcount > 0) printf(" with %d pending", lkp->lk_waitcount); 该函数是用于打印lockmgr锁的当前信息,十分简单。 7. 关于FreeBSD5.0的锁机制再讨论 A、尽量使用sleep类型(MTX_DEF)的互斥体机制,目前自旋锁(MTX_SPIN)主要用于SMP调度方面。 B、在持有一个互斥体(不是Giant)的时候,不要使用tsleep(),因为在该函数中,除了Giant以外,不会释放任何其他互斥体。 C、当持有一个互斥体A(不包括Giant)的时候,在使用msleep()和cv_wait()函数的时候,应该将A作为入参传入。在这两个函数里,会一开始释放该互斥体A,在函数结束的时候,对互斥体A重新加锁。 D、尽量避免使用递归锁机制。