Patterns and Hints for Concurrency in Go
Russ Cox将向我们介绍他认为4个非常常见的Go设计思想,并且这对我们以后的代码编写都非常的重要,这4条规则可以被认为是黄金准则。
State should be saved at code flow rather than data(variables)
我们编写并发程序的时候,程序的状态(state)应该存储在数据(data)中还是代码(code)中。一般来说将状态(state)存储到code中程序会更加的易读。

前面有一系列的代码转换,状态本来是完全的存储在data中的,最后经过一系列转换,存储到了code中,这样代码更易读,上面的一系列变量就是之前未优化的代码版本:将state存储到data中。
所以,我们后续设计Go程序时,应该将data state尽可能的转化为code state,以控制流的形式去写状态模块,这样会使代码更容易理解。
1.1 if some structs is fixed, we mustn’t modify them.
我们并不是任何时候对代码的状态结构体有很大的自主性,因此,有时我们只是想复用原有的代码(我们假设这些结构体代码十分的复杂且庞大),但是状态并没有如我们所期盼的那样保存到code flow里面,那么我们应该怎么做呢?


go有一个称为/debug/pprof/goroutine的包,可以通过这个包获取当goroutine dump的时候的堆栈,他会根据堆栈的长度进行排序,将最短的堆栈最先输出,后面越来越长,一般第一个最短的堆栈往往是造成bug的最直接原因,下面的那些堆栈对我们或许并没有多大用处。
上面这些内容是热身,下面我们将真正的进入正题。
1. Pattern #1:Publish/subscribe server
这里我们只考虑publish/subscribe两种行为的实现,对相关事件的过滤则是单独的组件,不在我们考虑的范畴内。
type PubSub interface {
// Publish
// publishes the event e to all current subscriptions.
Publish(e Event)
// Subscribe
// registers c to receive future events.
// All subscribers receive events in the same order,
// and that order respects program order:
// if Publish(e1) happens before Publish(e2),
// subscribers receive e1 before e2.
Subscribe(c chan<- Event)
// Cancel
// cancels the prior subscription of channel c.
// After any pending already-published events have been sent on c, the server will signal that the subscription is cancelled by closing c.
Cancel(c chan<- Event)
}
1.1 implementation 1(Mutex Version)
下面是一个较为完善的实现,由于Publish/Subscribe/Cancel这些函数是可以在Goroutines中调用的,因此对于共享状态需要调用锁来保护。
defer s.mu.Unlock():延迟函数,即在当前函数调用完毕后,自动释放锁,这样我们就不必自己去释放了。即使下面函数中有panic,它也可以在函数出现panic之后自动解锁,但是出现panic时,可能会造成已经lock的state出现不一致的情况,因此尽量减少Panic的使用。
type Server struct {
mu sync.Mutex
sub map[chan<- Event]bool // 订阅了该服务器服务的一些client信道
}
func (s *Server) Init() {
s.sub = make(map[chan<- Event]bool) // 初始化函数
}
func (s *Server) Publish(e Event) {
s.mu.Lock()
defer s.mu.Unlock()
// 每次该server有event发布的时候,就将这些event发给那些client的信道
for c := range s.sub {
c <- e
}
}
// 当某client订阅该server时,调用此函数
func (s *Server) Subscribe(c chan<- Event) {
s.mu.Lock()
defer s.mu.Unlock()
if s.sub[c] {
panic("pubsub: already subscribed")
}
s.sub[c] = true
}
// 当某client取消订阅该server时,调用此函数
func (s *Server) Cancel(c chan<- Event) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.sub[c] {
panic("pubsub: not subscribed")
}
close(c)
delete(s.sub, c)
}
此外,当我们使用Panic时,这隐含了Server信任client不会滥用上面的这些接口(Subscribe/Cancel),但一般来说我们不希望程序因为错误而中止(panic会导致程序中止),我们更希望函数返回一个错误之后继续执行。
Publish:不同clients接收events时存在一定的同步关系,因为是用for...range...依次发送的,所以如果前面有client的channel因为某种原因接收的很慢(如已经满了,但是client还没有从channel中取出event去处理),那么这也会导致它后面的clients的channel接收的很慢,因此我们必须考虑如何处理这种情况。
问题总结-Slow Goroutines:client是以goroutines的形式运行的,当某个client因为某种故障而太长时间没有去处理自己channel中的events,那么这会导致Server.Publish在往这个channel中放入信息时卡主,因为这个channel已经满了,放不下了,那么Publish函数会阻塞在这里,这会直接导致后面在等待的client也无法收到信息。那么我们如何优雅的处理这种问题呢?
1.2 Options for slow goroutines
Russ教授介绍了如下的三种思路:
Slow down event generation:降低event生成的速度,这是以前的解决方案隐含的做法。publish停止一段时间,直到subscribers跟上。
Drop/coalesce events:Publish不会等待该client,那么意味着这个client会丢失一些events,但是通过一些机制可以让client知道它错过了多少个events。e.g.:OS中的signal handler的channel就是采用的这种机制。
- 在OS的signal handler中:还有一种情况是一个signal event被多次发布到一个channel中,但是该channel的client只会收到一次提示,因为OS中处理该signal一次和多次的语义是幂等的,因此我们可以简单地将重复的events进行合并(coalesce)。
Queue an arbitrary number of events:client也许不想错过任何events,那么可以使用一个专门的队列,来对某个client错过的events进行排序,暂存这些错过的events,后续再提供给该client。
Warning:这意味着需要一个unbounded queue,它极难被管理,因此我们几乎不采用这种方法,这一般不是一个恰当的选择。
1.3 Implementation2(Convert mutex to goroutines)
将mutex转化为goroutines之后,这会使得程序更加的清晰易读。

