每一个线程 M 都有一个调度协程 g0, g0协程的主函数是 runtime.schedule,该函数实现了协程调度功能。那么,Go 语言是如何管理以及调度成千上万个协程呢?是否和操作系统一样,维护着可运行队列和阻塞队列?有没有所谓的按照时间片调度?或者是优先级调度?又或者是抢占式调度?

1. 调度器实现原理

        函数 runtime.schedule 实现了协程调度功能,怎么调度协程呢?第一步当然是获取到一个可运行协程 G; 第二步就是切换到协程 G 的上下文(包括切换协程栈,指令跳转)。

        思考一下,Go 语言调度器都有哪些途径去获取可运行协程 G 呢?首先每一个逻辑处理器 P 都有一个可运行协程队列,调度器一般情况下只需要从当前逻辑处理器 P 负载分配不均衡,还有一个全局可运行协程队列,一定条件下也会从全局可运行协程队列获取协程。当然,如果逻辑处理器 P 的本地可运行协程队列为空,全局可运行协程队列也为空的话,调度器还会尝试其他方法获取协程,比如从其他逻辑处理器 P 的本可运行协程队列去 “偷”。

        在讲解调度器的实现原理之前,我们再思考一下:无论是逻辑处理 P 的本地可运行协程队列,还是全局可运行协程队列,存储的都是可运行状态的协程,那处于阻塞状态的协程呢?它们能被调度器调度吗?其实,可以换一个角度思考,阻塞状态的协程在什么条件下会解除阻塞呢?这就要看协程是因为什么原因阻塞的了。比如,因为抢锁阻塞的协程,只有其他协程释放了锁才有可能解除阻塞;因为管道的读/写阻塞的协程,只有其他协程读/写了该管道或关闭了该管道才有可能解除阻塞;因为网络 I/O 阻塞的协程,只有 Go 程序检测到网络可读可写了才能解除阻塞;休眠协程,只有当时间到达某一时刻才能解除阻塞,而这同样需要 Go 程序主动检测。可以看到,休眠的协程和因为网络 I/O 阻塞的协程,都需要 Go 程序主动检测,才有可能解除阻塞,而这些检测逻辑我们将会在 Go 语言调度程序中看到。

        Go 语言调度器基本就是通过上述这些手段获取可运行协程 G 的,我们可以简单看一下函数 runtime.schedule 的实现逻辑,代码如下所示:

func schedule(){
	//检测是否有定时任务到达触发时间
	checkTimers(pp,0)
	
	//schedtick 调度计数器,每执行一次调度程序加 1
	//每执行 61 次调度程序,会优先从全局可运行协程队列获取协程
	if gp == nil {
		if _g_.m.p.prt().scheditick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(),1)
			unlock(&sched.lock)
		}
	}
	
	//从逻辑处理器 P 的本地可运行协程队列获取协程
	if gp == nil {
		gp,inheritTime = runqget(_g_.m.p.prt())
	}
	
	//继续尝试其他方式获取协程
	if gp == nil {
		gp,inheritTime = findrunnable()
	}
	
	//调度执行
	execute(gp,inheritTime)
}

         一般情况下,Go 语言调度器优先从当前逻辑处理器 P 的可运行协程队列获取协程。另外,每执行 61 次调度程序,Go 语言调度器就会优先从全局可运行协程队列获取协程。如果经过这两个步骤之后还没有获取到协程,则 Go 语言调度器会通过函数 runtime.findrunnable 继续尝试其他方式获取协程,该函数实现了获取协程的多种方法。此外,在获取协程之前,函数 runtime.schedule 会先检测是否有定时任务到达触发时间,如果有,则执行该定时任务。

        函数 runtime.findrunnable 的逻辑实际上非常简单,但是代码量较多。在这里,摘抄了少量代码,如下所示:

func findrunnable()(gp *g,inheritTime bool) {
	top:
		.....
		//从全局可运行协程队列获取
		if sched.runqsize != 0 {
			lock(&sched.lock)
			gp := globrunqget(_p_,0)
			unlock(&sched.lock)
			if gp != nil {
				return gp,false
			}
			//检测网络 I/O
			if list := netpoll(0); !list.empty(){// non-blocking
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp,_Gwaiting,_Grunnable)
				return gp,false
			} 
			//从其他逻辑处理器 P 的本地可运行协程队列去 “偷”
			gp,inheritTime,tnow,w, newWork := stealWork(now)
			if gp != nil {
				return gp,inheritTime
			} 
			//暂停线程的运行
			stopm()
			//重新开始新一轮的协程获取
			goto top
		}
}

        参考上面的代码,从全局可运行协程队列获取协程 G 时,需要加锁。另外,为什么需要检测网络 I/O呢? 因为可能有协程由于网络 I/O 阻塞,而此时网络其实已经可读或者可写了,需要解除这些协程的阻塞状态。函数 runtime.netpoll 返回的是协程列表,这里会修改协程为可运行状态 ,同时返回第一个协程,而其余协程将会被添加到全局可运行协程队列或逻辑处理器 P 的本地可运行队列。如果在这些操作之后,还没有获取到可运行协程,则尝试从其他逻辑处理器 P 的本地可运行协程队列去 “偷” 。最后,如果还是没有获取到可运行协程,说明整个 Go 程序当前都没有协程需要调度执行,这里选择暂停当前线程 M 的运行,当有协程需要调度执行时,再恢复线程 M 的执行。

        参考函数 runtime.schedule 的实现逻辑, Go 语言调度器在获取到协程之后,通过函数 runtime.execute 切换到协程 G 的上下文,以此实现协程的调度执行。该函数比较简单,代码如下所示:

