Go 中的同步队列

本文译自 Synchronization queues in Golang 版权@归原文所有.

问题

假设我们正在运作一家雇佣程序员和测试人员的 IT 公司. 为了让人们有机会认识对方并放松一下, 我们买了一张乒乓桌, 并制定了以下规则:

  • 正好两个人可以同时玩,
  • 下一对只能在前一个完成时才能开始他们的比赛, 所以不允许切换一个球员,
  • 测试人员只能与程序员一起工作, 反之亦然(不能两个测试人员或两个程序员一起). 如果程序员或测试人员想玩游戏, 那么需要分别等待测试人员或程序员建立有效的配对.
func main() {
for i := 0; i < 10; i++ {
go programmer()
}
for i := 0; i < 5; i++ {
go tester()
}
select {} // long day at work...
}
func programmer() {
for {
code()
pingPong()
}
}
func tester() {
for {
test()
pingPong()
}
}

我们将通过 time.Sleep 模拟测试, 编码, 以及打乒乓球.

func test() {
work()
}
func code() {
work()
}
func work() {
// Sleep up to 10 seconds.
time.Sleep(time.Duration(rand.Intn(10000)) * time.Millisecond)
}
func pingPong() {
// Sleep up to 2 seconds.
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
}
func programmer() {
for {
code()
fmt.Println("Programmer starts")
pingPong()
fmt.Println("Programmer ends")
}
}
func tester() {
for {
test()
fmt.Println("Tester starts")
pingPong()
fmt.Println("Tester ends")
}
}

这样的程序发出如下消息流:

> go run pingpong.go
Tester starts
Programmer starts
Programmer starts
Tester ends
Programmer ends
Programmer starts
Programmer ends
Programmer ends

但是按照规则打乒乓球, 我们的信息流可以只包含这样的 4 行长的序列(以任何顺序和重复的任意次数):

Tester starts
Programmer starts
Tester ends
Programmer ends
Tester starts
Programmer starts
Programmer ends
Tester ends
Programmer starts
Tester starts
Tester ends
Programmer ends
Programmer starts
Tester starts
Programmer ends
Tester ends

所以, 无论是测试人员还是程序员都要接近桌子. 之后合作伙伴加入(相应的程序员或测试人员). 在离开游戏时, 他们可以以任何顺序进行. 这就是我们有 4 个有效序列的原因.

以下是两种解决方案. 一个是基于互斥量的, 第二个是使用独立的 worker 协调整个过程, 确保所有事情都按照政策进行.

方案 #1

两种方案都使用在接近桌子之前排队的数据结构. 当至少有一个有效对(Dev + QA)时, 允许该对玩乒乓球.

func tester(q *queue.Queue) {
for {
test()
q.StartT()
fmt.Println("Tester starts")
pingPong()
fmt.Println("Tester ends")
q.EndT()
}
}
func programmer(q *queue.Queue) {
for {
code()
q.StartP()
fmt.Println("Programmer starts")
pingPong()
fmt.Println("Programmer ends")
q.EndP()
}
}
func main() {
q := queue.New()
for i := 0; i < 10; i++ {
go programmer(q)
}
for i := 0; i < 5; i++ {
go tester(q)
}
select {}
}

queue 包定义如下:

package queue
import "sync"
type Queue struct {
mut sync.Mutex
numP, numT int
queueP, queueT, doneP chan int
}
func New() *Queue {
q := Queue{
queueP: make(chan int),
queueT: make(chan int),
doneP: make(chan int),
}
return &q
}
func (q *Queue) StartT() {
q.mut.Lock()
if q.numP > 0 {
q.numP -= 1
q.queueP <- 1
} else {
q.numT += 1
q.mut.Unlock()
<-q.queueT
}
}
func (q *Queue) EndT() {
<-q.doneP
q.mut.Unlock()
}
func (q *Queue) StartP() {
q.mut.Lock()
if q.numT > 0 {
q.numT -= 1
q.queueT <- 1
} else {
q.numP += 1
q.mut.Unlock()
<-q.queueP
}
}
func (q *Queue) EndP() {
q.doneP <- 1
}

