golang源码分析-深入理解Mutex

前言

刚刚开始接触golang时,觉得mutex锁简单易用。到下个阶段自己觉得这种是悲观锁,效率很低。直到学习和了解它的原理才明白mutex是一个兼顾性能和公平的精妙设计。

本文将从mutex的几个阶段出发,梳理出每个阶段它的设计、所解决的问题以及不同场景下的特殊考虑。

锁介绍

上篇博客介绍了一些锁的分类、原理以及使用场景: lock-category

大家可以参考下。

Mutex使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var (
	m   sync.Mutex
	cnt int
)

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			inc()
		}()
	}

	wg.Wait()
	// output: cnt: 10
	fmt.Println("cnt:", cnt)
}

func inc() {
	m.Lock()
	defer m.Unlock()
	cnt++
}
  1. 这里是先声明两个全局变量:mutex和共享资源cnt
  2. 接着使用sync.waitGroup控制主协程等待子协程运行完。
  3. 子协程内使用mutex.Lock()将代码块锁住,在最后使用mutex.Unlock()释放代码块。
  4. 锁住的代码块内,只对cnt变量递增

这里总共起了10个子协程,理想情况下最终cnt的值为10。最终输出结果也如我们预想的一致,说明mutex的两个方法Lock()Unlock()的确能够控制并发。

至于这里的sync.WaitGroup,其实是为了控制主协程(main)等待子协程运行完,这一块不是本文的重点,有兴趣的话可以自行了解哈。

Locker接口

在了解mutex的版本演化前,需要介绍下sync.Locker接口。接口定义如下:

1
2
3
4
5
// Locker 该接口代表一个资源或对象能够被锁住或解锁
type Locker interface {
	Lock()
	Unlock()
}

接口定义很简单,调用Lock()方法可以锁住,调用Unlock()能够解锁。

Mutex version 1

看看第一版mutex是怎么实现sync.Locker接口的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

// Mutex结构, 包括两个字段
type Mutex struct {
    key  int32 // 标识锁是否被持有
    sema int32 // 专门用于阻塞/唤醒goroutine的信号量
}

// 保证可以成功地将delta增加到val上
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val
		// cas操作, 除非成功增加值, 否则一直循环操作
        if cas(val, v, v+delta) {
            return v + delta
        }
    }
    panic("unreached")
}

// 实现Lock()方法
func (m *Mutex) Lock() {
    if xadd(&m.key, 1) == 1 { // 给m.key的值加1, 如果方法返回1, 说明上锁成功
        return
    }
    semacquire(&m.sema) // 否则调用semacquire方法将goroutine挂起
}

func (m *Mutex) Unlock() {
    if xadd(&m.key, -1) == 0 { // 将m.key减去1, 如果方法返回0, 说明锁已经被解
        return
    }
    semrelease(&m.sema) // 唤醒阻塞的goroutine
}

先看代码中的xadd方法,里面的实现是一个for循环,每次都进行CAS操作,给val增加delta值。如果不成功则进入下一次循环,直到成功。

Lock方法的逻辑是:给m.key加1,如果xadd返回1,则说明之前key的值为0(没有协程占用该锁),加锁成功。如果返回的值大于1,说明不只有一个协程等待,则调用semacquire方法阻塞当前协程。

Unlock方法的逻辑是:给m.key减1,如果xadd返回0,说明目前没有其他协程等待了,解锁成功。如果返回的值大于0,说明仍然有多个协程阻塞,则调用semarelease方法唤醒阻塞的协程。

通过逻辑,可以知道第一版的mutex是通过key来标识锁是否被占用。

  • key == 0: 代表锁没有被占用,是空闲状态
  • key == 1: 锁当前被1个goroutine占用
  • key == n (n > 1): 当前有n-1个goroutine等待锁

目前这一版的mutex有两个问题:

  1. 请求的goroutine是按照FIFO(先入先出)的顺序获取锁的,这种情况会导致性能问题,因为会频繁的切换上下文。
  2. Unlock方法没有被协程绑定,会导致协程A调用Lock方法后,可能被其他协程解锁的情况。这个设计也沿用到了现在,所以需要遵循谁加锁,谁解锁的规范。

Mutex version 2

根据第一版的mutex的代码,我们可以观察到如果一个协程获取不到锁,就会被阻塞,直到其他协程解锁将其唤醒。这种情况下,如果有多个协程在等待锁,那么就会出现FIFO的情况。

而第二版改进的地方,就是避免FIFO的情况。先看Mutex结构体的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Mutex struct {
    state int32//这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。
    sema  uint32// 信号量专用,用以阻塞/唤醒goroutine
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

第一版的mutex是通过key字段来标识锁是否被占用,而第二版则是通过一个state字段来实现这一功能。

它的每一个二进制位都有特定的含义(从低到高):

  • 第一位:代表锁是否被持有
  • 第二位:代表是否有唤醒的goroutine
  • 剩余的位数:代表等待此锁的goroutine数

state字段解析

接着看Lock方法的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Mutex) Lock() {
    // Fast path: 幸运case,能够直接获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    for {
        old := m.state
        new := old | mutexLocked // 新状态加锁
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift //等待者数量加一
        }
        if awoke {
            // goroutine是被唤醒的,
            // 新状态清除唤醒标志
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
            if old&mutexLocked == 0 { // 锁原状态未加锁
                break
            }
            runtime.Semacquire(&m.sema) // 请求信号量
            awoke = true
        }
    }
}

首先先试下通过CAS操作,能否将锁状态从0变为1。如果成功,说明锁之前是空闲的,那么加锁成功,直接返回。如果失败,说明锁被占用,就继续Lock的逻辑。这一段也被称为Fast path

之后会进入到一个for循环内:

  1. 先将state字段的值上锁(给最低位置为1)
  2. 如果之前的state字段的值已经被锁住,那么将等待者数量加一
  3. 如果goroutine是被唤醒的,那么新状态清除唤醒标志。(使用位清空操作)
  4. 尝试将新状态设置到state字段中,如果成功,说明本次循环内没有其他协程竞争。如果失败则继续下一轮循环
  5. 设置state成功后,如果之前的state字段未被锁住,那么直接退出循环,加锁成功。如果锁原本就被锁住了,那么请求信号量阻塞当前协程
  6. 协程被唤醒时,将awoke标记为true,以便下一轮循环时清除唤醒标志

先较于第一版的加锁逻辑,最大的区别是:如果当前协程获取不到锁时,不会马上阻塞,而是会进入一个循环。这样减少了cpu上下文切换的次数,提升了效率。同时基于state字段的复合型设计(一个字段代表多个含义),也奠定了后续版本的基础。

说完了加锁,再看下解锁的逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
    if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
        panic("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
            return
        }
        new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime.Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}
  1. 通过原子包的AddInt32方法,将state字段的值减去mutexLocked(最后一位置为0)。如果本身就为0的话,则说明对一个未加锁的锁进行解锁,直接panic
  2. 进入循环内,有两种情况可以直接返回。如果当前没有等待者(通过右移两个比特,因为state从第3位代表等待锁的数量),直接返回。另外一种是,如果state字段已经被加锁或者有唤醒的标志位,也直接返回(说明有其他协程再加锁,本次解锁方法不用管,其他协程自己会处理的)。
  3. 设置新的状态,等待者-1(第3个比特位减一)。并设置唤醒标志(第2个比特位设置为1),因为逻辑如果可以走到这里,说明有协程在等待锁,所以需要唤醒。
  4. 尝试将新状态设置到state字段中,如果成功,则释放信号量,唤醒一个等待的协程。如果失败则继续下一轮循环

第一次看源码到这里,其实我有点顶不住,是在草稿本上推了好几篇才理解。通过这一系列复杂的操作,可以安全的加锁和解锁了。本质上就是通过原子包判断二进制位,同时再加上一系列的边界检测。

Mutex version 3