func execute(gp *g,inheritTime bool){
	//设置协程状态为运行中
	casgstatus(gp,_Grunnable,_Grunning)
	//更新调度计数器
	if !inheritTime {
		_g_.m.p.prt().schedtick++
	}
	//切换协程上下文
	gogo(&gp.sched)
}

        参考上面的代码,函数 runtime.schedule 首先更新协程状态为运行中,同时更新调度计数器。最后,通过函数 runtime.gogo 切换协程上下文,该函数是通过汇编代码实现的。 

        另外,函数 runtime.execute 的第二个输入参数 inheritTime 是什么含义呢?它表示这次协程的执行是否继承上一个协程的时间片。假设时间片为 10ms,上一个协程已经执行了 5ms, 如果继承,则表明这一个协程最多只能执行 5ms,时间片就会结束,从而再次调度其他协程。 

2. 时间片调度

         Go 语言调度器有时间片调度的概念吗?回答这个问题之前,我们可以写一个简单的 Go 程序验证一下,代码如下:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	//设置逻辑处理器 P 的数量为 1
	runtime.GOMAXPROCS(1)
	go func() {
		fmt.Println("hello world")
		for {
			//死循环
		}
	}()
	//main协程主动让出
	runtime.Gosched()
	fmt.Println("main end")
}

        基于 go 关键字创建协程时,只是将协程添加到逻辑处理器 P 的可运行协程队列中,并没有立即调度执行该协程。因此,为了避免主协程执行结束导致 Go 程序退出,我们使用函数 runtime.Gosched 来主动让出 CPU,这样调度器就能优先调度执行其他协程(也就是通过 go 关键字创建的协程)。

        另外,我们通过函数 runtime.GOMAXPROCS 将逻辑处理器 P 的数量设置为 1,即最多只能有一个线程 M 绑定到逻辑处理器 P 上,也就是最多只能有一个调度器运行。这样的话,如果 Go 语言没有时间片调度的概念,那么一旦子协程执行到循环语句,就会陷入死循环,导致调度器再也没有机会调度其他协程,最终的结果就是主协程的打印语句无法执行。

        执行结果是怎样的呢?如果你在 Go 1.14 及以上版本的环境中运行该程序,你会发现正常输出了 main end; 但是如果你在 Go 1.13 版本中运行该程序,你将发现程序一直执行,没有输出 main end。你可以下载多个版本的 Go 试一试,看看结果是否如此。通过上面的程序是否说明 Go 1.14 及以上版本有时间片调度的概念,而 Go 1.13 版本则没有时间片调度的概念呢?实际上并非如此,你可以再试试执行下面这个程序:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	//设置逻辑处理器 P 的数目为 1
	runtime.GOMAXPROCS(1)
	go func() {
		fmt.Println("hello world")
		var arr []int
		for i := 0; i < 100; i++ {
			arr = append(arr, i)
		}
		for {
			test(arr)
		}
	}()
	runtime.Gosched()
	fmt.Println("main end")
}

func test(arr []int) []int {
	diff := make([]int, len(arr), len(arr))
	diff[0] = arr[0]
	for i := 1; i < len(arr); i++ {
		diff[i] = arr[i] - arr[i-1]
	}
	return diff
}

        首先声明一下,函数 test 没有任何意义,只是为了避免 Go 语言的优化(比如内联优化)。注意这一次不再是简单的死循环语句,而是在死循环里调用了函数 test。你可以在 Go 1.13 版本执行上述程序试试,你会发现,主协程又输出了 main end。两个程序同样都陷入了死循环,为什么执行结果不一样呢?另外,为什么 Go 语言不同版本执行结果也不一样呢?不着急,这些问题我们一个个解决。

        在回答上面两个问题之前,我们先确定一定,Go 语言确实有时间片调度的概念。说起时间片调度,就不得不提到一个辅助线程,这个线程的入口函数是 runtime.sysmon,就是这个函数检测是否有协程执行时间过长(执行时间超过 10ms),如果有,则 "通知“ 该协程让出 CPU。这里摘抄了函数 runtime.sysmon 的部分代码,如下所示:

