Go创建协程的过程

本文阅读 6 分钟
首页 golang 正文
题目来源:虾皮

答案:小禾先生

想要启动一个新的 Goroutine 来执行任务时,我们需要使用 Go 语言的 go 关键字,编译器会通过 cmd/compile/internal/gc.state.stmtcmd/compile/internal/gc.state.call 两个方法将该关键字转换成 runtime.newproc 函数调用:

func (s *state) call(n *Node, k callKind) *ssa.Value {
    if k == callDeferStack {
        ...
    } else {
        switch {
        case k == callGo:
            call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())
        default:
        }
    }
    ...
}

runtime.newproc 的入参是参数大小和表示函数的指针 funcval,它会获取 Goroutine 以及调用方的程序计数器,然后调用 runtime.newproc1 函数获取新的 Goroutine 结构体、将其加入处理器的运行队列并在满足条件时调用 runtime.wakep 唤醒新的处理执行 Goroutine:

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
        newg := newproc1(fn, argp, siz, gp, pc)

        _p_ := getg().m.p.ptr()
        runqput(_p_, newg, true)

        if mainStarted {
            wakep()
        }
    })
}

runtime.newproc1 会根据传入参数初始化一个 g 结构体,我们可以将该函数分成以下几个部分介绍它的实现:

  1. 获取或者创建新的 Goroutine 结构体;
  2. 将传入的参数移到 Goroutine 的栈上;
  3. 更新 Goroutine 调度相关的属性;

首先是 Goroutine 结构体的创建过程:

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
    _g_ := getg()
    siz := narg
    siz = (siz + 7) &^ 7

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg)
    }
    ...

上述代码会先从处理器的 gFree 列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体。

接下来,我们会调用 runtime.memmovefn 函数的所有参数拷贝到栈上,argpnarg 分别是参数的内存空间和大小,我们在该方法中会将参数对应的内存空间整块拷贝到栈上:

    ...
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
    totalSize += -totalSize & (sys.SpAlign - 1)
    sp := newg.stack.hi - totalSize
    spArg := sp
    if narg > 0 {
        memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
    }
    ...

拷贝了栈上的参数之后,runtime.newproc1 会设置新的 Goroutine 结构体的参数,包括栈指针、程序计数器并更新其状态到 _Grunnable 并返回:

    ...
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.startpc = fn.fn
    casgstatus(newg, _Gdead, _Grunnable)
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    return newg
}

我们在分析 runtime.newproc 的过程中,保留了主干省略了用于获取结构体的 runtime.gfgetruntime.malg、将 Goroutine 加入运行队列的 runtime.runqput 以及设置调度信息的过程,下面会依次分析这些函数。

初始化结构体

runtime.gfget 通过两种不同的方式获取新的 runtime.g

  1. 从 Goroutine 所在处理器的 gFree 列表或者调度器的 sched.gFree 列表中获取 runtime.g
  2. 调用 runtime.malg 生成一个新的 runtime.g 并将结构体追加到全局的 Goroutine 列表 allgs 中。

golang-newproc-get-goroutine

runtime.gfget 中包含两部分逻辑,它会根据处理器中 gFree 列表中 Goroutine 的数量做出不同的决策:

  1. 当处理器的 Goroutine 列表为空时,会将调度器持有的空闲 Goroutine 转移到当前处理器上,直到 gFree 列表中的 Goroutine 数量达到 32;
  2. 当处理器的 Goroutine 数量充足时,会从列表头部返回一个新的 Goroutine;
func gfget(_p_ *p) *g {
retry:
    if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
        for _p_.gFree.n < 32 {
            gp := sched.gFree.stack.pop()
            if gp == nil {
                gp = sched.gFree.noStack.pop()
                if gp == nil {
                    break
                }
            }
            _p_.gFree.push(gp)
        }
        goto retry
    }
    gp := _p_.gFree.pop()
    if gp == nil {
        return nil
    }
    return gp
}

当调度器的 gFree 和处理器的 gFree 列表都不存在结构体时,运行时会调用 runtime.malg 初始化新的 runtime.g 结构,如果申请的堆栈大小大于 0,这里会通过 runtime.stackalloc 分配 2KB 的栈空间:

func malg(stacksize int32) *g {
    newg := new(g)
    if stacksize >= 0 {
        stacksize = round2(_StackSystem + stacksize)
        newg.stack = stackalloc(uint32(stacksize))
        newg.stackguard0 = newg.stack.lo + _StackGuard
        newg.stackguard1 = ^uintptr(0)
    }
    return newg
}

runtime.malg 返回的 Goroutine 会存储到全局变量 allgs 中。

简单总结一下,runtime.newproc1 会从处理器或者调度器的缓存中获取新的结构体,也可以调用 runtime.malg 函数创建。

运行队列

runtime.runqput 会将 Goroutine 放到运行队列上,这既可能是全局的运行队列,也可能是处理器本地的运行队列:

func runqput(_p_ *p, gp *g, next bool) {
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        gp = oldnext.ptr()
    }
retry:
    h := atomic.LoadAcq(&_p_.runqhead)
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1)
        return
    }
    if runqputslow(_p_, gp, h, t) {
        return
    }
    goto retry
}
  1. nexttrue 时,将 Goroutine 设置到处理器的 runnext 作为下一个处理器执行的任务;
  2. nextfalse 并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列;
  3. 当处理器的本地运行队列已经没有剩余空间时就会把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;

处理器本地的运行队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。

golang-runnable-queue

简单总结一下,Go 语言有两个运行队列,其中一个是处理器本地的运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局队列。

调度信息

运行时创建 Goroutine 时会通过下面的代码设置调度相关的信息,前两行代码会分别将程序计数器和 Goroutine 设置成 runtime.goexit 和新创建 Goroutine 运行的函数:

    ...
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    ...

上述调度信息 sched 不是初始化后的 Goroutine 的最终结果,它还需要经过 runtime.gostartcallfnruntime.gostartcall 的处理:

func gostartcallfn(gobuf *gobuf, fv *funcval) {
    gostartcall(gobuf, unsafe.Pointer(fv.fn), unsafe.Pointer(fv))
}

func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
    sp := buf.sp
    if sys.RegSize > sys.PtrSize {
        sp -= sys.PtrSize
        *(*uintptr)(unsafe.Pointer(sp)) = 0
    }
    sp -= sys.PtrSize
    *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
    buf.sp = sp
    buf.pc = uintptr(fn)
    buf.ctxt = ctxt
}

参考资料

本文来自投稿,不代表本站立场,如若转载,请注明出处:
了解中间件吗?有什么好处?
« 上一篇 09-17
Go 高并发的特点
下一篇 » 09-17

发表评论

发表评论