将该系统中的Publish、Subscribe、Cancel三种行为各用一个channel来单独存放,并将他们的处理逻辑统一合并到Server.loop()函数中去,原来的mutex变量现在被隐含到loop的程序计数器中了。但是在这个goroutine的版本中,我们很难看出来什么是比较重要的state。
对于Raft实验而言,还是采用Mutex Version更好,因为我们需要实时的知道很多的state。
1.3.1 How to deal with slower subscriber?
为了应对进度较慢的subscribers,我们会添加一些helper goroutines,这些辅助函数的功能是管理特定的subscriber因为某些原因积压的一些events,从而防止整个系统因为某个落后的subscriber而被卡主。在编写Helper函数时,教授从一个最简单的版本开始,依次解决了input为null时读取错误的问题、channel的正确关闭的问题,最终的helper function版本如下所示:
func helper(in <-chan Event, out chan<- Event) {
var q []Event
//这里我认为应该是in != nil || len(q) >0
// in是用来接收publish的发来的events的信道,q是中介,subscriber通过q来从in拿events
for in != nil && len(q) > 0 {
// Decide whether and what to send.
// in是server处发布消息的队列,q是用来存储可能会积压的events的队列
var sendOut chan<- Event
var next Event
if len(q) > 0 {
sendOut = out
next = q[0]
}
select {
case e, ok := <-in:
if !ok {
in = nil // stop receiving from in
break
}
q = append(q, e)
case sendOut <- next:
q = q[1:]
}
}
close(out)
}
因此s.loop()函数也做出相应的改变:

因此,在Implementation2中,我们完全分离了server在publish events时、subscriber在从channel中取出events时,两个行为要维护的channel列表,publish将发布的event放到h的channel中,而subscriber在取出event时,需要将h信道中的内容先取出来放到临时信道q信道中(见helper goroutine)之后,才可以取出这个event。
如此一来,我们就将publish、subscribe行为解耦,处理event较慢的subscriber就不会影响到publish行为的速度。即我们用不同的goroutine来处理不同阶段的事情,将独立的关注点分别用一个goroutine处理(server的publish行为、subscriber的取出event的行为),这是一种很好的设计思路。
2. Pattern #2:Work Scheduler
核心思想是将任何单独的状态都变成一个单独的goroutine,而channel是实现同步的一个关键结构体。
实现一个集群工作调度器,这个类似于MapReduce中我们要实现的coordinator。我的本能反应是实现一个以下的版本,但是很不幸,这个程序有两个问题:

不难看出对于task变量存在DATA RACE问题:因为我们是以go routine的方式运行的call,所以循环中的closure一旦开始运行,我们就不会等到它运行完成,而是直接开始进行task++的操作,因此前一次调用的call可能用到的task变量就是错误的(即实际运行call时用到的task不是它应该用的task)。一定要学会读DATA RACE的报错信息。
WARNING部分代表哪个变量存在DATA RACE(本例中是task);Previous Write代表了是哪里修改了该变量,即可能存在DATA RACE的地方(本例中是task++);第三处代表发生在哪个closure中(本例中是go func() {} ())。
2.1 BUG1:how to fix this DATA RACE on task variable
在lecture2中,教授讲过如何解决这种DATA RACE:一种方法是将task作为函数参数传入进去(copy param),另一种方法是用一个临时变量先获取该值,之后在goroutine中调用外层函数中声明的该临时变量即可。
// solution1: as copy param
for task := 0; task < numTask; task++ {
go func(task int) {
srv := <-idle
call(srv, task)
idle <- srv
}(task)
}
// solution2: use tmp var
for task := 0; task < numTask; task++ {
task := task
go func() {
srv := <-idle
call(srv, task)
idle <- srv
}()
}
solution2看起来可能有一些抽象,为什么可以在for循环体内再次声明一个task变量,实际上Go语言是支持这种行为的:在for循环头部声明的task和在for循环体内部声明的task是两个不同的变量,它们在内存中占用不同的地址。这两个变量只是恰好有相同的名字,但它们是完全独立的。
如果你取消了for循环体内的task声明,并直接使用task,那么所有的goroutine将会共享同一个task变量(与for循环头中的task是一个内存地址)。这是因为在这种情况下,你没有为每个goroutine创建一个新的、独立的task变量。所以,当for循环继续进行并改变task的值时,所有goroutine看到的都将是同一个、最新的task值。
2.2 BUG2:how to reduce goroutine numbers?before exit Schedule, how to ensure all tasks are finished?
如何减少Goroutines的数量,因为在上面的程序中,我们只是一股脑的为所有的tasks创建一个goroutine,但是假如servers的数量很少,那么结果就是很多goroutines在等待servers变得空闲,创建空闲的goroutines也会消耗资源,所以为了尽量减少idle goroutine的数量,我们可以对程序做出如下调整:
for task := 0; task < numTask; task++ {
task := task
// 修改为,当有服务器空闲时,才会去创建新的goroutine
// 即先提前获取空闲的服务器
srv := <-idle
go func() {
call(srv, task)
idle <- srv
}()
}
此外,我们还需要确保当schedule程序退出时,所有的tasks已经都完成了。所以在程序的末尾,当idle列表中所有的服务器都处于空闲状态时,我们就认为所有的tasks已经完成了,初步的完整程序如下:
func Schedule(servers []string, numTask int, call func(srv string, task int)) {
idle := make(chan string, len(servers))
for _, srv := range servers {
idle <- srv
}
for task := 0; task < numTask; task++ {
task := task
// 在有空闲服务器的前提下才创建goroutine
srv := <-idle
go func() {
call(srv, task)
idle <- srv
}()
}
// 当所有的servers都变得空闲时,这意味着所有的tasks都已经完成了
for i := 0; i < len(servers); i++ {
<-idle
}
}
2.3 problems: server num changes, the forever block due to tasks » work channel size
考虑到大部分情况下servers的数量都会远小于tasks的数量,因此我们可以为每个server创建一个goroutine,当server处理完当前任务后可以马上处理下一个任务,这样来的更高效,而不是为每个task创建一个goroutine。此外,为了能够处理随时有server可能进来的情况,我们同样可以用一个goroutine来管理server的goroutine的创建:

右侧代码只是将servers的goroutines的创建也变成一个单独的goroutine,确保了当有新的server进来时,可以让这个server及时的忙碌起来。但是,在这种情况下,原来使用servers数量来判断任务完成的方式就不再适用,所以我们需要换一种判断任务完成的方式,这就是我们创建一个done信道的作用,如果我们已经提前知道tasks的数量,那么当done里面的完成信号等同于tasks数量时,就意味着所有的任务都完成了,但是相应的runTasks任务也要进行修改:

