锁
type Locker interface {
Lock()
Unlock()
}
Mutex 互斥锁
互斥即不可同时运行。即使用了互斥锁的两个代码片段互相排斥,只有其中一个代码片段执行完成后,另一个才能执行。
type Mutex struct {
state int32
sema uint32
}
state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
- mutexLocked — 表示互斥锁的锁定状态;
- mutexWoken — 表示从正常模式被从唤醒;
- mutexStarving — 当前的互斥锁进入饥饿状态;
- waitersCount — 当前互斥锁上等待的 Goroutine 个数;
Mutex 正常模式和饥饿模式
- 在正常模式下,锁的等待者会按照先进先出的顺序获取锁。
- 但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
- 在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。
- 如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
- 与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
const (
mutexLocked = 1 << iota //1 互斥锁被锁定 + 未被唤醒
mutexWoken //2 互斥锁未锁定 + 被唤醒
mutexStarving //4 互斥锁未锁定 + 未被唤醒 + 饥饿模式启动
mutexWaiterShift = iota //3 互斥锁被锁定 + 被唤醒
starvationThresholdNs = 1e6
)
加锁和解锁
func (m *Mutex) Lock() {
// 当锁的状态是 0 时,将state置位 mutexLocked 为 1:
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)
m.lockSlow()
}
func (m *Mutex) lockSlow() {
该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:
- 判断当前 Goroutine 能否进入自旋;
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁;
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真
在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻
- 互斥锁只有在普通模式才能进入自旋;
- runtime.sync_runtime_canSpin 需要返回 true:
- 运行在多 CPU 的机器上;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
下面代码为 进入自旋 逻辑
// 开始等待时间戳
var waitStartTime int64
// 饥饿模式标识
starving := false
// 唤醒标识
awoke := false
// 自旋次数
iter := 0
// 保存当前对象锁状态
old := m.state
for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
// 判断相当于xxxx...x0xx & 0101 = 01,当前对象状态为:xxxx…x0xx 当前对象锁被使用
// runtime_canSpin(iter) 根据iter自旋次数判断
// 互斥锁被锁定时 进入自旋状态 continue
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke &&
// 判断相当于: xxxx...xx0x & 0010 = 0
// 当前对象状态为未被唤醒时
old&mutexWoken == 0 &&
// 当前有 Goroutine 在等待互斥锁的释放时,也就是有goroution在排队时 old>>mutexWaiterShift != 0 为 true
old>>mutexWaiterShift != 0 &&
// 将对象 改为唤醒状态:xxxx...xx0x | 0010 = xxxx...xx1x
// 在将 m.state 改为被唤醒状态
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)
{
// 告知解锁,不要唤醒其他阻塞的goroutines
awoke = true
}
// 进入自旋状态 当前goroutine并不挂起,仍然在占用cpu资源,重试一定次数后,退出自旋状态
runtime_doSpin()
//表示自旋次数
iter++
// 保存mutex对象即将被设置成的状态
//再次获取锁的状态,之后会检查是否锁被释放了
old = m.state
continue
}
一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
new := old
// old 后三位为 mutexLocked,mutexWaiterShift,mutexWoken 时 old&mutexStarving == 0 为true
// xxxx...x0xx & 0100 = 0 xxxx...x0xx
// old&mutexStarving == 0 筛选当前状态为正常模式(非饥饿模式)
// 将新来的goroutines 互斥锁锁定
if old&mutexStarving == 0 {
// 将未锁定状态改为锁定状态
// 非饥饿状态,加锁
new |= mutexLocked
}
// xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;
// old 互斥锁被锁定时但是状态为饥饿模式时,mutex的等待goroutine数目加1
if old&(mutexLocked|mutexStarving) != 0 {
// new + 8 在等待互斥锁的释放的Goroutine数量 +1
// 更新阻塞goroutine的数量,表示mutex的等待goroutine数目加1
new += 1 << mutexWaiterShift
}
// xxxx...xxx1 & 0001 != 0;锁状态为锁定状态
if starving && old&mutexLocked != 0 {
// new + 4 表示从正常模式进入饥饿模式
// xxxx...xxx | 0100 => xxxx...x1xx
new |= mutexStarving
}
// 当 唤醒标志为 true时
if awoke {
// xxxx...xx1x & 0010 = 0, 如果唤醒标志为0 ,表示为被唤醒时,panic
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :设置唤醒状态位0,goroutine是被唤醒的,新状态清除唤醒标志
new &^= mutexWoken
}
计算了新的互斥锁状态之后,会使用 CAS 函数 sync/atomic.compareandswapint32 更新状态
//判断cas锁是否 设置新状态 成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
// xxxx...x0x0 & 0101 = 0, 互斥锁未加锁且为正常模式(非饥饿模式)
if old&(mutexLocked|mutexStarving) == 0 {
//结束cas
break
}
// 如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
//获取当前运行纳秒时间戳
waitStartTime = runtime_nanotime()
}
//通过信号量保证资源不会被两个 Goroutine 获取
//runtime.sync_runtime_SemacquireMutex 会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 判断 等待时间 是否超出限制 1e6 将starving 标志为 true
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// xxxx...x1xx & 0100 != 0 判断锁状态是否 进入饥饿状态
if old&mutexStarving != 0 {
//当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
// xxxx...xx00 & 0011 = 0
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加锁并且将waiter数减1
// delta -7 7 = 0111
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//判断 等待队列中 是否只存在当前 Goroutine
if !starving || old>>mutexWaiterShift == 1 {
// delta -11 11 = 1011
delta -= mutexStarving
}
// m.state - 7 表示当前 Goroutine 会获得互斥锁
// m.state - 11 表示当前 Goroutine 会获得互斥锁还会从饥饿模式中退出
atomic.AddInt32(&m.state, delta)
break
}
// 设置唤醒标记 为true
awoke = true
// 重置迭代次数
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
如果没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex 通过信号量保证资源不会被两个 Goroutine 获取
runtime.sync_runtime_SemacquireMutex 会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 的剩余代码也会继续执行。
- 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
- 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:
- 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
- 如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 开始慢速解锁:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// m.state - 1
// 新状态等于 0,当前 Goroutine 就成功解锁了互斥锁
// 新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 开始慢速解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow 会先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
func (m *Mutex) unlockSlow(new int32) {
// 校验锁状态的合法性
if (new+mutexLocked)&mutexLocked == 0 {
//如果 当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序
throw("sync: unlock of unlocked mutex")
}
// 分别处理正常模式和饥饿模式下的互斥锁
if new&mutexStarving == 0 {
// 正常状态
old := new
for {
// 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0(有唤醒者或锁已经加锁),那么当前方法可以直接返回,不需要唤醒其他等待者;
//如果没有其它的 waiter,说明对这个锁的竞争的 goroutine 只有一个,那就可以直接返回了;如果这个时候有唤醒的 goroutine,或者是又被别人加了锁,那么,无需我们操劳,其它 goroutine 自己干得都很好,当前的这个 goroutine 就可以放心返回了。
//xxxx…x000 & (0001 | 0010 | 0100) => xxxx…x000 & 0111 = 0
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease 唤醒等待者并移交锁的所有权;
//如果有等待者,并且没有唤醒的 waiter,那就需要唤醒一个等待的 waiter。在唤醒之前,需要将 waiter 数量减 1,并且将 mutexWoken 标志设置上,这样,Unlock 就可以返回了。
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿状态
// 当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态
runtime_Semrelease(&m.sema, true, 1)
}
}
在正常模式下,上述代码会使用如下所示的处理过程:
- 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者;
- 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease 唤醒等待者并移交锁的所有权;
在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;
总结
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
- 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
- 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
- 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
- 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
- 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;
RWMutex 读写锁
读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
sync.RWMutex 中总共包含以下 5 个字段:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
- w — 复用互斥锁提供的能力;
- writerSem 和 readerSem — 分别用于写等待读和读等待写:
- readerCount 存储了当前正在执行的读操作数量;
- readerWait 表示当写操作被阻塞时等待的读操作个数;
写锁
当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
//进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
- 调用结构体持有的 sync.Mutex 结构体的 sync.Mutex.Lock 阻塞后续的写操作;因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
- 调用 sync/atomic.AddInt32 函数阻塞后续的读操作:
- 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;
写锁的释放会调用 sync.RWMutex.Unlock:
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
与加锁的过程正好相反,写锁的释放分以下几个执行:
- 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
- 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
- 调用 sync.Mutex.Unlock 释放写锁;
获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。
读锁
读锁的加锁方法 sync.RWMutex.RLock 很简单,该方法会通过 sync/atomic.AddInt32 将 readerCount 加一:
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
- 如果该方法返回负数 — 其他 Goroutine 获得了写锁,当前 Goroutine 就会调用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待锁的释放;
- 如果该方法的结果为非负数 — 没有 Goroutine 获得写锁,当前方法会成功返回;
当 Goroutine 想要释放读锁时,会调用如下所示的 sync.RWMutex.RUnlock 方法:
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
该方法会先减少正在读资源的 readerCount 整数,根据 sync/atomic.AddInt32 的返回值不同会分别进行处理:
- 如果返回值大于等于零 — 读锁直接解锁成功;
- 如果返回值小于零 — 有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
sync.RWMutex.rUnlockSlow 会减少获取锁的写操作等待的读操作数 readerWait 并在所有读操作都被释放之后触发写操作的信号量 writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。
虽然读写互斥锁 sync.RWMutex 提供的功能比较复杂,但是因为它建立在 sync.Mutex 上,所以实现会简单很多。我们总结一下读锁和写锁的关系:
- 调用 sync.RWMutex.Lock 尝试获取写锁时;
- 每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 会获得写锁;
- 将 readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作;
- 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
WaitGroup
sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我们可以通过 sync.WaitGroup 将原本顺序执行的代码在多个 Goroutine 中并发执行,加快程序处理的速度。
WaitGroup 等待多个 Goroutine
结构体
sync.WaitGroup 结构体中只包含两个成员变量:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
- noCopy — 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
- state1 — 存储着状态和信号量;
sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法
sync.WaitGroup 结构体中包含一个总共占用 12 字节的数组,这个数组会存储当前结构体的状态,在 64 位与 32 位的机器上表现也非常不同。
WaitGroup 在 64 位和 32 位机器的不同状态
- 64 位机器上本身就能保证 64 位对齐,所以按照 64 位对齐来取数据,拿到 state1[0], state1[1] 本身就是64 位对齐的。但是 32 位机器上并不能保证 64 位对齐,因为 32 位机器是 4 字节对齐,如果也按照 64 位机器取 state[0],state[1] 就有可能会造成 atmoic 的使用错误。
- 32 位机器上空出第一个 32 位,也就使后面 64 位天然满足 64 位对齐,第一个 32 位放入 sema 刚好合适
WaitGroup.state1 其实代表三个字段:counter,waiter,sema
- counter :可以理解为一个计数器,计算经过 wg.Add(N), wg.Done() 后的值。
- waiter :当前等待 WaitGroup 任务结束的等待者数量。其实就是调用 wg.Wait() 的次数,所以通常这个值是 1 。
- sema : 信号量,用来唤醒 Wait() 函数。
sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能够帮我们从 state1 字段中取出它的状态和信号量。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
接口
sync.WaitGroup 对外暴露了三个方法 — sync.WaitGroup.Add、sync.WaitGroup.Wait 和 sync.WaitGroup.Done.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
//更新worker计数器
state := atomic.AddUint64(statep, uint64(delta)<<32)
//worker计数器:v 是 statep *uint64 的左32位
//waiter计数器:w 是 statep *uint64 的右32位
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// worker 大于 0 或者 waiter 等于 0 说明还有Goroutine没有执行完,直接返回
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 状态设置为0
*statep = 0
// 通过 sync.runtime_Semrelease 唤醒处于等待状态的 Goroutine(唤醒 Wait())
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的计数器 counter。
虽然 sync.WaitGroup.Add 方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数就会发生程序崩溃。
当调用计数器归零,即所有任务都执行完成时,才会通过 sync.runtime_Semrelease 唤醒处于等待状态的 Goroutine。
sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法传入了 -1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
sync.WaitGroup 的另一个方法 sync.WaitGroup.Wait 会在计数器大于 0 并且不存在等待的 Goroutine 时,调用 runtime.sync_runtime_Semacquire 陷入睡眠。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// 循环判断 是否满足退出条件
for {
//worker计数器:v 是 statep *uint64 的左32位
//waiter计数器:w 是 statep *uint64 的右32位
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// 当计数器为0时,表示所有Goroutine 都执行完成,立即返回
if v == 0 {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 更新waiter计数器 atomic.CompareAndSwapUint64 对uint64值执行比较和交换操作
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(semap))
}
// Goroutine 进入睡眠状态,等待信号量唤醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
当 sync.WaitGroup 的计数器归零时,陷入睡眠状态的 Goroutine 会被唤醒,上述方法也会立刻返回。
通过对 sync.WaitGroup 的分析和研究,我们能够得出以下结论:
- sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
- sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine;
- 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒;
once
Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次
结构体
每一个 sync.Once 结构体中都只包含一个用于标识代码块是否执行过的 done 以及一个互斥锁 sync.Mutex:
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do 是 sync.Once 结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:
- 如果传入的函数已经执行过,会直接返回;
- 如果传入的函数没有执行过,会调用 sync.Once.doSlow 执行传入的函数:
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
- 为当前 Goroutine 获取互斥锁;
- 执行传入的无入参函数;
- 运行延迟函数调用,将成员变量 done 更新成 1;
sync.Once 会通过成员变量 done 确保函数不会执行第二次。
作为用于保证函数执行次数的 sync.Once 结构体,它使用互斥锁和 sync/atomic 包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,我们也需要注意以下的问题:
- sync.Once.Do方法中传入的函数只会被执行一次,哪怕函数中发生了 panic;
- 两次调用 sync.Once.Do 方法传入不同的函数只会执行第一次调传入的函数;
Cond
sync.Cond 用来协调想要访问共享资源的 goroutine。
sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
使用场景
- 有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。
- 这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事
结构体
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
- noCopy — 用于保证结构体不会在编译期间拷贝;
- copyChecker — 用于禁止运行期间发生的拷贝;
- L — 用于保护内部的 notify 字段,Locker 接口类型的变量;
- notify — 一个 Goroutine 的链表,它是实现同步机制的核心结构;
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
在 sync.notifyList 结构体中,head 和 tail 分别指向的链表的头和尾,wait 和 notify 分别表示当前正在等待的和已经通知到的 Goroutine 的索引。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
newCond
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
NewCond 创建 Cond 实例时,需要关联一个锁。
接口
wait
sync.Cond 对外暴露的 sync.Cond.Wait 方法会将当前 Goroutine 陷入休眠状态,它的执行过程分成以下两个步骤:
- 调用 runtime.notifyListAdd 将等待计数器加一并解锁;
- 调用 runtime.notifyListWait 等待其他 Goroutine 的唤醒并加锁:
func (c *Cond) Wait() {
// 检查c是否是被复制的,如果是就panic
c.checker.check()
// 将当前goroutine加入等待队列
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 等待队列中的所有的goroutine执行等待唤醒操作
runtime_notifyListWait(&c.notify, t)
// 锁
c.L.Lock()
}
调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。
如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。
runtime.notifyListWait 会获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端:
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
除了将当前 Goroutine 追加到链表的末端之外,我们还会调用 runtime.goparkunlock 将当前 Goroutine 陷入休眠,该函数也是在 Go 语言切换 Goroutine 时经常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。
sync.Cond.Signal 和 sync.Cond.Broadcast 就是用来唤醒陷入休眠的 Goroutine 的方法,它们的实现有一些细微的差别:
- sync.Cond.Signal 方法会唤醒队列最前面的 Goroutine;
- sync.Cond.Broadcast 方法会唤醒队列中全部的 Goroutine;
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。
runtime.notifyListNotifyOne 只会从 sync.notifyList 链表中找到满足 sudog.ticket == l.notify 条件的 Goroutine 并通过 runtime.readyWithTime 唤醒:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
runtime.notifyListNotifyAll 会依次通过 runtime.readyWithTime 唤醒链表中 Goroutine:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。
在一般情况下,我们都会先调用 sync.Cond.Wait 陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 唤醒一个或者全部的 Goroutine。
Cond使用例子
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:
- sync.Cond.Wait 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
- sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
- sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine;