刚刚开始接触golang时,觉得mutex
锁简单易用。到下个阶段自己觉得这种是悲观锁,效率很低。直到学习和了解它的原理才明白mutex
是一个兼顾性能和公平的精妙设计。
本文将从mutex
的几个阶段出发,梳理出每个阶段它的设计、所解决的问题以及不同场景下的特殊考虑。
上篇博客介绍了一些锁的分类、原理以及使用场景: lock-category
大家可以参考下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
var (
m sync.Mutex
cnt int
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inc()
}()
}
wg.Wait()
// output: cnt: 10
fmt.Println("cnt:", cnt)
}
func inc() {
m.Lock()
defer m.Unlock()
cnt++
}
|
- 这里是先声明两个全局变量:
mutex
和共享资源cnt
- 接着使用
sync.waitGroup
控制主协程等待子协程运行完。
- 子协程内使用
mutex.Lock()
将代码块锁住,在最后使用mutex.Unlock()
释放代码块。
- 锁住的代码块内,只对
cnt
变量递增
这里总共起了10个子协程,理想情况下最终cnt
的值为10。最终输出结果也如我们预想的一致,说明mutex
的两个方法Lock()
与Unlock()
的确能够控制并发。
至于这里的sync.WaitGroup
,其实是为了控制主协程(main)等待子协程运行完,这一块不是本文的重点,有兴趣的话可以自行了解哈。
在了解mutex
的版本演化前,需要介绍下sync.Locker
接口。接口定义如下:
1
2
3
4
5
|
// Locker 该接口代表一个资源或对象能够被锁住或解锁
type Locker interface {
Lock()
Unlock()
}
|
接口定义很简单,调用Lock()
方法可以锁住,调用Unlock()
能够解锁。
看看第一版mutex
是怎么实现sync.Locker
接口的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// Mutex结构, 包括两个字段
type Mutex struct {
key int32 // 标识锁是否被持有
sema int32 // 专门用于阻塞/唤醒goroutine的信号量
}
// 保证可以成功地将delta增加到val上
func xadd(val *int32, delta int32) (new int32) {
for {
v := *val
// cas操作, 除非成功增加值, 否则一直循环操作
if cas(val, v, v+delta) {
return v + delta
}
}
panic("unreached")
}
// 实现Lock()方法
func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 { // 给m.key的值加1, 如果方法返回1, 说明上锁成功
return
}
semacquire(&m.sema) // 否则调用semacquire方法将goroutine挂起
}
func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 { // 将m.key减去1, 如果方法返回0, 说明锁已经被解
return
}
semrelease(&m.sema) // 唤醒阻塞的goroutine
}
|
先看代码中的xadd
方法,里面的实现是一个for循环,每次都进行CAS操作,给val增加delta值。如果不成功则进入下一次循环,直到成功。
Lock
方法的逻辑是:给m.key加1,如果xadd
返回1,则说明之前key的值为0(没有协程占用该锁),加锁成功。如果返回的值大于1,说明不只有一个协程等待,则调用semacquire
方法阻塞当前协程。
Unlock
方法的逻辑是:给m.key减1,如果xadd
返回0,说明目前没有其他协程等待了,解锁成功。如果返回的值大于0,说明仍然有多个协程阻塞,则调用semarelease
方法唤醒阻塞的协程。
通过逻辑,可以知道第一版的mutex
是通过key来标识锁是否被占用。
- key == 0: 代表锁没有被占用,是空闲状态
- key == 1: 锁当前被1个goroutine占用
- key == n (n > 1): 当前有n-1个goroutine等待锁
目前这一版的mutex
有两个问题:
- 请求的goroutine是按照FIFO(先入先出)的顺序获取锁的,这种情况会导致性能问题,因为会频繁的切换上下文。
Unlock
方法没有被协程绑定,会导致协程A调用Lock
方法后,可能被其他协程解锁的情况。这个设计也沿用到了现在,所以需要遵循谁加锁,谁解锁的规范。
根据第一版的mutex
的代码,我们可以观察到如果一个协程获取不到锁,就会被阻塞,直到其他协程解锁将其唤醒。这种情况下,如果有多个协程在等待锁,那么就会出现FIFO的情况。
而第二版改进的地方,就是避免FIFO的情况。先看Mutex
结构体的定义:
1
2
3
4
5
6
7
8
9
10
|
type Mutex struct {
state int32//这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。
sema uint32// 信号量专用,用以阻塞/唤醒goroutine
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
|
第一版的mutex
是通过key
字段来标识锁是否被占用,而第二版则是通过一个state
字段来实现这一功能。
它的每一个二进制位都有特定的含义(从低到高):
- 第一位:代表锁是否被持有
- 第二位:代表是否有唤醒的goroutine
- 剩余的位数:代表等待此锁的goroutine数
接着看Lock
方法的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
for {
old := m.state
new := old | mutexLocked // 新状态加锁
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift //等待者数量加一
}
if awoke {
// goroutine是被唤醒的,
// 新状态清除唤醒标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
if old&mutexLocked == 0 { // 锁原状态未加锁
break
}
runtime.Semacquire(&m.sema) // 请求信号量
awoke = true
}
}
}
|
首先先试下通过CAS
操作,能否将锁状态从0变为1。如果成功,说明锁之前是空闲的,那么加锁成功,直接返回。如果失败,说明锁被占用,就继续Lock
的逻辑。这一段也被称为Fast path
。
之后会进入到一个for循环内:
- 先将
state
字段的值上锁(给最低位置为1)
- 如果之前的
state
字段的值已经被锁住,那么将等待者数量加一
- 如果goroutine是被唤醒的,那么新状态清除唤醒标志。(使用位清空操作)
- 尝试将新状态设置到
state
字段中,如果成功,说明本次循环内没有其他协程竞争。如果失败则继续下一轮循环
- 设置
state
成功后,如果之前的state
字段未被锁住,那么直接退出循环,加锁成功。如果锁原本就被锁住了,那么请求信号量阻塞当前协程
- 协程被唤醒时,将
awoke
标记为true,以便下一轮循环时清除唤醒标志
先较于第一版的加锁逻辑,最大的区别是:如果当前协程获取不到锁时,不会马上阻塞,而是会进入一个循环。这样减少了cpu上下文切换的次数,提升了效率。同时基于state
字段的复合型设计(一个字段代表多个含义),也奠定了后续版本的基础。
说完了加锁,再看下解锁的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
panic("sync: unlock of unlocked mutex")
}
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semrelease(&m.sema)
return
}
old = m.state
}
}
|
- 通过原子包的
AddInt32
方法,将state
字段的值减去mutexLocked
(最后一位置为0)。如果本身就为0的话,则说明对一个未加锁的锁进行解锁,直接panic
- 进入循环内,有两种情况可以直接返回。如果当前没有等待者(通过右移两个比特,因为
state
从第3位代表等待锁的数量),直接返回。另外一种是,如果state
字段已经被加锁或者有唤醒的标志位,也直接返回(说明有其他协程再加锁,本次解锁方法不用管,其他协程自己会处理的)。
- 设置新的状态,等待者-1(第3个比特位减一)。并设置唤醒标志(第2个比特位设置为1),因为逻辑如果可以走到这里,说明有协程在等待锁,所以需要唤醒。
- 尝试将新状态设置到
state
字段中,如果成功,则释放信号量,唤醒一个等待的协程。如果失败则继续下一轮循环
第一次看源码到这里,其实我有点顶不住,是在草稿本上推了好几篇才理解。通过这一系列复杂的操作,可以安全的加锁和解锁了。本质上就是通过原子包判断二进制位,同时再加上一系列的边界检测。
第二版的mutex
给了新进来的协程一个机会,如果获取不到锁不会马上阻塞,而是会进入一个循环,略微减少了cpu切换上下文的次数。请注意这里我说“略微”,是因为这个循环大概率只是跑一次的。所以说这次优化力度还是不够的,于是第三版的mutex
给了新进来的协程更多机会。
看看第三版的Lock
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
func (m *Mutex) Lock() {
// Fast path: 幸运之路,正好获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
iter := 0
for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
old := m.state // 先保存当前锁的状态
new := old | mutexLocked // 新状态设置加锁标志
if old&mutexLocked != 0 { // 锁还没被释放
if runtime_canSpin(iter) { // 还可以自旋
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue // 自旋,再次尝试请求锁
}
new = old + 1<<mutexWaiterShift
}
if awoke { // 唤醒状态
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
break
}
runtime_Semacquire(&m.sema) // 阻塞等待
awoke = true // 被唤醒
iter = 0
}
}
}
|
- 第一步也是一样,先尝试CAS操作,判断是否可以直接持有锁。如果成功,直接返回。
- 进入循环内,首先给状态加锁并保存到
new
变量中,如果锁没释放,则进入自旋逻辑。
- 判断是否还可以自旋,如果可以则将唤醒状态置为1(防止
Unlock
时把其他阻塞的协程唤醒了),然后继续自旋。自旋时跳过本次循环(旋到锁释放)。
- 等待者加一。
- 后续逻辑和第二版类似了。清除唤醒标记,CAS获取锁,如果没获取到则阻塞等待。被唤醒后继续自旋。
这里的runtime_canSpin(iter)
和runtime_doSpin()
都是在runtime
包中实现的,用于判断是否可以自旋和自旋的逻辑。
第三版主要的改动是给了新进来的协程更多的机会,如果获取不到锁不会马上阻塞,而是会进入一个自旋的循环。这样减少了cpu切换上下文的次数,提升了效率。在一些临界区资源占用时间短的情况下,锁一般很快就能释放,协程自旋几次也能获取锁,而无需阻塞。
第三版的Unlock
方法基本没啥改动,这里就不再赘述了。
第三版的mutex
大幅度减少了cpu切换上下文的次数。但是有个问题,试想一下这个场景:
- 你和你的好兄弟都想要拉屎,只有一个厕所,你争不过他于是只能一直原地踏步等待(自旋),踏步到一定时间后你见好兄弟还没出来,于是你睡着了(阻塞)。
- 在这个时候,来了位新朋友,他也在原地踏步等待厕所。
- 此时你的兄弟出来了,而你还没醒,所以厕所被新来的朋友给占了。
- 极端情况下,这种情况可能一直出现,导致最开始的你一直没机会上厕所。
最开始来的协程可能一直没机会获取锁,这种情况也可以叫做饥饿问题。为了解决这个问题,第四版mutex
横空出世:
1
2
3
4
5
6
|
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
|
可以看到多了一个mutexStarving
的状态。这个状态是用来解决饥饿问题的。也意味着state
字段的含义又变了。
前两位含义不变(从低位开始),第三位代表是否处于饥饿状态。剩余的位数代表的是等待此锁的goroutine数。而starvationThresholdNs
代表协程多久没获取到锁后(单位:纳秒),就会进入饥饿状态。
进入饥饿状态后,mutex
会优先唤醒处于饥饿状态的协程。这样可以保证协程不会一直处于饥饿状态(新来的朋友给久等的老同志让下位置)。在原本的高性能基础上,又增加了公平性。
当满足以下两者之一时,mutex
会从饥饿模式切换回正常模式:
- 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
- 此 waiter 的等待时间小于 1 毫秒。
简化后的Lock
方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 此goroutine的饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态
for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿状态,加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // waiter数量加1
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿状态
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 处理饥饿状态
// 如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 有点绕,加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
|
第一步快速路径,不讲了。
接下来进入到了“慢路径”,这里golang
抽出一个方法并使用了内联的方式,减少了函数调用的开销,可以说很细致了。
- 首先初始化一些该协程的状态,包括是否饥饿、是否被唤醒、自旋次数、当前锁的状态等。
- 进入一个循环。判断当前锁的状态(是否被锁住、是否不处于饥饿模式),以及是否可以自旋。如果都满足则自旋(与第三版的逻辑一样)。这里和第三版最大的区别是:只有非饥饿模式下才会走到自旋的逻辑
- 增加一个
new
变量,后续设置状态都是对这个变量进行操作。
- 非饥饿模式下,设置加锁标志(比较好理解,新协程在非饥饿模式下才能直接获取锁)。
- 如果该锁目前的状态是锁住的或者处于饥饿模式,等待者数量加一。
- 如果当前协程是饥饿状态
starving
且锁是被占用的,则将锁置为饥饿模式。
- 清除唤醒标记。
- 将刚刚对
new
变量的操作设置到state
字段中。如果失败则进入下一轮循环。
- 成功则判断锁的原状态,如果锁未被占用且不处于饥饿模式,则直接返回,获取锁成功。
- 先判断等待时间是否为0,然后对
waitStartTime
赋值。接着对当前协程进行阻塞等待。这里如果等待时间为0则会放在等待队列尾部,不为0则放在头部。
- 走到这里,协程已经被唤醒了。判断是否当前协程处于饥饿状态,其实就是判断等待时间是否大于1ms。如果不为饥饿模式则继续回去自旋。
delta := int32(mutexLocked - 1<<mutexWaiterShift)
,这个我理解了很久,其实就是将锁的状态设置为锁住,等待者数量减一。
- 如果当前协程不处于饥饿模式或者等待者数量为1,说明当前协程是最后一个等待者,清除饥饿标记。
- 通过原子包,将新的状态加到
state
字段中,逻辑结束。
经过这么多轮的演变,mutex
的代码越来越复杂了,但是万里长征只剩最后一步了。Unlock
方法的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
|
Unlock
也有个快速路径,不讲了。
- 进入
unlockSlow
方法,如果对一个未加锁的mutex
解锁,抛出异常。
- 如果当前处于饥饿模式,则直接释放信号量唤醒队列的一个协程(队列第一个),解锁成功。
- 非饥饿模式下,有两种情况可以直接解锁成功。一是没有等待者,直接返回。二是锁被锁住|锁有唤醒的协程|锁处于饥饿模式|直接返回(被锁住/处于饥饿模式,是因为此时有其他协程操作
mutex
,它们自己会处理,我们不操劳,有唤醒是因为其他unlock
操作了,也是一样)。
- 等待者减1,并设置唤醒标记,将新值赋给
state
中。
- 唤醒一个等待的协程。
源码读到这里,已经接近崩溃。但还是要有个结尾,本质上mutex
就是对几个二进制位进行操作,多个协程通过原子包来"抢夺"对mutex
的控制权。如果没抢到则继续或者阻塞等待,直到抢到为止。
有一点值得我们学习的,无论mutex
内部的实现多么复杂,我们在使用的时候只需要关心Lock
和Unlock
方法。这也是golang
的设计哲学之一:
大道至简