goroutine的暂停和恢复源码剖析

上一节《GC 对根对象扫描实现的源码分析》中,我们提到过在GC的时候,会对一些goroutine 栈扫描时会扫描前台触发 G 的暂停(suspendG)和恢复(resumeG)。

// markroot scans the i'th root.
//
// Preemption must be disabled (because this uses a gcWork).
//
// nowritebarrier is only advisory here.
//
//go:nowritebarrier
func markroot(gcw *gcWork, i uint32) {
    baseFlushCache := uint32(fixedRootCount)
    baseData := baseFlushCache + uint32(work.nFlushCacheRoots)
    baseBSS := baseData + uint32(work.nDataRoots)
    baseSpans := baseBSS + uint32(work.nBSSRoots)
    baseStacks := baseSpans + uint32(work.nSpanRoots)
    end := baseStacks + uint32(work.nStackRoots)

    // Note: if you add a case here, please also update heapdump.go:dumproots.
    switch {
        ......

    default:
        var gp *g
        if baseStacks <= i && i < end {
            gp = allgs[i-baseStacks]
        } else {
            throw("markroot: bad index")
        }

        status := readgstatus(gp) // We are not in a scan state
        if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
            gp.waitsince = work.tstart
        }

        // scanstack must be done on the system stack in case
        // we're trying to scan our own stack.
        systemstack(func() {
            userG := getg().m.curg
            selfScan := gp == userG && readgstatus(userG) == _Grunning
            if selfScan {
                casgstatus(userG, _Grunning, _Gwaiting)
                userG.waitreason = waitReasonGarbageCollectionScan
            }

            // TODO: suspendG blocks (and spins) until gp
            // stops, which may take a while for
            // running goroutines. Consider doing this in
            // two phases where the first is non-blocking:
            // we scan the stacks we can and ask running
            // goroutines to scan themselves; and the
            // second blocks.
            stopped := suspendG(gp)
            if stopped.dead {
                gp.gcscandone = true
                return
            }
            if gp.gcscandone {
                throw("g already scanned")
            }
            scanstack(gp, gcw)
            gp.gcscandone = true
            resumeG(stopped)

            if selfScan {
                casgstatus(userG, _Gwaiting, _Grunning)
            }
        })
    }
}

那么它在暂停和恢复一个goroutine时都做了些什么工作呢,今天我们通过源码来详细看一下。 go version 1.16.2

G的抢占

一个G可以在任何 安全点(safe-point) 被抢占,目前安全点可以分为以下几类:

  1. 阻塞安全点出现在 goroutine 被取消调度、同步阻塞或系统调用期间;
  2. 同步安全点出现在运行goroutine检查抢占请求时;
  3. 异步安全点出现在用户代码中的任何指令上,其中G可以安全的暂停且可以保证堆栈和寄存器扫描找到 stack root(这个很重要,GC扫描开始的地方)。runtime 可以通过一个信号在一个异步安全点暂停一个G。

这里将安全点分为 阻塞安全点同步安全点异步安全点,每种安全点都出现在不同的场景。

阻塞安全点和同步安全点,一个G的CPU状态是最小的(无法理解这里最小的意思)。垃圾回收器拥有整个stack的完整信息。这样就有可能使用最小的空间重新调度G,并精确的扫描G的 栈。

同步安全点是通过在重载函数序言中stack bound check(栈边界检查)实现的。在下一个同步安全点抢占G,runtime 在G的 stack绑定一个值,该值将导致下一个 stack bound check 失败,从而进入栈的增涨实现,此实现将检测到它实际上是抢占并重写向到抢占处理逻辑。

异步安全点抢占是通过操作系统(如:信号)挂起一个线程并检查它的状态以确定G是否处于一个异步安全点。由于挂起线程本身是异步的,它将检查运行的G是否需要被抢占,这将引起一些改变。如果所有条件都满足,它将调整信号上下文,使其看起来像刚刚发起调用的asyncPreempt(异步抢占)信号线程并恢复此线程。asyncPreempt溢出所有寄存器并进入调度程序。

(另一种方法是抢占信号处理程序本身。这将允许操作系统保存和恢复寄存器状态,运行时只需要知道如何从信号上下文中提取可能包含指针的寄存器。但是,这将为每个抢占的G消耗一个M,并且调度器本身并不是设计为从信号处理程序运行的,因为它倾向于在抢占路径中分配内存和启动线程)

暂停状态

在G的暂停状态没有使用一个单独的变量来表示,而是通过一个 suspendGState 的结构体来表示。

type suspendGState struct {
    g *g
    dead bool
    stopped bool
}

字段意义:

  • g 表示当前暂停的G,将其放在状态结构体中,这样直需要什么一个结构体就可以了,不需要再单独占用一个参数来表示暂停的哪个G;
  • dead 表示当前G并没有暂停,而是处于 _Gdead 状态。这个 G 可以以后被复用,因为调用者不能一直认为它是 _Gdead 状态,见G的状态流转图;
  • stopped 表示通过 g.preemptStop 将G转换为 _Gwaiting 状态,因此负责在完成时做好准备