// 创建新线程,入口函数是 sysmon
newm(sysmon,nil)
func sysmon(){
	for {
		delay = 10 * 1000 //最大休眠时间 10ms
		usleep(delay)
		//该函数用于抢占长时间运行的协程 G
		retake(nanotime())
	}
}

        参考上面的代码,函数 runtime.sysmon 的主体也是一个死循环,每执行一次休眠 10ms,相当于每 10ms 执行一次函数 runtime.retake,就是该函数实现的检测功能,以及 ”通知“ 功能。如何检测协程执行时间过长呢?其实每一个逻辑处理器 P 都维护了一个调度计数器,以及最近一次调度协程 G 的时间点,通过这两个数据就可以确定逻辑处理器 P 是否长时间没有调度新的协程。如果逻辑处理器 P 长时间没有调度新的协程,是不是就相当于某个协程执行时间过长了?这样一来,检测逻辑一下简单了很多,代码如下所示:

func retake(now int64) uint32 {
	// 遍历所有的逻辑处理器 P 
	for i:=0;i<len(allp);i++{
		if s == _Prunning {
			t := int64(_p_.schedtick)
			//不等于,说明在这 10ms 期间重新调度协程了
			for int64(pd.schedtick) !=t {
				pd.schedtick= uint32(t)
				pd.schedwhen = now
				continue
			}
			//长时间没有调度新的协程
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			preemptone(_p_)
		}
	}
}

        参考上面的代码,逻辑处理器 P 有两个变量,变量 schedtick 就是我们说的调度计数器,每次调度协程都会加 1;变量 schedwhen 记录调度时间,当然这个时间其实不是真正的调度时间,可以理解为检测时间。变量 forcePreemptNS 的值相当于 10ms。检测试的整体逻辑是,如果距离最近一次调度超过了 10ms,并且调度计数器没有发生变化(没有重新调度),则 “通知” 该协程让出 CPU。函数 runtime.preemptone 实现了该通知功能。

        原来是辅助线程帮助我们实现的时间片调度,不过好像还是没有解决问题,两个程序同样陷入了死循环,为什么执行结果不一样?为什么 Go 语言不同版本执行结果也不一样?其实,这是由函数 runtime.preemptone 的通知方式决定的,而且不同版本的 Go 语言实现的通知方式也不一样。

        最后,preempt 的含义有 “抢占”,可以这么理解:一个协程正在运行中,本身没有阻塞和休眠,却因为执行时间过长被迫放弃 CPU,即被抢点了 CPU,所以这一调度方式也可以称为抢占式调度。

3. 基于协作的抢占式调度

      下面回答第一个问题:在 Go 1.13 版本下,两处程序同样都陷入了死循环,为什么执行结果不一样?

        上一节中的两个程序唯一不同的是:第一个程序的死循环执行是空语句,第二个程序的死循环执行的是函数调用语句。函数调用比较特殊吗?是的,函数调用就是不同于普通的语句。Go 语言在编译用户自定义的函数时,会注入一些你不知道的代码。

        其实函数 runtime.morestack_noctxt 不仅会判断协程栈是否需要扩容,还会判断当前协程是否应该让出 CPU。我们可以将第二个程序编译为汇编代码,看看函数前后是不是被注入了一些指令,如下所示:

"".test STEXT
    0x0000 00000 (test.go:26)    CMPQ    SP, 16(R14)
    0x0004 00004 (test.go:26)    JLS     404
    //省略了业务代码编译后的汇编指令

    0x0194 00404 (test.go:28)    NOP    
    0x01a3 00419 (test.go:26)    CALL    runtime.morestack_noctxt(SB)
    0x01b7 00439 (test.go:26)    JMP    0

        函数 morestack_noctxt 最终调用了函数 runtime.newstack,该函数检测如果发生了抢占,则让出 CPU,切换到调度程序,代码如下所示:

func newstack(){
	preempt := stackguard0 == stackPreempt
	if preempt {
		//执行抢占逻辑,该函数永远不会返回
		gopreempt_m(gp)
	}
}

        果然是这样,函数 runtime.preemptone 就是通过修改栈保护字段 stackguard0 实现的抢占逻辑。变量 stackPreempt 是一个常量,并且是一个非常大的整数,赋值之后,栈顶指针肯定小于栈何护字段 stackguard0。

        函数 morestack_noctxt 最终调用了函数 runtime.newstack,该函数检测如果发生了抢占,则让出 CPU,切换到调度程序,代码如下所示:

