go 语言实现发布订阅模式

tendermint 中用到了订阅发布模式,这种模式大家都不会陌生,比如你打开你的微信订阅号,你订阅的作者发布的文章,会广播给每个订阅者 。在这个场景里,微信公众号就是一个Pulisher,而你就是一个Subscriber,你收到的文章就是一个Message 。
今天不过多讨论 tendermint 的业务流程,主要和大家聊一下如何使用 go 语言写一个订阅发布模式 。下面这个图是 tendermint 中发布订阅模式的草图:

go 语言实现发布订阅模式

文章插图
发布订阅模式
【go 语言实现发布订阅模式】 
接下来我再介绍几个重要的数据结构:
type state struct { // query string -> client -> subscription subscriptions map[string]map[string]*Subscription // query string -> queryPlusRefCount queries map[string]*queryPlusRefCount}state 是用来维护所有 subscriber,当需要 publish 时,遍历这个对象中的两个 map,找到需要通知的 Subscription,然后通知 。我们接着看其他数据结构:
type Server struct { cmn.BaseService? cmds chan cmd cmdsCap int // check if we have subscription before // subscribing or unsubscribing mtx sync.RWMutex subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct}这个结构中我们主要看 cmds 这个 channel,当我们有新的订阅时,我们向这个 channel 中发送一个数据 。subscriptions 这个结构也维护了所有的订阅者的信息,有新的订阅时,首先检查一下之前是否已经订阅过,如果有,就返回对应错误信息 。
type cmd struct { op operation? // subscribe, unsubscribe query Query subscription *Subscription clientID string? // publish msg interface{} tags map[string]string}上面这个是 cmds 的结构,在 channel 中发送这个类型的数据, query:
type Query interface { Matches(tags map[string]string) bool String() string}这个里面有两个接口,一个是返回 string,另外一个就是判断是否相同 。这里她的作用就是类似于 tag,我们订阅时,需要指定我们关注的类型,可以理解为 tag,发布时也是如此,发布对应 tag 的消息 。其中 Matches 中,传入一个 map,根据其算法实现匹配,这里我们不深追这个算法,我们可以理解为,订阅时,我们生成了一个 query 的对象,发布时要说明发布的消息的类型,或者可以说是具有某种 tag 的消息,然后对比 query 对象,匹配上就发送消息 。
到这里我们订阅模式的各种结构基本上都聊了一下,还有 EventBus 部分没有细说,这部分主要就是和业务相关的了,所以我们这里留在后面的文章写,还有 tendermint 是如何使用上面的发布订阅模式也在下一篇文章中和大家分享 。


    推荐阅读