谈谈自己对GO的Mutex的理解
_本文原始标题:谈谈自己对GO的Mutex的理解

文章图片
作者:iuoui
来源:SegmentFault思否
目前GO已经更新到了1.14的版本
咱们一般人如果直接去看mutex的源码的话 , 其实是比较难理解为什么写成了现在这个样子 , 尤其是加锁里面的各种逻辑判断太多了 , 各种位运算一脸懵逼 , 其实我们只要掌握它最初的设计思想 , 那么后面新增的逻辑 , 理解起来都很简单了 。
Mutex最初版本
Mutex第一版代码加上注释不过才109行 。 非常精简,下面介绍一下我对第一版Mutex源码的理解
//Mutex有state和sema两个成员变量 , 这一点是在1.14没有变化的//其中state字段代表当前锁的状态 , sema是控制锁状态的信号量 , 主要关注state就行////state比较复杂 , state一共32位//最低位代表locked状态 , 0表示未上锁 , 1表示上锁//倒数第二位woken状态 , 0表示未唤醒 , 1表示已唤醒//剩余30位用于表示当前有多少个goroutine等待互斥锁的释放 , 代表最多支持2^30个goroutinetypeMutexstruct{stateint32semauint32}我们接下来看它的Lock方法
func(m*Mutex)Lock{//首先直接CAS尝试获取锁ifatomic.CompareAndSwapInt32(&m.state,0,mutexLocked){ifraceenabled{raceAcquire(unsafe.Pointer(m))}//上锁成功后 , 直接返回return}//CAS获取锁失败//awoke默认是未唤醒状态awoke:=falsefor{//当前state赋值给oldold:=m.state//给old上锁new:=old|mutexLocked//如果old本身就已经上了锁的话ifold&mutexLocked!=0{//goroutine等待数+1new=old+1<<mutexWaiterShift}//如果当前g被唤醒了ifawoke{//把woken标记清除掉new&^=mutexWoken}//更新一下当前锁的状态ifatomic.CompareAndSwapInt32(&m.state,old,new){//如果old本身就是解锁状态ifold&mutexLocked==0{//那么代表抢锁成功,直接退出for循环break}//不是解锁状态//尝试获取信号量 , 进入等待队列 , 等待被唤醒runtime_Semacquire(&m.sema)//被唤醒 , awoke设置true , 继续for循环awoke=true}}
ifraceenabled{raceAcquire(unsafe.Pointer(m))}}
简单总结一下Lock的逻辑 , 分几种情况说明一下
第一种情况:第一次上锁的时候 , 直接走第一步CAS上锁 , 成功返回
第二种情况:Mutex已经被另一个g上锁 , 那么state的g等待数+1 , 更新当前的锁状态 , 然后就进入队列 , 等待被唤醒 , 等到另个一g调用了Unlock方法之后 , 当前g被唤醒 , 然后设置awoken=true , 再执行一遍for循环 , 此时locked位就是未上锁状态(0) , new就是代表上锁 , 然后清除woken位 , 然后再CAS更新new到state上 , 因为之前的锁是未上锁状态 , 那么就代表抢锁成功 , break , 返回
第三种情况:和第二种一样 , 只不过 , 在CAS更新new到state上时 , 有其他g先改掉了state的值 , 那么就继续for循环 , 然后重复到第二种情况 。
接下来看下Unlock方法
func(m*Mutex)Unlock{ifraceenabled{_=m.stateraceRelease(unsafe.Pointer(m))}//一开始也是直接去掉加锁状态new:=atomic.AddInt32(&m.state,-mutexLocked)//判断一下是否解锁了一个未加锁的Mutexif(new+mutexLocked)&mutexLocked==0{//直接panicpanic("sync:unlockofunlockedmutex")}
//把解锁后的值赋值给oldold:=newfor{//如果此时没有需要等待获取锁的G//或者当前Mutex已经被抢锁成功或者已经有被唤醒的G , 那么就可以直接返回ifold>>mutexWaiterShift==0||old&(mutexLocked|mutexWoken)!=0{return}
//g等待数-1 , 然后设置唤醒标记位new=(old-1<<mutexWaiterShift)|mutexWoken//更新Mutex的state的值ifatomic.CompareAndSwapInt32(&m.state,old,new){//手动唤醒一个被runtime_Semacquire阻塞的Gruntime_Semrelease(&m.sema)//返回return}//更新state失败 , 说明有其他G修改了state的值 , 那么 , 重新赋值一下 , 再进行下一次循环old=m.state}}
Unlock要比Lock简单很多 , 所以这里不总结了 , 看注释就能明白
到这里 , 最初版本的Mutex源码已经分析完了 , 关键还是在上锁的方法里面 。 上锁逻辑非常简单粗暴 , 直接CAS获取锁 , 失败就G等待数+1 , 然后进入队列 , 等待被唤醒 。
那么 , 如果仔细想想 , 就会发现性能上还是有可以改进的地方 。
我们应用Mutex的时候 , 肯定把锁粒度控制的越小越好 , 那么这样的话就很可能会出现这么一个问题 , 当第一次上锁CAS失败的时候 , mutex已经被其他G解锁了 , 但是当前G就还是直接进入队列 , 等待被唤醒 , 这样的话其实就会带来额外的调度开销 。
所以 , Mutex后面引进了自旋锁的概念自旋锁提交代码
Mutex引入自旋锁
Currentlysync.Mutexisfullycooperative.Thatis,oncecontentionisdiscovered,
thegoroutinecallsintoscheduler.Thisissuboptimalastheresourcecanbecome
freesoonafter(especiallyifcriticalsectionsareshort).Serversoftware
usuallyrunsat~~50%CPUutilization,thatis,switchingtoothergoroutines
isnotnecessaryprofitable.
Thischangeaddslimitedactivespinningtosync.Mutexif:
runningonamulticoremachineand
GOMAXPROCS>1and
thereisatleastoneotherrunningPand
localrunqisempty.Asopposedtoruntimemutexwedon'tdopassivespinning,becausetherecanbeworkonglobalrunqononother
Ps.
简单概括一下 , 就是为了解决锁粒度非常小的时候 , 给系统带来的不必要的调度开销
不过自旋要先满足几个条件
【谈谈自己对GO的Mutex的理解】首先程序要跑在多核的机器上 , 然后GOMAXPROCS要大于1 , 并且此时有至少一个P的localrunq是空的 , 才能进入到自旋的状态
自旋是一种多线程同步机制 , 当前的进程在进入自旋的过程中会一直保持CPU的占用 , 持续检查某个条件是否为真 。 在多核的CPU上 , 自旋可以避免Goroutine的切换 , 使用恰当会对性能带来很大的增益 , 但是使用的不恰当就会拖慢整个程序 , 所以Goroutine进入自旋的条件非常苛刻
看一下更新之后的Lock方法
func(m*Mutex)Lock{//Fastpath:grabunlockedmutex.ifatomic.CompareAndSwapInt32(&m.state,0,mutexLocked){ifraceenabled{raceAcquire(unsafe.Pointer(m))}return}awoke:=falseiter:=0//自旋的次数(<=4)for{old:=m.statenew:=old|mutexLocked//没有解锁ifold&mutexLocked!=0{//判断是否满足自旋的状态ifruntime_canSpin(iter){//当woken标记位没有被设置 , 而且等待G数量不等于0 , 并设置woken标记位成功//这里设置woken标记位的原因是 , 通知Unlock不用去唤醒等待队列里面的G了if!awoke&&old&mutexWoken==0&&old>>mutexWaiterShift!=0&&atomic.CompareAndSwapInt32(&m.state,old,old|mutexWoken){//标记awoke=trueawoke=true}//runtime_doSpin->sync_runtime_doSpin//每次自旋30个时钟周期 , 最多120个周期runtime_doSpiniter++//再次执行for循环continue}//自旋结束之后 , G等待数量+1new=old+1<<mutexWaiterShift}ifawoke{//这里多了个判断woken状态不一致的逻辑ifnew&mutexWoken==0{panic("sync:inconsistentmutexstate")}new&^=mutexWoken}ifatomic.CompareAndSwapInt32(&m.state,old,new){ifold&mutexLocked==0{break}runtime_Semacquire(&m.sema)awoke=trueiter=0//重置iter}}
ifraceenabled{raceAcquire(unsafe.Pointer(m))}}
相比于第一版的Mutex , 这里只在加锁的方法里面增加了自旋锁的逻辑
当Mutex已经上锁的时候 , 当前G在满足自旋条件下 , 进入自旋状态 , 在自旋中 , 其他G解锁了Mutex , 那么当前G就设置了woken标记位 , 这样其他G在Unlock的时候就不会去等待队列里面唤醒G了 , 然后当前G就顺理成章的抢到了锁
这样自旋锁在锁粒度非常小的场景下的能对其性能带来一定的优化 。
引入自旋锁之后 , 又带来了一个问题 。 就是G等待队列的长尾问题 。 因为从等待队列里面被唤醒 , 然后再去抢锁 , 对本身就在执行的G来说 , 被唤醒的G其实是很难抢过当前执行的G的 , 这样的话 , 等待队列里面的G , 就会被饿死(长时间获取不到锁) , 这样对等待队列的G来说其实是不公平的 。
所以Mutex后面引入了饥饿模式饥饿模式代码
Mutex引入饥饿模式
本次代码变动还是挺大的
先看下提交者的介绍
AddnewstarvationmodeforMutex.
Instarvationmodeownershipisdirectlyhandedofffrom
unlockinggoroutinetothenextwaiter.Newarrivinggoroutines
don'tcompeteforownership.
Unfairwaittimeisnowlimitedto1ms.
Alsofixalongstandingbugthatgoroutineswererequeued
atthetailofthewaitqueue.Thatleadtoevenmoreunfair
acquisitiontimeswithmultiplewaiters.
Performanceofnormalmodeisnotconsiderablyaffected.
简单概括一下 , 就是解决了等待G队列的长尾问题
饥饿模式下 , 直接由unlock把锁交给等待队列中排在第一位的G , 同时 , 饥饿模式下 , 新进来的G不会参与抢锁也不会进入自旋状态 , 会直接进入等待队列的尾部 。
饥饿模式的触发条件 , 当一个G等待锁时间超过1毫秒时 , Mutex切换到饥饿模式
饥饿模式的取消条件 , 当一个G获取到锁且在等待队列的末尾 , 或者这个G获取锁的等待时间在1ms内 , 那么Mutex切换回正常模式
带来的改变
Mutex.state的倒数第三位 , 变成了mutexStarving标记位 , 0表示正常模式 , 1表示饥饿模式 , 与此同时 , 支持的最大等待G数量从2^30^个变成了2^29^个
接下来还是主要关注Lock方法,我只在新增的逻辑上添加注释了 , 我直接贴1.14的Lock代码 , 较1.9的版本没什么改变
func(m*Mutex)Lock{//Fastpath:grabunlockedmutex.ifatomic.CompareAndSwapInt32(&m.state,0,mutexLocked){ifrace.Enabled{race.Acquire(unsafe.Pointer(m))}return}//Slowpath(outlinedsothatthefastpathcanbeinlined)//这里封装了一下m.lockSlow}func(m*Mutex)lockSlow{varwaitStartTimeint64starving:=false//默认是正常模式awoke:=falseiter:=0old:=m.statefor{//当前Mutex在饥饿模式下已经被锁了的话 , 当前G不进入自旋//只有Mutex在正常模式且被锁了的情况下 , 并且满足自旋的条件 , 才会进入到自旋逻辑里面ifold&(mutexLocked|mutexStarving)==mutexLocked&&runtime_canSpin(iter){//Activespinningmakessense.//TrytosetmutexWokenflagtoinformUnlock//tonotwakeotherblockedgoroutines.if!awoke&&old&mutexWoken==0&&old>>mutexWaiterShift!=0&&atomic.CompareAndSwapInt32(&m.state,old,old|mutexWoken){awoke=true}runtime_doSpiniter++old=m.statecontinue}new:=old//如果当前不是饥饿模式ifold&mutexStarving==0{//加锁new|=mutexLocked}//如果Mutex已经被锁 , 或者是在饥饿模式ifold&(mutexLocked|mutexStarving)!=0{//等待的G数量+1new+=1<<mutexWaiterShift}//Thecurrentgoroutineswitchesmutextostarvationmode.//Butifthemutexiscurrentlyunlocked,don'tdotheswitch.//Unlockexpectsthatstarvingmutexhaswaiters,whichwillnot//betrueinthiscase.//如果已经是饥饿模式 , 并且Mutex是被锁的状态ifstarving&&old&mutexLocked!=0{//切换成饥饿模式new|=mutexStarving}ifawoke{//Thegoroutinehasbeenwokenfromsleep,//soweneedtoresettheflagineithercase.ifnew&mutexWoken==0{throw("sync:inconsistentmutexstate")}new&^=mutexWoken}//更新state值ifatomic.CompareAndSwapInt32(&m.state,old,new){//非饥饿模式下抢锁成功ifold&(mutexLocked|mutexStarving)==0{//退出break//lockedthemutexwithCAS}//Ifwewerealreadywaitingbefore,queueatthefrontofthequeue.//如果之前已经设置过waitStartTime的话 , queueLifo就是true了queueLifo:=waitStartTime!=0//没有设置过 , 获取下运行时间ifwaitStartTime==0{waitStartTime=runtime_nanotime}//阻塞 , 等待被唤醒runtime_SemacquireMutex(&m.sema,queueLifo,1)//如果等待时间超过1ms , 设置starving=true , 否则就是falsestarving=starving||runtime_nanotime-waitStartTime>starvationThresholdNsold=m.state//如果Mutex已经是饥饿模式ifold&mutexStarving!=0{//Ifthisgoroutinewaswokenandmutexisinstarvationmode,//ownershipwashandedofftousbutmutexisinsomewhat//inconsistentstate:mutexLockedisnotsetandwearestill//accountedaswaiter.Fixthat.//如果当前G是在饥饿模式下被唤醒的//加个判断state是否正确设置的逻辑ifold&(mutexLocked|mutexWoken)!=0||old>>mutexWaiterShift==0{throw("sync:inconsistentmutexstate")}//delta=-7(1.....0111)delta:=int32(mutexLocked-1<<mutexWaiterShift)if!starving||old>>mutexWaiterShift==1{//退出饥饿模式delta-=mutexStarving}//更新stateatomic.AddInt32(&m.state,delta)break}awoke=trueiter=0}else{old=m.state}}
ifrace.Enabled{race.Acquire(unsafe.Pointer(m))}}
Unlock方法改动就非常小了
func(m*Mutex)Unlock{ifrace.Enabled{_=m.staterace.Release(unsafe.Pointer(m))}//Fastpath:droplockbit.new:=atomic.AddInt32(&m.state,-mutexLocked)ifnew!=0{//Outlinedslowpathtoallowinliningthefastpath.//TohideunlockSlowduringtracingweskiponeextraframewhentracingGoUnblock.m.unlockSlow(new)}}
func(m*Mutex)unlockSlow(newint32){if(new+mutexLocked)&mutexLocked==0{throw("sync:unlockofunlockedmutex")}//不是饥饿模式ifnew&mutexStarving==0{old:=newfor{//G等待队列==0 , 直接返回//(或者 , 处于woken模式 , 直接返回//或者 , 处于locked模式 , 直接返回//或者处于饥饿模式 , 直接返回)ifold>>mutexWaiterShift==0||old&(mutexLocked|mutexWoken|mutexStarving)!=0{return}new=(old-1<<mutexWaiterShift)|mutexWokenifatomic.CompareAndSwapInt32(&m.state,old,new){runtime_Semrelease(&m.sema,false,1)return}old=m.state}}else{//唤醒G等待队列的首个Gruntime_Semrelease(&m.sema,true,1)}}
总结
Mutex经过两次演进 , 都解决了不同的问题 。 Mutex用法非常简单 , 里面的原理不感兴趣的话其实没必要深究 , 知道个大概的逻辑就行了 。
补充:
mutex的等待G队列的顺序是FIFO
饥饿模式下 , 性能其实很低 , 主要就是为了解决长尾问题的
SegmentFault思否社区和文章作者展开更多互动和交流 。
推荐阅读
- 再忆两岸战疫共情故事 台湾青年:相信自己是那打败黑暗的一道光
- 本月底,对爱情从来不会强求,即使失败,也能坦然面对的4大星座
- 星车记|9万买了一辆宝马5系,本以为自己捡到宝,拆开底盘一看维修工懵了
- 星车记|奥迪Q5跟宝马X3停一起,对比后,网友:要是你,你会怎么选择呢?
- 青岛自驾游车友会|热死了,车子的空调怎么用?老司机:空调用不对,油耗高一倍
- 平台企业|双龙交警、运管联手出击!!对机场路上不文明行为实施“零”容忍
- 谷歌|毫不客气!谷歌对澳大利亚发表强硬公开信
- 商务部|商务部:对澳大利亚进口相关葡萄酒进行反倾销立案调查
- 阿里巴巴|美商务部长:美国没有对阿里巴巴采取任何正式行动
- 澜沧|营收额占比仅0.13% 澜沧古茶拿什么与对手抗衡?