将done->true从每个服务器完成后添加一次变为每个任务完成后添加一次,但是这种情况下会有阻塞的情况出现,当tasks的数量特别大的时候(比work和done两个channel的大小都大的多得多),也可能会出现无法解决的阻塞(永远卡主的阻塞)。

因为work<-task这个操作是在main中运行的,除非tasks放完,否则下面<-done的操作是不会执行的,当tasks数量非常大的时候,比work和done的channel都大,此时done已经满了,但是由于tasks的put操作还未完成,因此<-done操作无法执行,因此此时runTasks也会卡在done<-True这里(因为done已经满了),之后tasks越积越多,那么work<-task也会卡主,那么就造成了“死锁”(并非真正意义的死锁,主要强调程序被永远卡住而无法取得进展)。
为了解决这个问题,我们用一个select,将done<-和work-<task操作放到一起去执行,这样就不会出现在tasks的数量过大的情况下<-done操作无法取得进展的问题,即使出现也只是短时间的阻塞,而不是永远无法取得进展。

当然更干净的方式,更简单地方式是将work<-task这个工作放到一个单独的goroutine中去,这样确保了main总是能运行到<-done操作这里。

当然,如果能确保work信道的size足够大,那么也可以解决永久阻塞的问题,但是一个tasks往往是KB级别,而channel是以Byte为单位分配的,所以分配足够大的信道的方法并不优雅,也难于管理。
2.4 how to deal with failed tasks?
当任务失败时,我们采用重新执行的方式来处理,这应该有由server本身来负责,当server执行失败时,会把这个任务重新放到work信道中,等待下一次执行。所以这延长了work信道的使用时间,因此close(work)的操作不能在task发送完后就执行,它必须等到所有的任务都成功执行完后才能执行,代码调整如下:

此外,当程序要结束时,我们一般不会选择直接杀死某个goroutine,因为它可能会与别的程序和goroutines产生交互,比如说它正在持有一把锁,如果我们杀死了它,那么别的goroutines可能会因此阻塞或崩溃,所以我们一般都会等到他们自然结束,即以一种更优雅的方式,用同步方式让他们自然地退出goroutines。
比如说在这里,当所有任务完成后,我们关闭work信道,之后所有的runTask-goroutines都检测到了work信道已经关闭,那么他们接下来就会自然地退出当前的goroutine,当然在退出前还要清理当前goroutine使用的内存。
2.5 don’t forget to close the servers’ goroutines
最后退出后,我们还有servers的goroutine没有关闭,因此需要对代码做出改动以回收这部分资源。同样是需要一个channel来负责提醒servers的goroutine可以退出。

