sync.mutex

Go Mutex

实现思路

Golang中的互斥锁有两种模式:正常模式和饥饿模式;正常模式下,goroutinue按照正常的先进先出(FIFO)顺序出队,依次被唤醒,被唤醒的goroutine并不一定能直接获得锁,它需要与新请求锁的goroutines去争夺锁的所有权,因为新来的goroutinue正在CPU上运行且数量较多,刚被唤醒的goroutinue在抢锁上并没有优势(外来的和尚好念经...),如果一个等待的goroutine超过1ms时仍未获取到锁,会将这把锁转换为饥饿模式;饥饿模式下,锁的所有权会直接给到排在队头的goroutinue(保证公平性,不能让辛苦排队等待的goroutinues一直拿不到锁),将新来的goroutinue放到队尾.

如果一个被唤醒的goroutinue拿到了锁,且是队列中的最后一个或它的等待时间少于1ms,互斥锁切回正常模式(差不多得了,不能让人家新来的都往队尾排,毕竟人家带着资源);普通模式的好处是:一个goroutinue可以多次竞争锁,不用每次都从队尾排起(你行就直接上,不用等);饥饿模式的好处是可以避免尾部延迟,照顾了排在队列后部的goroutinue,不会让它们等待的时间过长.

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Mutex struct {
// state字段表示锁的状态第0个bit(1)表示锁已被获取,也就是已加锁,被某个goroutine拥有;第1个bit(2)表示是否有goroutine被唤醒,尝试获取锁;第2个bit(4)标记这把锁是否为饥饿状态
state int32
// sema表示信号量,用于从等待队列中唤醒协程
sema uint32
}
const (
// 锁标识, Mutex.state&mutexLocked==1表示已经上锁;Mutex.state&mutexLocked==0表示已经未锁
mutexLocked = 1 << iota
// 是否有goroutine被唤醒or新来的goroutine
// Mutex.state&mutexWoken==1表示存在运行中的协程;Mutex.state&mutexWoken==0表示不存在运行中的协程
mutexWoken
// 饥饿状态
// Mutex.state&mutexStarving==1表示饥饿模式;Mutex.state&mutexStarving==0表示正常模式
mutexStarving
// 等待者数量偏移量(值为3)
// Mutex.state右移3位表示等待队列中阻塞协程的数量,即有多少协程由于该锁阻塞
mutexWaiterShift = iota
)

mutex不能copy,如果要作为参数传递时,需要传指针

image-20210425235156694.png

部分源码

lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
func (m *Mutex) Lock() {
// 如果锁状态为0=>锁开启且无等待者,直接加锁成功
// atomic CAS操作,先判断指针指向的值(第一个参数)和要比较的值(第二个参数)是否相等,仅当此判断得到肯定的结果之后,才会用新数据(第三个参数)代表的新值替换掉原先的旧值,否则操作就会被忽略
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 加锁主流程
// 当前goroutine的等待时长
var waitStartTime int64
// 当前goroutinue是否处于饥饿状态(超过1ms还拿不到锁,区分和锁的饥饿模式)
starving := false
awoke := false
// 当前goroutine的自旋次数
iter := 0
// 记住之前锁的状态
old := m.state
// 死循环阻塞住一直等获取到锁(走到break退出这个循环说明拿到锁了)
for {
// 正常模式下自旋逻辑: (饥饿模式下,不需要自旋,因为锁会直接交给队头的goroutinue)
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 将自己的状态设置为唤醒,这样unlock时就不会唤醒别的阻塞goroutinue了(结合unlock看)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋等待
runtime_doSpin()
iter++
// // 再次获取锁的状态,用于后面检测锁是否被释放了(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变)
old = m.state
continue
}
// 当走到这一步的时候,可能会有以下的情况(分别对应上吗if的三个条件):
// 1. mutex处于饥饿模式
// 2. mutex处于空闲状态
// 3. 当前goroutine已经不能自旋(iter有次数限制)
// 之前锁的状态赋值给新状态
new := old
// 正常模式,新的锁状态为被占有
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 锁被占用 或者 饥饿模式时,新来的goroutinue乖乖去排队,waiter数量+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前goroutinue进入了饥饿状态(1ms之后还拿不到锁,它怒了),则把锁的切换为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果当前goroutinue是被唤醒的
if awoke {
// 新状态清除唤醒标记
new &^= mutexWoken
}
// 上面new这个状态的总结下,基本上以下几种情况
// 1. 正常模式,直接去抢锁就完事了,new变为占有锁
// 2. 锁处于饥饿模式或被占用,只能去队尾排队,new更新为去队尾排队
// 3. 当前goroutine处于饥饿状态(等急了),把锁改为饥饿模式,new更新为锁饥饿模式
// 4. 当前goutine为唤醒,new更新为锁状态去除唤醒状态
// 这个new是mutex.state,是一个int32的类型,上述操作都是位运算,不是简单的赋值

// CAS 操作设置锁的新状态(不代表是抢锁成功了,也可能只更新了锁的模式和waiter数量)
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取
// 如果之前锁是正常状态并且没有被占用,那么当前goroutine已经成功拿到锁,直接退出
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 走到这里,说明获取锁失败,使用sleep原语来阻塞当前goroutine
// 如果是被唤醒的等待锁的goroutine,就放到队列头部
// 如果是新来的goroutine,就放到队列尾部
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

// 被唤醒(unlock或正常调动)
// 更新goroutinue的饥饿状态,等待超过1ms即进入饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 获取当前锁状态
old = m.state
// 如果锁处于饥饿模式,直接抢到锁返回(饥饿模式下遵从严格的FIFO,不需要考虑别的goroutinue竞争)
if old&mutexStarving != 0 {
// 加锁并给waiter减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 已经不饥饿或仅剩一个waiter,清除饥饿标记
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 如果唤醒之后,锁不是饥饿模式,则进入下一次循环,尝试获取锁(正常模式,被唤醒的goroutinue要面对其它goroutinues的竞争)
awoke = true
iter = 0
} else {
// CAS 操作设置锁的新状态失败了,老状态不变进行下一次循环
old = m.state
}
}
}

上述提到的自旋添加应满足:

  1. 自旋次数小于4次;
  2. 在多核机器上;
  3. GOMAXPROCS > 1并且至少有一个其它的处于运行状态的P;
  4. 当前P没有其它等待运行的G;

unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// lock和unlock不是直接意义上的把资源给锁住,而是一种抽象:lock只是一次允许一个goroutinue去读写资源,其余的goroutinue要么sleep、要么自旋;unlock同理,并不是真正的解锁某个资源,更主要做的是唤醒下一个goroutinue去让它读写资源
func (m *Mutex) Unlock() {
// 直接drop lock bit
new := atomic.AddInt32(&m.state, -mutexLocked)
if new&mutexStarving == 0 {
// 正常模式
old := new
for {
// 如果队列中没有等待的waiter(没人可被唤醒了) 或 锁被占有了(在循环过程中,其它goroutinue拿了锁) 或 有goroutinue被唤醒(不需要再去尝试唤醒其它goroutine) 或 锁处于饥饿模式(锁会直接交给队头的goroutinue),则直接返回,啥都不用干
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待
// 更新mutexWoken,表示有goroutinue被唤醒,等待队列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果状态设置成功了,通过信号量去唤醒goroutine,移交锁的所有权 => 这一步很明显,unlock的本质是唤醒下一个goroutinue
runtime_Semrelease(&m.sema, false, 1)
return
}
// 循环结束的时候,更新一下状态,因为有可能在执行的过程中,状态被修改了(比如被其它goroutinue改为了饥饿模式)
old = m.state
}
} else {
// 饥饿模式,直接将当前锁交给下一个正在尝试获取锁的等待者
runtime_Semrelease(&m.sema, true, 1)
}
}
0%