Queue 包含互斥量 mut 有两个目的:

  • 同步对共享计数器(numT 和 numP)的访问
  • 扮演游戏中的雇员阻止其他人加入乒乓桌的令牌

程序员和测试人员正在使用无缓冲 channels 来等待他们的乒乓伙伴.

<-q.queueP

或者

<-q.queueT

如果没有伙伴可用, 从这些 channels 读取将会阻塞 goroutine.

我们来分析一下由测试人员执行的 StartT :

func (q *Queue) StartT() {
q.mut.Lock()
if q.numP > 0 {
q.numP -= 1
q.queueP <- 1
} else {
q.numT += 1
q.mut.Unlock()
<-q.queueT
}
}

如果 numP 大于 0 (至少有一个程序员在等待游戏), 那么等待的程序员的数量减少一个, 等待的程序员中的一个将被允许加入该乒乓桌(q.queueP <-1). 有趣的是, 在这个过程中, 互斥量不会被释放, 所以它将作为一个令牌独占访问乒乓球桌.

如果没有等待的程序员, 则 numT (等待测试者的数量)增加并且 goroutine 在 <-q.queueT 上阻塞.

StartP 基本上是相同的, 但由程序员执行.

在游戏过程中, 互斥量将被锁定, 因此需要由程序员或测试人员释放. 只有当双方完成游戏时才释放互斥量, 则会使用屏障 doneP:

func (q *Queue) EndT() {
<-q.doneP
q.mut.Unlock()
}
func (q *Queue) EndP() {
q.doneP <- 1
}

如果程序员仍在玩并且测试人员已完成, 那么测试人员将阻塞在 <-q.doneP. 一旦程序员到达 q.doneP <- 1, 屏障将打开, 互斥量将被释放, 以允许这些员工重新开始工作.

如果测试者仍在游戏, 那么程序员将在 q.doneP <- 1 上阻塞. 当测试者完成时, 它从屏障 <-q.doneP 中读取, 这将解除对程序员的阻塞, 并且将释放互斥量以释放该乒乓桌.

这里有趣的是, 不管测试人员还是程序员都可能锁定互斥量, 测试人员总是会释放互斥量. 这也是该解决方案初看起来可能不那么明显的原因之一.

方案 #2

package queue
const (
msgPStart = iota
msgTStart
msgPEnd
msgTEnd
)
type Queue struct {
waitP, waitT int
playP, playT bool
queueP, queueT chan int
msg chan int
}
func New() *Queue {
q := Queue{
msg: make(chan int),
queueP: make(chan int),
queueT: make(chan int),
}
go func() {
for {
select {
case n := <-q.msg:
switch n {
case msgPStart:
q.waitP++
case msgPEnd:
q.playP = false
case msgTStart:
q.waitT++
case msgTEnd:
q.playT = false
}
if q.waitP > 0 && q.waitT > 0 && !q.playP && !q.playT {
q.playP = true
q.playT = true
q.waitT--
q.waitP--
q.queueP <- 1
q.queueT <- 1
}
}
}
}()
return &q
}
func (q *Queue) StartT() {
q.msg <- msgTStart
<-q.queueT
}
func (q *Queue) EndT() {
q.msg <- msgTEnd
}
func (q *Queue) StartP() {
q.msg <- msgPStart
<-q.queueP
}
func (q *Queue) EndP() {
q.msg <- msgPEnd
}

我们有一个中央协调员在独立的 goroutine 内部运行, 它协调整个过程. 调度器获取想要放松的新员工的信息, 或者是否有人通过 msg channel 打乒乓球. 接收调度器的任何消息状态时都会更新:

  • 等待的 Devs 或 QAs 的数量会增加
  • 关于游戏员工的信息会更新

在接收到任何已定义的消息之后, 调度器将检查是否允许另一对开始游戏:

if q.waitP > 0 && q.waitT > 0 && !q.playP && !q.playT {

如果是这样, 则相应地更新状态, 并且一个测试人员和一个程序员被解锁.

我们现在不再使用互斥量(如解决方案 1 中的方法)来管理对共享数据的访问, 而是通过单独的 goroutine 与外部世界进行通信. 这将使我们编写更多的惯用的 Go 程序.

不要通过共享内存进行通信, 通过通信共享内存.

资源