这是《》系列的第九篇文章,讲用 muduo 实现一个简单的 。

Muduo 全系列文章列表:

本文介绍用 muduo 实现一个简单的 topic-based 消息广播服务,这其实是“聊天室”的一个简单扩展,不过聊天的不是人,而是分布式系统中的程序。

本文的代码见

在分布式系统中,除了常用的 end-to-end 通信,还有一对多的广播通信。一提到“广播”,或许会让人联想到 IP 多播或 IP 组播,这不是本文的主题。本文将要谈的是基于 TCP 协议的应用层广播。示意图如下:

hub

上图中圆角矩形代表程序,"Hub"是一个服务程序,不是网络集线器,它起到类似集线器的作用,故而得名。Publisher 和 Subscriper 通过 TCP 协议与 Hub 程序通信。Publisher 把消息发到某个 topic 上,Subscribers 订阅该 topic,然后就能收到消息。即 publisher 借助 hub 把消息广播给了多个 subscribers。这种 pub/sub 结构的好处在于可以增加多个 Subscriber 而不用修改 Publisher,一定程度上实现了“解耦”(也可以看成分布式的 observer pattern)。 由于走的是 TCP 协议,广播是基本可靠的,这里的“可靠”指的是“比 UDP 可靠”,不是“完全可靠”。(思考:如何避免 Hub 成为 single point of failure?)

为了避免串扰(cross-talk),每个 topic 在同一时间只应该有一个 publisher,hub 不提供 compare-and-swap 操作。

(“可靠广播、原子广播”在分布式系统中有重大意义,是以 replicated state machine 方式实现可靠的分布式服务的基础,“可靠广播”涉及 consensus 算法,超出了本文的范围。)

应用层广播在分布式系统中用处很大,这里略举几例:

1. 体育比分转播。有 8 片比赛场地正在进行羽毛球比赛,每个场地的计分程序把当前比分发送到各自的 topic 上(第 1 号场地发送到 court1,第 2 号发送到 court2,以此类推)。需要用到比分的程序(赛场的大屏幕显示,网上比分转播等等)自己订阅感兴趣的 topic ,就能及时收到最新比分数据。由于本文实现的不是 100% 可靠广播,那么消息应该是 snapshot,而不是 incremental。(换句话说,消息的内容是“现在是几比几”,而不是“刚才谁得分”。)

2. 负载监控。每台机器上运行一个监控程序,周期性地把本机当前负载(CPU、网络、磁盘、温度)publish 到以 hostname 命名的 topic 上,这样需要用到这些数据的程序只要在 hub 订阅相应的 topic 就能获得数据,无需与多台机器直接打交道。(为了可靠起见,监控程序发送的消息里边应该包含时间戳,这样能防止 stale 数据,甚至一定程度上起到心跳的作用。)沿着这个思路,分布式系统中的服务程序也可以把自己的当前负载发布到 hub 上,供 load balancer 和 monitor 取用。

协议

为了简单起见,muduo 的 hub 示例采用以 '/r/n' 分界的文本协议,这样用 telnet 就能测试 hub。协议只有三个命令:

  • sub /r/n

    • 该命令表示订阅 ,以后该 topic 有任何跟新都会发给这个 tcp 连接。在 sub 的时候,hub 会把该 上最近的消息发给此 subscriber。

  • unsub /r/n

    • 该命令表示退订

  • pub /r/n/r/n

    • 往 发送消息,内容为 。所有订阅了此 的 subscribers 会收到同样的消息“pub /r/n/r/n”

代码

muduo 示例中的 hub 分为几个部分:

  • hub 服务程序,负责一对多的消息分发。它会记住每个 client 订阅了哪些 topic,只把消息发给特定的订阅者。代码见

  • pubsub 库,为了方便编写使用 hub 服务的应用程序,我写了一个简单的 client library,用来和 hub 打交道。这个 library 可以订阅 topic、退订 topic、往指定 topic 发布消息。代码见 和

  • sub 示例程序,这个命令行程序订阅一个或多个 topic,然后等待 hub 的数据。代码

  • pub 示例程序,这个命令行程序往某个 topic 发布一条消息,消息内容由命令行参数指定。代码

一个程序可以既是 publisher 又是 subscriber,而且 pubsub 库只用一个 tcp 连接(这样 failover 比较简便)。

使用范例:

  1. 开启 4 个命令行窗口

  2. 在第一个窗口运行 $ hub 9999

  3. 在第二个窗口运行 $ sub 127.0.0.1:9999 mytopic

  4. 在第三个窗口运行 $ sub 127.0.0.1:9999 mytopic court

  5. 在第四个窗口运行 $ pub 127.0.0.1:9999 mytopic "Hello world."  ,这时第二三号窗口都会打印 “mytopic: Hello world.”,表明收到了 mytopic 这个主题上的消息。

  6. 在第四个窗口运行 $ pub 127.0.0.1:9999 court "13:11"  ,这时第三号窗口会打印 “court: 13:11”,表明收到了 court 这个主题上的消息。第二号窗口没有订阅此消息,故无输出。

借助这个简单的 pub/sub 机制,还可以做很多有意思的事情。比如把分布式系统中的程序的一部分 end-to-end 通信改为通过 pub/sub 来做(例如,原来是 A 向 B 发一个 SOAP request,B 通过同一个 tcp 连接发回 response (分析二者的通信只能通过查看 log 或用 tcpdump 截获);现在是 A 往 topic_a_to_b 上发布 request,B 在 topic_b_to_a 上发 response),这样多挂一个 monitoring subscriber 就能轻易地查看通信双方的沟通情况,很容易做状态监控与 trouble shooting。