func newstack(){
	preempt := stackguard0 == stackPreempt
	if preempt {
		//执行抢占逻辑,该函数永远不会返回
		gopreempt_m(gp)
	}
}

        参考上面的代码,函数 runtime.newstack 如果检测到栈保护字段等于变量 stackPreempt 则触发抢占逻辑,也就是修怍协程状态为可运行并将其添加全局可运行协程队列,重新执行调度程序。

        现在我们知道了,Go 1.13 版本实现的抢占式调度是基于栈保护字段 stackguard0 实现的,Go 语言在编译用户自定义的函数时,会注入一些代码,检测栈保护字段 stackguard0。但是如果陷入死循环之后,没有任何函数调用(或者函数过于简单,被 Go 编译器内联优化,也就相当于没有函数调用),也就没有机会检测栈保护字段 stackguard0 了。这种抢占式调度是需要辅助线程与用户协程协作才能实现的,如果用户协程不满足条件,就无法完成抢占,所以这种调度方案也称为基于协作的抢占式调度。

4. 基于信号的抢占式调度

        本节回答第二个问题:为什么 Go 语言不同版本执行结果不一样?       

         Go 1.14 及以上版本其实是基于信号实现的抢占式调度。什么是信号?信号是 Linux 系统中进程间通信的一种方式。基于信号实现的抢占式调度非常简单,辅助线程发送信号,线程 M 捕获信号并抢占逻辑。在 Go 1.14 及上版本,函数 runtime.preemptone 的实现逻辑如下所示:

func preemptone(_p_ *p) bool {
	......
	//如果支持信号抢占,发送信号
	if preemptMSupported {
		preemptM(mp)
	}
	return true
}

func preemptM(mp *m){
	//发送信号
	pthread_kill(pthread(mp.procid),sigPreempt)
}

参考上面的代码,函数 pthread_kill 用于向指定线程发送信号。信号总共有 64 种,分为标准信号(不可靠信号)和实时信号(可靠信号),标准信号从 1 到 31,实时信号从 32 到 64,比如 SIGINT,SIGQUIT,SIGKILL 等都是标准信号。在 Linux 系统下,可以执行 kill -l 查看 64 种信号的定义。

[root@localhost ~]# kill -l
 1) SIGHUP	 2) SIGINT	 3) SIGQUIT	 4) SIGILL	 5) SIGTRAP
 6) SIGABRT	 7) SIGBUS	 8) SIGFPE	 9) SIGKILL	10) SIGUSR1
11) SIGSEGV	12) SIGUSR2	13) SIGPIPE	14) SIGALRM	15) SIGTERM
16) SIGSTKFLT	17) SIGCHLD	18) SIGCONT	19) SIGSTOP	20) SIGTSTP
21) SIGTTIN	22) SIGTTOU	23) SIGURG	24) SIGXCPU	25) SIGXFSZ
26) SIGVTALRM	27) SIGPROF	28) SIGWINCH	29) SIGIO	30) SIGPWR
31) SIGSYS	34) SIGRTMIN	35) SIGRTMIN+1	36) SIGRTMIN+2	37) SIGRTMIN+3
38) SIGRTMIN+4	39) SIGRTMIN+5	40) SIGRTMIN+6	41) SIGRTMIN+7	42) SIGRTMIN+8
43) SIGRTMIN+9	44) SIGRTMIN+10	45) SIGRTMIN+11	46) SIGRTMIN+12	47) SIGRTMIN+13
48) SIGRTMIN+14	49) SIGRTMIN+15	50) SIGRTMAX-14	51) SIGRTMAX-13	52) SIGRTMAX-12
53) SIGRTMAX-11	54) SIGRTMAX-10	55) SIGRTMAX-9	56) SIGRTMAX-8	57) SIGRTMAX-7
58) SIGRTMAX-6	59) SIGRTMAX-5	60) SIGRTMAX-4	61) SIGRTMAX-3	62) SIGRTMAX-2
63) SIGRTMAX-1	64) SIGRTMAX	

        需要说明的是,选择第几种信号也是有要求的,因为很多信号都有特殊含义,是不能随便使用的,Go 语言在实现抢占式调度时,选择的信号是 SIGURG。

        基于上文的描述此时有一个问题需要考虑:辅助线程发送了抢占信号,线程 M 如何捕获信号呢? Go 程序想要接收并处理某种信号,是需要设置信号处理器的,信号处理器包括我们想要接收的信号,以及信号处理函数。Go 语言设置的信号处理函数为 runtime.sighandler。该函数判读如果是抢占信号,则执行抢占逻辑,代码如下所示:

func sighandle(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		doSigPreempt(gp, c)
	}
}

参考上面的代码,函数 runtime.doSigPreempt 实现了抢占逻辑,其核心逻辑是暂停当前协程的运行并重新执行调度程序。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部