Go语言中常见的并发模式( 二 )


func main() { done := make(chan int, 1) // 带缓存通道 go func(){ fmt.Println("你好, 世界") done <- 1 }() <-done}对于带缓存的通道,对通道的第K
【Go语言中常见的并发模式】个接收完成操作发生在第K+C
个发送操作完成之前,其中C
是通道的缓存大小 。虽然通道是带缓存的,但是main线程接收完成是在后台线程发送开始但还未完成的时刻,此时打印工作也是已经完成的 。
基于带缓存通道,我们可以很容易将打印线程扩展到N
个 。下面的例子是开启10个后台线程分别打印:
func main() { done := make(chan int, 10) // 带10个缓存 // 开N个后台打印线程 for i := 0; i < cap(done); i++ { go func(){ fmt.Println("你好, 世界") done <- 1 }() } // 等待N个后台线程完成 for i := 0; i < cap(done); i++ { <-done }}对于这种要等待N
个线程完成后再进行下一步的同步操作有一个简单的做法,就是使用sync.WaitGroup来等待一组事件:
func main() { var wg sync.WaitGroup // 开N个后台打印线程 for i := 0; i < 10; i++ { wg.Add(1) go func() { fmt.Println("你好, 世界") wg.Done() }() } // 等待N个后台线程完成 wg.Wait()}其中wg.Add(1)用于增加等待事件的个数,必须确保在后台线程启动之前执行(如果放到后台线程之中执行则不能保证被正常执行到) 。当后台线程完成打印工作之后,调用wg.Done()表示完成一个事件 。main()函数的wg.Wait()是等待全部的事件完成 。
1.6.2 生产者/消费者模型并发编程中最常见的例子就是生产者/消费者模型,该模型主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度 。简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据 。这样就让生产和消费变成了异步的两个过程 。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品积压导致CPU被剥夺的问题 。
Go语言实现生产者和消费者并发很简单:
// 生产者:生成factor整数倍的序列func Producer(factor int, out chan<- int) { for i := 0; ; i++ { out <- i*factor }}// 消费者func Consumer(in <-chan int) { for v := range in { fmt.Println(v) }}func main() { ch := make(chan int, 64) // 成果队列 go Producer(3, ch) // 生成3的倍数的序列 go Producer(5, ch) // 生成5的倍数的序列 go Consumer(ch) // 消费生成的队列 // 运行一定时间后退出 time.Sleep(5 * time.Second)}我们开启了两个Producer生产流水线,分别用于生成3和5的倍数的序列 。然后开启一个Consumer消费者线程,打印获取的结果 。我们通过在main()函数休眠一定的时间来让生产者和消费者工作一定时间 。正如1.6.1节中说的,这种靠休眠方式是无法保证稳定的输出结果的 。
我们可以让main()函数保存阻塞状态不退出,只有当用户输入Ctrl+C时才真正退出程序:
func main() { ch := make(chan int, 64) // 成果队列 go Producer(3, ch) // 生成3的倍数的序列 go Producer(5, ch) // 生成5的倍数的序列 go Consumer(ch) // 消费生成的队列 // Ctrl+C 退出 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) fmt.Printf("quit (%v)n", <-sig)}这个例子中有两个生产者,并且两个生产者之间无同步事件可参考,它们是并发的 。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作 。
1.6.3 发布/订阅模型发布/订阅(publish-subscribe)模型通常被简写为pub/sub模型 。在这个模型中,消息生产者成为发布者(publisher),而消息消费者则成为订阅者(subscriber),生产者和消费者是M
: N
的关系 。在传统生产者/消费者模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题 。
为此,我们构建了一个名为pubsub的发布/订阅模型支持包:
// Package pubsub implements a simple multi-topic pub-sub library.package pubsubimport ( "sync" "time")type ( subscriber chan interface{} // 订阅者为一个通道 topicFunc func(v interface{}) bool // 主题为一个过滤器)// 发布者对象type Publisher struct { m sync.RWMutex // 读写锁 buffer int // 订阅队列的缓存大小 timeout time.Duration // 发布超时时间 subscribers map[subscriber]topicFunc // 订阅者信息}// 构建一个发布者对象,可以设置发布超时时间和缓存队列的长度func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), }}// 添加一个新的订阅者,订阅全部主题func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil)}// 添加一个新的订阅者,订阅过滤器筛选后的主题func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch}// 退出订阅func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() defer p.m.Unlock() delete(p.subscribers, sub) close(sub)}// 发布一个主题func (p *Publisher) Publish(v interface{}) { p.m.RLock() defer p.m.RUnlock() var wg sync.WaitGroup for sub, topic := range p.subscribers { wg.Add(1) go p.sendTopic(sub, topic, v, &wg) } wg.Wait()}// 关闭发布者对象,同时关闭所有的订阅者通道func (p *Publisher) Close() { p.m.Lock() defer p.m.Unlock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) }}// 发送主题,可以容忍一定的超时func (p *Publisher) sendTopic( sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,) { defer wg.Done() if topic != nil && !topic(v) { return } select { case sub <- v: case <-time.After(p.timeout): }}


推荐阅读