Go语言中常见的并发模式( 四 )


通过适当开启一些冗余的线程,尝试用不同途径去解决同样的问题,最终以赢者为王的方式提升了程序的相应性能 。
1.6.6 素数筛在1.2节中,为了演示Newsqueak的并发特性,给出了并发版本素数筛的实现 。并发版本的素数筛是一个经典的并发例子,通过它可以更深刻地理解Go语言的并发特性 。“素数筛”的原理如图1-5所示 。
我们需要先生成最初的2, 3, 4,…自然数序列(不包含开头的0、1):
// 返回生成自然数序列的通道: 2, 3, 4, ...func GenerateNatural() chan int { ch := make(chan int) go func() { for i := 2; ; i++ { ch <- i } }() return ch}GenerateNatural()函数内部启动一个Goroutine生产序列,返回对应的通道 。
然后为每个素数构造一个筛子:将输入序列中是素数倍数的数提出,并返回新的序列,是一个新的通道 。
// 通道过滤器: 删除能被素数整除的数func PrimeFilter(in <-chan int, prime int) chan int { out := make(chan int) go func() { for { if i := <-in; i%prime != 0 { out <- i } } }() return out}PrimeFilter()函数也是内部启动一个Goroutine生产序列,返回过滤后序列对应的通道 。
现在可以在main()函数中驱动这个并发的素数筛了:
func main() { ch := GenerateNatural() // 自然数序列: 2, 3, 4, ... for i := 0; i < 100; i++ { prime := <-ch // 新出现的素数 fmt.Printf("%v: %vn", i+1, prime) ch = PrimeFilter(ch, prime) // 基于新素数构造的过滤器 }}先是调用GenerateNatural()生成最原始的从2开始的自然数序列 。然后开始一个100次迭代的循环,希望生成100个素数 。在每次循环迭代开始的时候,通道中的第一个数必定是素数,我们先读取并打印这个素数 。然后基于通道中剩余的数列,并以当前取出的素数为筛子过滤后面的素数 。不同的素数筛对应的通道是串联在一起的 。
素数筛展示了一种优雅的并发程序结构 。但是因为每个并发体处理的任务粒度太细微,程序整体的性能并不理想 。对于细粒度的并发程序,CSP模型中固有的消息传递的代价太高了(多线程并发模型同样要面临线程启动的代价) 。
1.6.7 并发的安全退出有时候需要通知Goroutine停止它正在干的事情,特别是当它工作在错误的方向上的时候 。Go语言并没有提供一个直接终止Goroutine的方法,因为这样会导致Goroutine之间的共享变量处在未定义的状态上 。但是如果想要退出两个或者任意多个Goroutine怎么办呢?
Go语言中不同Goroutine之间主要依靠通道进行通信和同步 。要同时处理多个通道的发送或接收操作,需要使用select关键字(这个关键字和网络编程中的select()函数的行为类似) 。当select()有多个分支时,会随机选择一个可用的通道分支,如果没有可用的通道分支,则选择default分支,否则会一直保持阻塞状态 。
基于select()实现的通道的超时判断:
select {case v := <-in: fmt.Println(v)case <-time.After(time.Second): return // 超时}通过select的default分支实现非阻塞的通道发送或接收操作:
select {case v := <-in: fmt.Println(v)default: // 没有数据}通过select来阻止main()函数退出:
func main() { // 做一些处理 select{}}当有多个通道均可操作时,select会随机选择一个通道 。基于该特性我们可以用select实现一个生成随机数序列的程序:
func main() { ch := make(chan int) go func() { for { select { case ch <- 0: case ch <- 1: } } }() for v := range ch { fmt.Println(v) }}我们通过select和default分支可以很容易实现一个Goroutine的退出控制:
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } }}func main() { cannel := make(chan bool) go worker(cannel) time.Sleep(time.Second) cannel <- true}但是通道的发送操作和接收操作是一一对应的,如果要停止多个Goroutine,那么可能需要创建同样数量的通道,这个代价太大了 。其实我们可以通过close()关闭一个通道来实现广播的效果,所有从关闭通道接收的操作均会收到一个零值和一个可选的失败标志 。
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } }}func main() { cancel := make(chan bool) for i := 0; i < 10; i++ { go worker(cancel) } time.Sleep(time.Second) close(cancel)}我们通过close()来关闭cancel通道,向多个Goroutine广播退出的指令 。不过这个程序依然不够稳健:当每个Goroutine收到退出指令退出时一般会进行一定的清理工作,但是退出的清理工作并不能保证被完成,因为main线程并没有等待各个工作Goroutine退出工作完成的机制 。我们可以结合sync.WaitGroup来改进:


推荐阅读