暂停G (suspendG)

在安全点暂停G将返回一个 suspendGState 结构体的状态值,调用者在此期间将一直拥有此G的读权限,直到恢复 resumeG 为止。

多个调用者在同一时间试图suspend同一个G时,它是安全的。goroutine 可以在后续成功挂起操作之间执行。当前实现授予对G的独占访问权限,所以多个调用者将会序列化。但是,其目的是授予共享read权限,所以不要依赖独占访问。

suspend操作必须在系统栈执行,并且在M(如果有的话)上的用户goroutine必须处于一个可抢占的状态。这样可以防止两个goroutine试图互相挂起并且都处于非抢占状态时出现死锁。有其它的方式来解决这个死锁,但看起来非常的简单。

// go:systemstack
func suspendG(gp *g) suspendGState {
    // 当前暂停的G正是自己,且自己还处于_Grunning,直接抛出异常
    if mp := getg().m; mp.curg != nil && readgstatus(mp.curg) == _Grunning {
        throw("suspendG from non-preemptible goroutine")
    }

    const yieldDelay = 10 * 1000
    var nextYield int64

    stopped := false
    var asyncM *m
    var asyncGen uint32
    var nextPreemptM int64
    for i := 0; ; i++ {
        switch s := readgstatus(gp); s {
        default:
            if s&_Gscan != 0 {
                break
            }

            dumpgstatus(gp)
            throw("invalid g status")
        case _Gdead:
            return suspendGState{dead: true}
        case _Gcopystack:

        case _Gpreempted:
            if !casGFromPreempted(gp, _Gpreempted, _Gwaiting) {
                break
            }

            stopped = true

            s = _Gwaiting
            fallthrough
        case _Grunnable, _Gsyscall, _Gwaiting:
            if !castogscanstatus(gp, s, s|_Gscan) {
                break
            }

            gp.preemptStop = false
            gp.preempt = false
            gp.stackguard0 = gp.stack.lo + _StackGuard
            return suspendGState{g: gp, stopped: stopped}
        case _Grunning:
            if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
                break
            }

            // Temporarily block state transitions.
            if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
                break
            }

            // Request synchronous preemption.
            gp.preemptStop = true
            gp.preempt = true
            gp.stackguard0 = stackPreempt

            // Prepare for asynchronous preemption.
            asyncM2 := gp.m
            asyncGen2 := atomic.Load(&asyncM2.preemptGen)
            needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
            asyncM = asyncM2
            asyncGen = asyncGen2

            casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)

            if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
                now := nanotime()
                if now >= nextPreemptM {
                    nextPreemptM = now + yieldDelay/2
                    preemptM(asyncM)
                }
            }
        }

        if i == 0 {
            nextYield = nanotime() + yieldDelay
        }
        if nanotime() < nextYield {
            procyield(10)
        } else {
            osyield()
            nextYield = nanotime() + yieldDelay/2
        }
    }

整体流程是通过一个 for 方法,不断的检查G的状态并在合适的机会返回suspendGState。

  • _Gdead 已处于 dead状态,直接返回 suspendGState{dead: true},注意这时没有g;
  • _Gcopystack 处于复制stack状态,当前处于栈的扩容或缩减,继续等待直到完成;
  • _Gpreempted 可抢占状态;将G变为 _Gwaiting 状态,同时设置变量 stopped=true。继续等待;
  • _Grunnable, _Gsyscall, _Gwaiting : 标记为扫描状态;取消抢占请求等,返回 suspendGState{g: gp, stopped: true};
  • _Grunning 这里指非当前G的运行状态; 先将 _Grunning 变为 _Gscanrunning;设置同步抢占标记并做一些抢占准备,再恢复 _Grunning 状态;最后再发送异步抢占

这里提到过几个与转换G状态的函数,如casfrom_Gscanstatus()castogscanstatus()casGFromPreempted()

恢复G (resumeG)

所谓恢复G就是指暂停的撤销,允许暂停的G从当前 安全点(safe-point) 继续执行。

func resumeG(state suspendGState) {
    if state.dead {
        // We didn't actually stop anything.
        return
    }
    gp := state.g
    switch s := readgstatus(gp); s {
    default:
        dumpgstatus(gp)
        throw("unexpected g status")

    case _Grunnable | _Gscan,
        _Gwaiting | _Gscan,
        _Gsyscall | _Gscan:
        casfrom_Gscanstatus(gp, s, s&^_Gscan)
    }

    if state.stopped {
        // We stopped it, so we need to re-schedule it.
        ready(gp, 0, true)
    }
}

主要是最后一句,调用 ready() ,将其G设置为运行_Grunnable 状态,这样就可以立即在下次被执行。

总结

可以看到对G的暂停和恢复,其实是对G 的状态进行改变。对于suspend操作只会在安全点才会发生,它会一直重试尝试着修改G的状态,同时会对一些抢占标记做一些修改直到修改成功为止。

参考资料

由于个人能力有限,文章中若有错误,可以联系本人。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341

推荐阅读更多精彩内容