第二版的mutex给了新进来的协程一个机会,如果获取不到锁不会马上阻塞,而是会进入一个循环,略微减少了cpu切换上下文的次数。请注意这里我说“略微”,是因为这个循环大概率只是跑一次的。所以说这次优化力度还是不够的,于是第三版的mutex给了新进来的协程更多机会。

看看第三版的Lock方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (m *Mutex) Lock() {
    // Fast path: 幸运之路,正好获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    iter := 0
    for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
        old := m.state // 先保存当前锁的状态
        new := old | mutexLocked // 新状态设置加锁标志
        if old&mutexLocked != 0 { // 锁还没被释放
            if runtime_canSpin(iter) { // 还可以自旋
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue // 自旋,再次尝试请求锁
            }
            new = old + 1<<mutexWaiterShift
        }
        if awoke { // 唤醒状态
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken // 新状态清除唤醒标记
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
                break
            }
            runtime_Semacquire(&m.sema) // 阻塞等待
            awoke = true // 被唤醒
            iter = 0
        }
    }
}
  1. 第一步也是一样,先尝试CAS操作,判断是否可以直接持有锁。如果成功,直接返回。
  2. 进入循环内,首先给状态加锁并保存到new变量中,如果锁没释放,则进入自旋逻辑。
  3. 判断是否还可以自旋,如果可以则将唤醒状态置为1(防止Unlock时把其他阻塞的协程唤醒了),然后继续自旋。自旋时跳过本次循环(旋到锁释放)。
  4. 等待者加一。
  5. 后续逻辑和第二版类似了。清除唤醒标记,CAS获取锁,如果没获取到则阻塞等待。被唤醒后继续自旋。

这里的runtime_canSpin(iter)runtime_doSpin()都是在runtime包中实现的,用于判断是否可以自旋和自旋的逻辑。

第三版主要的改动是给了新进来的协程更多的机会,如果获取不到锁不会马上阻塞,而是会进入一个自旋的循环。这样减少了cpu切换上下文的次数,提升了效率。在一些临界区资源占用时间短的情况下,锁一般很快就能释放,协程自旋几次也能获取锁,而无需阻塞。

第三版的Unlock方法基本没啥改动,这里就不再赘述了。

Mutex version 4

第三版的mutex大幅度减少了cpu切换上下文的次数。但是有个问题,试想一下这个场景:

  1. 你和你的好兄弟都想要拉屎,只有一个厕所,你争不过他于是只能一直原地踏步等待(自旋),踏步到一定时间后你见好兄弟还没出来,于是你睡着了(阻塞)。
  2. 在这个时候,来了位新朋友,他也在原地踏步等待厕所。
  3. 此时你的兄弟出来了,而你还没醒,所以厕所被新来的朋友给占了。
  4. 极端情况下,这种情况可能一直出现,导致最开始的你一直没机会上厕所。

最开始来的协程可能一直没机会获取锁,这种情况也可以叫做饥饿问题。为了解决这个问题,第四版mutex横空出世:

1
2
3
4
5
6
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota

starvationThresholdNs = 1e6

可以看到多了一个mutexStarving的状态。这个状态是用来解决饥饿问题的。也意味着state字段的含义又变了。

state字段解析v2

前两位含义不变(从低位开始),第三位代表是否处于饥饿状态。剩余的位数代表的是等待此锁的goroutine数。而starvationThresholdNs代表协程多久没获取到锁后(单位:纳秒),就会进入饥饿状态。

进入饥饿状态后,mutex会优先唤醒处于饥饿状态的协程。这样可以保证协程不会一直处于饥饿状态(新来的朋友给久等的老同志让下位置)。在原本的高性能基础上,又增加了公平性。

当满足以下两者之一时,mutex会从饥饿模式切换回正常模式:

  1. 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
  2. 此 waiter 的等待时间小于 1 毫秒。

Lock

简化后的Lock方法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func (m *Mutex) Lock() {
    // Fast path: 幸运之路,一下就获取到了锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // 此goroutine的饥饿标记
    awoke := false // 唤醒标记
    iter := 0 // 自旋次数
    old := m.state // 当前的锁的状态
    for {
        // 锁是非饥饿状态,锁还没被释放,尝试自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
            continue
        }
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked // 非饥饿状态,加锁
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift // waiter数量加1
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving // 设置饥饿状态
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken // 新状态清除唤醒标记
        }
        // 成功设置新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 处理饥饿状态

            // 如果以前就在队列里面,加入到队列头
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 阻塞等待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 唤醒之后检查锁是否应该处于饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果锁已经处于饥饿状态,直接抢到锁,返回
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 有点绕,加锁并且将waiter数减1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

第一步快速路径,不讲了。

接下来进入到了“慢路径”,这里golang抽出一个方法并使用了内联的方式,减少了函数调用的开销,可以说很细致了。

  1. 首先初始化一些该协程的状态,包括是否饥饿、是否被唤醒、自旋次数、当前锁的状态等。
  2. 进入一个循环。判断当前锁的状态(是否被锁住、是否不处于饥饿模式),以及是否可以自旋。如果都满足则自旋(与第三版的逻辑一样)。这里和第三版最大的区别是:只有非饥饿模式下才会走到自旋的逻辑
  3. 增加一个new变量,后续设置状态都是对这个变量进行操作。
  4. 非饥饿模式下,设置加锁标志(比较好理解,新协程在非饥饿模式下才能直接获取锁)。
  5. 如果该锁目前的状态是锁住的或者处于饥饿模式,等待者数量加一。
  6. 如果当前协程是饥饿状态starving且锁是被占用的,则将锁置为饥饿模式。
  7. 清除唤醒标记。
  8. 将刚刚对new变量的操作设置到state字段中。如果失败则进入下一轮循环。
  9. 成功则判断锁的原状态,如果锁未被占用且不处于饥饿模式,则直接返回,获取锁成功。
  10. 先判断等待时间是否为0,然后对waitStartTime赋值。接着对当前协程进行阻塞等待。这里如果等待时间为0则会放在等待队列尾部,不为0则放在头部。
  11. 走到这里,协程已经被唤醒了。判断是否当前协程处于饥饿状态,其实就是判断等待时间是否大于1ms。如果不为饥饿模式则继续回去自旋。
  12. delta := int32(mutexLocked - 1<<mutexWaiterShift),这个我理解了很久,其实就是将锁的状态设置为锁住,等待者数量减一。
  13. 如果当前协程不处于饥饿模式或者等待者数量为1,说明当前协程是最后一个等待者,清除饥饿标记。
  14. 通过原子包,将新的状态加到state字段中,逻辑结束。

Unlock

经过这么多轮的演变,mutex的代码越来越复杂了,但是万里长征只剩最后一步了。Unlock方法的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		fatal("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		runtime_Semrelease(&m.sema, true, 1)
	}
}

Unlock也有个快速路径,不讲了。

  1. 进入unlockSlow方法,如果对一个未加锁的mutex解锁,抛出异常。
  2. 如果当前处于饥饿模式,则直接释放信号量唤醒队列的一个协程(队列第一个),解锁成功。
  3. 非饥饿模式下,有两种情况可以直接解锁成功。一是没有等待者,直接返回。二是锁被锁住|锁有唤醒的协程|锁处于饥饿模式|直接返回(被锁住/处于饥饿模式,是因为此时有其他协程操作mutex,它们自己会处理,我们不操劳,有唤醒是因为其他unlock操作了,也是一样)。
  4. 等待者减1,并设置唤醒标记,将新值赋给state中。
  5. 唤醒一个等待的协程。

总结

源码读到这里,已经接近崩溃。但还是要有个结尾,本质上mutex就是对几个二进制位进行操作,多个协程通过原子包来"抢夺"对mutex的控制权。如果没抢到则继续或者阻塞等待,直到抢到为止。

有一点值得我们学习的,无论mutex内部的实现多么复杂,我们在使用的时候只需要关心LockUnlock方法。这也是golang的设计哲学之一:

大道至简

updatedupdated2024-02-292024-02-29