Go调度器学习之协作与抢占详解
0. 简介
在上篇博客——《golang调度器(4)—goroutine调度》中一直遗留了一个没有解答的问题:如果某个G执行时间过长,其他的G如何才能被正常调度,这就引出了接下来的话题:协作与抢占。
在Go语言的v1.2
版本就实现饿了基于协作的抢占式调用,这种调用的基本原理就是:
- 当
sysmon
监控线程发现有协程的执行时间太长了,那么会友好地为这个协程设置抢占标记; - 当这个协程
调用(call)
一个函数时,会检查是否扩容栈,而这里就会检查抢占标记,如果被标记,则会让出CPU,从而实现调度。
但是这种调度方式是协程主动的,是基于协作的,但是他无法面对一些场景,比如在死循环中没有任何调用发生,那么这个协程将永远执行下去,永远不会发生调度,这显然是不可接受的。
于是,在v1.14
版本,Go终于引入了基于信号的抢占式调度,下面,我们将介绍一下这两种抢占调度。
1. 用户主动让出CPU:runtime.Gosched函数
在介绍两种抢占调度之前,我们首先介绍一下runtime.Gosched
函数:
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
根据说明,runtime.Gosched
函数会主动放弃当前处理器,并且允许其他协程执行,但是起并不会暂停自己,而只是让渡调度权,之后依赖调度器获得重新调度。
之后,会通过mcall
函数切换到g0
栈去执行gosched_m
函数:
// Gosched continuation on g0.
func gosched_m(gp *g) {
if trace.enabled {
traceGoSched()
}
goschedImpl(gp)
}
gosched_m
调用goschedImpl
函数,其会为协程gp
让渡出本M,并且将gp
放到全局队列中,等待调度。
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg() // 使当前m放弃gp,就是其参数 curg
lock(&sched.lock)
globrunqput(gp) // 并且把gp放到全局队列中,等待调度
unlock(&sched.lock)
schedule()
}
虽然runtime.Gosched
具有主动放弃CPU的能力,但是对用户的要求比较高,并非用户友好的。
2. 基于协作的抢占式调度
2.1 场景
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var once = sync.Once{}
func f() {
once.Do(func() {
fmt.Println("I am go routine 1!")
})
}
func main() {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
go func() {
for {
f()
}
}()
time.Sleep(10 * time.Millisecond)
fmt.Println("I am main goroutine!")
}
我们考虑如上代码,首先我们设置P的个数为1,然后起一个协程中进入死循环,循环调用一个函数,如果没有抢占调度,那么这个协程将一直占据P,也就是会一直占据CPU,代码就永远不可能执行到fmt.Println("I am main goroutine!")
这行。下面我们看看,协作式抢占是怎么避免以上问题的。
2.2 栈扩张与抢占标记
$ go tool compile -N -l main.go
$ go tool objdump main.o >> main.i
我们通过以上指令,得到2.1中代码的汇编代码,截取f
函数的汇编代码如下:
TEXT "".f(SB) gofile../home/chenyiguo/smb_share/go_routine_test/main.go
main.go:12 0x151a 493b6610 CMPQ 0x10(R14), SP
main.go:12 0x151e 762b JBE 0x154b
main.go:12 0x1520 4883ec18 SUBQ $0x18, SP
main.go:12 0x1524 48896c2410 MOVQ BP, 0x10(SP)
main.go:12 0x1529 488d6c2410 LEAQ 0x10(SP), BP
main.go:13 0x152e 488d0500000000 LEAQ 0(IP), AX [3:7]R_PCREL:"".once
main.go:13 0x1535 488d1d00000000 LEAQ 0(IP), BX [3:7]R_PCREL:"".f.func1·f
main.go:13 0x153c e800000000 CALL 0x1541 [1:5]R_CALL:sync.(*Once).Do
main.go:16 0x1541 488b6c2410 MOVQ 0x10(SP), BP
main.go:16 0x1546 4883c418 ADDQ $0x18, SP
main.go:16 0x154a c3 RET
main.go:12 0x154b e800000000 CALL 0x1550 [1:5]R_CALL:runtime.morestack_noctxt
main.go:12 0x1550 ebc8 JMP "".f(SB)
其中第一行,CMPQ 0x10(R14), SP
就是比较SP
和0x10(R14)
(其实就是stackguard0
)的大小(注意AT&T
格式下CMP
系列指令的顺序),当SP
小于等于0x10(R14)
时,就会调转到0x154b
地址调用runtime.morestack_noctxt
,触发栈扩张操作。其实如果你仔细观察就会发现,所有的函数的序言(函数调用的最前方)都被插入了检测指令,除非在函数上标记//go:nosplit
。
接下来,我们将关注于两点来打通整个链路,即:
- 栈扩张怎么重新调度,让出CPU的执行权?
- 何时会设置栈扩张标记?
2.3 栈扩张怎么触发重新调度
// morestack but not preserving ctxt.
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
MOVL $0, DX
JMP runtime·morestack(SB)
TEXT runtime·morestack(SB),NOSPLIT,$0-0
...
// Set g->sched to context in f.
MOVQ 0(SP), AX // f's PC
MOVQ AX, (g_sched+gobuf_pc)(SI)
LEAQ 8(SP), AX // f's SP
MOVQ AX, (g_sched+gobuf_sp)(SI)
MOVQ BP, (g_sched+gobuf_bp)(SI)
MOVQ DX, (g_sched+gobuf_ctxt)(SI)
...
CALL runtime·newstack(SB)
CALL runtime·abort(SB) // crash if newstack returns
RET
以上代码中,runtime·morestack_noctxt
调用runtime·morestack
,在runtime·morestack
中,会首先记录协程的PC和SP,然后调用runtime.newstack
:
func newstack() {
...
gp := thisg.m.curg
...
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
...
preempt := stackguard0 == stackPreempt
...
if preempt {
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}
if gp.preemptShrink {
// We're at a synchronous safe point now, so
// do the pending stack shrink.
gp.preemptShrink = false
shrinkstack(gp)
}
if gp.preemptStop {
preemptPark(gp) // never returns
}
// Act like goroutine called runtime.Gosched.
gopreempt_m(gp) // never return
}
...
}
我们简化runtime.newstack
函数,总结起来就是通过现有工作协程的stackguard0
字段,来判断是不是应该发生抢占,如果需要的话,则调用gopreempt_m(gp)
函数:
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
可以看到,gopreempt_m
函数和前面讲到Gosched
函数时说到的gosched_m
函数一样,都将调用goschedImpl
函数,为协程gp
让渡出本M,并且将gp
放到全局队列中,等待调度。
这里我们就明白了,一旦发生栈扩张,就有可能会发生让渡出执行权,进行重新调度的可能性,那什么时候会发生栈扩张呢?
2.4 何时设置栈扩张标记
在代码中,将stackguard0
字段置为stackPreempt
的地方有不少,但是和我们以上场景相符的还是在后台监护线程sysmon
循环中,对于陷入系统调用和长时间运行的goroutine
的运行权进行夺取的retake
函数:
func sysmon() {
...
for {
...
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
...
}
}
func retake(now int64) uint32 {
...
for i := 0; i < len(allp); i++ {
...
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now { // forcePreemptNS=10ms
preemptone(_p_) // 在这里设置栈扩张标记
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
...
}
unlock(&allpLock)
return uint32(n)
}
其中,在preemptone
函数中进行栈扩张标记的设置:
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a goroutine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the nORMal stack overflow check.
gp.stackguard0 = stackPreempt // 设置栈扩张标记
// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
通过以上,我们串通起了goroutine
协作式抢占的逻辑:
- 首先,后台监控线程会对运行时间过长(
≥10ms
)的协程设置栈扩张标记; - 协程运行到任何一个函数的序言的时候,都会首先检查栈扩张标记;
- 如果需要进行栈扩张,在进行栈扩张的时候,会夺取这个协程的运行权,从而实现抢占式调度。
3. 基于信号的抢占式调度
分析以上结论我们可以知道,上述抢占触发逻辑有一个致命的缺点,那就是必须要运行到函数栈的序言部分,而这根本无法读取以下协程的运行权,在Go的1.14版本之前,一下代码不会打印最后一句"I am main goroutine!"
:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var once = sync.Once{}
func main() {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
go func() {
for {
once.Do(func() {
fmt.Println("I am go routine 1!")
})
}
}()
time.Sleep(10 * time.Millisecond)
fmt.Println("I am main goroutine!")
}
因为以上协程中的for
循环是个死循环,且并不会包含栈扩张逻辑,所以不会让渡出自身的执行权。
3.1 发送抢占信号
为此,Go SDK
引入了基于信号的抢占式调度。我们注意分析上一节preemptone
函数代码中有以下部分:
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
其中preemptM
函数会发送_SIGURG
信号给需要抢占的线程:
const sigPreempt = _SIGURG
func preemptM(mp *m) {
// On Darwin, don't try to preempt threads during exec.
// Issue #41702.
if GOOS == "darwin" || GOOS == "iOS" {
execLock.rlock()
}
if atomic.Cas(&mp.signalPending, 0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, 1)
}
// If multiple threads are preempting the same M, it may send many
// signals to the same M such that it hardly make progress, causing
// live-lock problem. Apparently this could happen on darwin. See
// issue #37741.
// Only send a signal if there isn't already one pending.
signalM(mp, sigPreempt)
}
if GOOS == "darwin" || GOOS == "ios" {
execLock.runlock()
}
}
3.2 抢占调用的注入
说到这里,我们就需要回到最开始,在第一个协程m0
开启mstart
的调用链路上,会调用mstartm0
函数,在这里会调用initsig
:
func initsig(preinit bool) {
...
for i := uint32(0); i < _NSIG; i++ {
...
handlingSig[i] = 1
setsig(i, abi.FuncPCABIInternal(sighandler))
}
}
在以上,注册了sighandler
函数:
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
...
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
...
}
然后接收到sigPreempt
信号时,会通过doSigPreempt
函数处理如下:
func doSigPreempt(gp *g, ctxt *siGCtxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) // 插入抢占调用
}
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, -1)
}
}
最终,doSigPreempt—>asyncPreempt->asyncPreempt2
:
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
然后,又回到了我们熟悉的gopreempt_m
函数,这里就不赘述了。
所以对于基于信号的抢占调度,总结如下:
- M1发送信号
_SIGURG
; - M2接收到信号,并通过信号处理函数进行处理;
- M2修改执行的上下文,并恢复到修改后的位置;
- 重新进入调度循环,进而调度其他
goroutine
。
4. 小结
总的来说,Go
的调度策略的发展,也是随着需求的丰富而逐步发展的,协作式调度能够保证具备函数调用的用户 Goroutine 正常停止; 抢占式调度则能避免由于死循环导致的任意时间的垃圾回收延迟。
到此这篇关于Go调度器学习之协作与抢占详解的文章就介绍到这了,更多相关Go调度器内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
相关文章