3. Pattern #3:Replicated service client(Timer Type Contents)
我们要实现的接口如下所示:
type ReplicatedClient interface {
// Init
// initializes the client to use the given servers. To make a particular request later,
// the client can use callOne(srv, args), where srv is one of the servers from the list.
// callOne是选择一个特定的服务器来使用
Init(servers []string, callOne func(string, Args) Reply)
// Call
// makes a request an available server. Multiple goroutines may call Call concurrently.
// 只是找一个合适的server来使用,尽量使用最初的那个server,如果发现上一次的server不好用了,那么才会使用下一个,因此唯一要维护的状态就是上一次使用的server
Call(args Args) Reply
}
当我们要共享的状态很少时(这里只需要维护上一次使用的server是什么),维护的工作量很小,因此使用mutex总是没问题的。总之,在不影响代码的简洁性和可读性的前提下,可以用mutex来维护少量的共享状态。
首先厘清需求,在Call中需要实现的是找到一个合适的server,优先选择上一次使用的server,否则就依次给所有的server发送请求,当然只处理收到的第一个回复,第一个回复的服务器就是我们要使用的服务器;此外,我们不能一直等一个服务器回复,所以要设置一个超时时间,当过了这个时间以后,我们就先向下一个服务器发送请求;综合以上需求,最终的代码如下:
func (c *Client) Call(args Args) Reply {
type result struct {
serverID int
reply Reply
}
const timeout = 1 * time.Second
t := time.NewTimer(timeout)
// 一定要调用t.Stop来释放timer对象的资源,因为timer对象即使到期后也不会被自动回收
// 也要主动释放t.C中的内容,否则即使调用了stop,t.c中的超时时间也会永远存在
defer t.Stop()
// 只需要一个server就可以了,可是这里将done的通道设置为了len(servers),因为如果设置为1,并且该client向其他的server也发送了请求,那么除了第一个回复的server可以成功写入,剩下的server都会阻塞住,为了不让他们阻塞,可以及时的处理别的任务,因此给done足够的大小
done := make(chan result, len(c.servers))
for id := 0; id < len(c.servers); id++ {
id := id
go func() {
done <- result{id, c.callOne(c.servers[id], args)}
}()
select {
case r := <-done:
return r.reply
case <-t.C:
// timeout
t.Reset(timeout)
}
}
r := <-done
return r.reply
}
3.1 time.Timer的回收问题
在Go语言中,time.Timer对象在超时后并不会被自动释放。定时器的C字段是一个channel,当定时器超时后,当前时间会被发送到这个channel。然而,这个channel并不会被自动关闭或者清理,除非你显式地调用Stop方法。另外,即使定时器已经超时,它的内部goroutine也不会立即结束。这个goroutine会一直运行,直到定时器被显式地停止。
因此,如果你创建了一个定时器,并且不再需要它,那么你应该调用Stop方法来停止定时器并释放其相关资源。否则,即使定时器已经超时,它的内部goroutine和channel仍然会存在,可能会导致资源泄漏。
在Go语言中,调用Stop方法并不会清空或关闭定时器的C字段。Stop方法只会阻止定时器触发,但并不会影响C字段。如果定时器已经触发,并且C字段中的值还没有被读取,那么这个值会继续存在。
因此,即使你调用了t.Stop(),你仍然需要确保从t.C中读取了所有的值。否则,这些值会继续存在,可能会影响你的程序逻辑。所以,在Go语言中,调用t.Stop()并不会清空或关闭定时器的C字段。你需要自己确保从t.C中读取了所有的值。
总结来说,主动调用`defer t.Stop()`,可以确保timer的停止与程序退出是同时的;如果有超时的时间,那么也要确保能清空`t.C`中的超时时间,否则会造成垃圾内存的出现。
3.2 添加use previous server机制
因为我们要确保首先使用上一次的服务器的ID,而这个prefer状态是一个共享变量,因此要对程序做出如下修改:
// 只放关键代码,替换了从done :=...开始往下的内容
c.mu.Lock()
prefer := c.prefer
c.mu.Unlock()
var r result
// 确保了我们先向上一次使用的server发送请求
for off := 0; off < len(c.servers); off++ {
id := (prefer + off) % len(c.servers)
go func() {
done <- result{id, c.callOne(c.servers[id], args)}
}()
select {
case r = <-done:
goto Done
case <-t.C:
// timeout
t.Reset(timeout)
}
}
r = <-done
Done:
c.mu.Lock()
c.prefer = r.serverID
c.mu.Unlock()
return r.reply
3.3 总结
可以看到这里用了mutex,goto等可能使得代码可读性变差的特性,但是只要能确保这不会使得代码的可读性变差,并且要管理的共享状态很少,那么使用这些都是没有关系的。
4. Pattern #4 Protocol multiplexer(协议复用器)
协议多路复用器,通常被称为”多路复用器”或”MUX”,是一种设备或软件,可以将多个数据流路由或合并到一个或多个输出通道。这在电信和计算机网络中经常被用来提高效率并减少额外的硬件或软件的需求。是任何RPC
一个具体的协议多路复用器的例子是TCP端口服务多路复用器(TCPMUX),这是一种互联网协议,允许在一个单一、众所周知的端口号上联系主机上的多个可用TCP服务⁵。
另一个例子是Microsoft网络适配器多路复用协议,它在Windows中被用于网络接口卡(NIC)绑定。当两个或更多的网络适配器被组合成一个物理设备时,这个协议就会起作用,目标是增加网络带宽或容错能力。
总的来说,协议多路复用器是管理和优化网络系统中数据流的关键组件。它允许资源的有效使用,并可以提高网络的性能和可靠性。希望这个答案对你有所帮助!如果你有其他问题,请随时告诉我!
4.1 basic API
下面是我们要实现的接口和一些API:
type ProtocolMux interface {
// Init
// initializes the mux to manage messages to the given service.
Init(Service)
// Call
// makes a request with the given message and returns the reply.
// Multiple goroutines may call Call concurrently.
Call(Msg) Msg
}
type Service interface {
// ReadTag
// returns the muxing identifier in the request or reply message.
// Multiple goroutines may call ReadTag concurrently.
ReadTag(Msg) int64
// Send
// sends a request message to the remote service.
// Send must not be called concurrently with itself.
Send(Msg)
// Recv
// waits for and returns a reply message from the remote service.
// Recv must not be called concurrently with itself.
Recv() Msg
}
4.2 implementation1
根据上面的需求,我们简单给出一个实现,基本思想是,client可以通过Mux发送多次消息的请求,所有请求都会被放到send队列之中,但是我们只需要一个reply,当收到第一个reply后,就可以从pending队列中删除对应tag的reply的channel,之后再有同类型的reply到来,那么都是无效的,这里直接用panic来表示是unexpected reply:
type Mux struct {
srv Service
// request的发送队列
send chan Msg
mu sync.Mutex
// 从int->Msg Chan的映射,INT是我们在Msg中看到的id编号。channel是对应ID的reply队列,容量为1,因为它只要收到的第一个reply之后就会删除这个队列
pending map[int64]chan<- Msg
}
func (m *Mux) Init(srv Service) {
m.srv = srv
m.pending = make(map[int64]chan Msg)
go m.sendLoop()
go m.recvLoop()
}
func (m *Mux) sendLoop() {
for args := range m.send {
m.srv.Send(args)
}
}
func (m *Mux) recvLoop() {
for {
reply := m.srv.Recv()
tag := m.srv.Tag(reply)
m.mu.Lock()
done := m.pending[tag]
delete(m.pending, tag)
m.mu.Unlock()
if done == nil {
panic("unexpected reply")
}
done <- reply
}
}
func (m *Mux) Call(args Msg) (reply Msg) {
tag := m.srv.ReadTag(args)
done := make(chan Msg, 1)
m.mu.Lock()
if m.pending[tag] != nil {
m.mu.Unlock()
panic("mux: duplicate call tag")
}
m.pending[tag] = done
m.mu.Unlock()
m.send <- args
return <-done
}
5.Hints
基于上面的4个Pattern,总结了一些在我们进行Go语言编程时的一些建议:
下面是总结关于在Go语言编程中使用协程(goroutines)、通道(channels)和互斥锁(mutexes)的建议:
- 使用竞态检测器,适用于开发甚至生产环境(运行时加入-race的参数)。
启示集合1:
- 当它使程序更清晰时,将数据状态(data state)转化为代码状态(code state)。
- 当它使程序更清晰时,将互斥锁(mutex)转化为协程(routines)。
- 使用额外的协程来持有额外的代码状态。
- 使用协程让独立的关注点独立运行。
启示集合2:
- 考虑慢协程的影响。
- 知道每次通信为何以及何时进行。
- 知道每个协程为何以及何时退出。
- 键入 Ctrl-\ 来终止程序并转储(停止)所有协程的堆栈。
- 使用HTTP server的
/debug/pprof/goroutine来检查活动的协程堆栈。
启示集合3:
- 使用缓冲通道作为并发阻塞队列。
- 在引入无界队列之前要仔细考虑。
- 关闭通道以表明不再发送更多的值。
启示集合4:
- 停止你不需要的计时器,同时当计时器超时后,将t.C这个channel中的时间也排出去。
- 更倾向于使用defer来解锁互斥锁。
启示集合5:
- 如果这是编写代码的最清晰的方式,使用互斥锁。
- 如果这是编写代码的最清晰的方式,使用goto。
- 如果这是编写代码的最清晰的方式,将协程、通道和互斥锁一起使用。
back.