你好,我是徐文浩。

上节课里,我们看到了随着时代的变迁,人们已经不满足于通过MapReduce这样批处理的方式进行数据分析了。于是,Yahoo推出了S4,不过S4并没有在历史舞台上站稳脚跟。在S4的论文发表的同一年,我们今天的主角,也就是Storm走上了历史舞台。在接下来的几年里,Storm一度成为业界进行实时数据处理的标准解决方案。

令人惊叹的是,Storm并不是来自哪一个业界的大公司。而是来自一个只有3个人的创业公司BackType,它的主要作者南森·马茨(Nathan Marz)那个时候也才仅仅21岁。而没过多久之后,BackType就被当时如日中天的Twitter收购了。可以说,Storm证明了即使是到了今天,天才工程师仍然能够凭借一己之力,对整个行业产生重要的影响。

不过,作为一个开源项目,Storm一开始并没有以论文的形式发表。直到2014年,Twitter才发表了《Storm @Twitter》这样一篇更加偏重于Storm如何在Twitter内部使用的论文。而且,这篇论文中的作者中,也没有Storm最初的作者南森·马茨。虽然论文里没有南森的名字,但是Storm最大的功劳也仍然来自于他这个最初的作者。

那么,今天我们也会不局限于论文本身的内容,而是会着重讲解Storm的基本原理和系统架构。在后面的课程里,我们还会进一步讲解南森提出的组合Storm和Hadoop的Lambda数据处理架构。

在学完这节课之后,我希望你能够理解:

典型的Master+Worker架构

在上节课里,我们看到S4把数据的流式计算,抽象成了一个叫做PE的单元。所有的数据、处理逻辑都是基于PE的,整个系统也是一个没有Master,完全对称的架构。虽然Storm在流式计算上,也采用了类似的有向无环图(DAG)的逻辑模型,但是在整个系统架构上,Storm却要传统很多。

和S4不同,Storm是一个典型的Master+Worker的分布式系统架构,并且将传送的消息和对应消息的处理逻辑做了分离。那么,接下来,我们就一起先来看一下Storm的整体架构。

基于Topology的逻辑模型

和S4类似,Storm系统的抽象模型,也是一个有向无环图。在Storm里,这样一个有向无环图,叫做Topology,也就是拓扑图。整个图里有这样几个元素:

图片

Storm的抽象模型里,和S4的最大不同就在 Bolts 上。S4的PE,不仅是一个功能逻辑的单元,也是一个KV对的数据。同样类型的事件下,所有相同的Key的数据,都会聚合到同一个PE下。这就使得整个系统里有大量的PE对象,也导致S4的整个系统有几个显著的设计问题。

首先就是内存占用和GC开销,大量的PE会占用大量的内存

但是,这个内存占用又是应用开发人员完全控制不了的。因为系统里有多少PE,以及当前计算节点的内存占用是否过大,是由S4框架控制的。而对于应用开发人员来说,他操作的只是单个PE对象,显然单个PE对象本身不会占用太多内存。在内存不足的时候,我们原本可以在应用层面有更灵活的操作,比如更频繁地把数据输出到外部的KV数据库里,释放掉内存占用。但是在S4的框架下,我们做不到这一点。

其次就是我们的业务逻辑代码里,混入了控制分布式数据分发的逻辑

我们在上节课的代码里面看到了,为了让Top K的排序能够分布式地并行执行,对应的示例代码中,特地将输出的Key变成了(SortID,N)这样的组合。也就是靠PE里的逻辑处理代码,来设置整个拓扑图的并行度。这使得我们分布式的分发逻辑,和数据处理逻辑混合在了一起,整个系统被耦合起来了。

而且,如果我们的数据量增加了,想要提升并行度,我们不能简单地修改参数,而是需要修改代码,重新编译部署。并且,历史上已经处理了(SortID, N)组合PE,在重新部署之后,处理的数据可能是完全不同的,因为我们的N的最大值已经变了。

而Storm的设计并不相同,Storm里的Bolt更像是MapReduce里的Map或者Reduce函数。我们可以在Topology里面,去设置不同Bolt的并行度,以及设置数据流是如何分组的。但是,每个Bolt输出的Tuple本身,却不需要通过生成一个类似于(SortID, N)这样一个特殊的Key,来定义下一层的Bolt的并行度。在Storm里面,对应的数据流可以进行这样几种分组(Grouping):

如果你对照着这里的示意图,可以看到在Storm下进行Top K的单词排序,两边的数据流向是不一样的,S4里的一个WordCountPE的输出,只会给一个SortPE;而Storm里的WordCountBolt的输出,会发送给多个不同的SortCountBolt,因为同一个WordCountBolt下,会包含很多个不同的单词。

图片

Master+Worker的系统架构

我们说Storm的Bolt很像MapReduce的Map和Reduce函数,其实Storm本身的架构也和MapReduce非常相似。和我们上节课看过的无中心的S4不同,Storm选择了一个典型的Master+Worker的架构设计。整个Storm集群里,也是由 Nimbus+Supervisor+Worker 这样三种类型的进程组成的。

首先是Nimbus进程,其实也就是Storm集群的Master节点。它的作用,类似于Hadoop里的JobTracker,或者说MapReduce里的Scheduler+Master,也就是负责资源的分配和任务的调度。

开发人员会直接提交一个Topology给Master。这个Topology,之前只是一个抽象的有向无环图。而在实际应用里,它就好像一个MapReduce的任务一样,是一个编译好的程序和对应的配置。只不过,MapReduce的任务执行完了就结束了。而作为流式计算,Topology这个任务如果我们不去终止它,它就会永不停歇地运行下去。

然后是Supervisor进程,这个类似于Hadoop里的TaskTracker,也就是MapReduce里的Worker。Supervisor在每一个服务器上都会有一个,它本身不负责执行任务,但是会负责接收Nimbus分配的任务,然后管理本地的Worker进程,让Worker进程来实际执行任务。

最后是Worker进程,一台服务器上会有多个Worker进程。Storm是使用Clojure写的,跑在JVM上,所以每一个Worker进程就是一个独立的JVM,Worker里面还会通过JVM的Executor来维护一个线程池。然后实际的线程池里,会有很多个Spout/Bolt的任务。因为Java的Executor的实现里会复用线程,所以Spout和Bolt实际上会使用同一个线程。这个,也会大大减少整个系统的开销。

而把整个系统拆分成Nimbus、Supervisor和Worker三种进程,就使得Storm的容错能力也大大增强了。

图片

Nimbus和Supervisor之间,并不是直接通信的。因为如果这样的话,显然Nimbus会成为一个故障的“单点”。所以Nimbus是把对应的任务分配写到Zookeeper里,也就是一个类似于Chubby这样的分布式锁系统。所以我们的任务分配是持久化的,而且会由Paxos协议来保障容错能力。而Supervisor也是从Zookeeper里面,去读取对应的任务分配。

Nimbus和Supervisor的职责都非常简单,Nimbus只需要进行Topology的解析和任务调度,而Supervisor只需要接收任务,并且监控Worker进程是否存活。它们本身不处理数据,而且也不在内存里面保存数据。即使挂掉了,也只需要简单重启一下进程就好了。

这种类似的设计思路,我们在Megastore里的协同服务器(Coordinator)里已经看到过一次了,通过把一些职责单独拆分出来,让特定的节点足够简单。即使这些节点可能成为单点,但是它们的稳定性,也会远高于要处理复杂逻辑的Worker进程或者Bigtable。

图片

这样来看,Storm的整体设计思路和MapReduce很像,各个节点的角色都能在MapReduce的各种节点里找到对应的影子。其实,各类分布式系统的设计思路都是类似的,特别是这样Master+Worker组合的模型,那就是Master负责调度,Worker负责实际处理问题。而为了解决高可用性,往往我们会引入分布式锁,确保任务分配的数据不依赖Master。

另外,为了让整个系统更稳定,我们也会拆分调度任务的进程,和直接执行任务的进程,让每个进程都只有单一的职责。希望在学习和解读完这么多篇论文之后,你在未来的系统设计里。也能遵循这样的理念。

Storm的容错机制

在S4的论文里,Yahoo并没有详细说明它的上下游是怎么通信的。不过在Storm里,我们倒是可以通过源码很清楚地知道,它是通过 ZeroMQ 这个消息队列,完成两个不同的Worker之间的通信的。

相比于通过一个RPC,消息队列有一个很大的优点,那就是高性能。上游节点不需要等待下游节点返回接收成功,就能发送下一条信息。不过,这也带来了一个问题,就是如果在消息发送之后,下游是否成功接收并处理了这条消息,上游是不知道的。可能因为网络超时、也可能因为下游节点的软硬件故障,在分布式系统里,“错误”是在所难免的

而且,在流式数据处理里,我们可能不只有一层链路。就拿论文里的统计Tweet里的单词数量为例,我们先要从一个TweetSpout里,读取数据流里的Tweet,随机发送给到一个ParseTweetBolt,这个Bolt会解析Tweet成一个个单词,再发送给下游的多个WordCountBolt。

要注意,这里下游不只有一个WordCountBolt,而是不同的单词会发送给不同的WordCountBolt。而且,任何一个WordCountBolt没有被成功处理,都意味着我们面临“错误”。

Storm选择的解决方案,是把从Spout发起的第一个Tuple作为一棵树的根。下游所有衍生出来发送的Tuple,都是这棵树的一部分。任何一个Tuple处理失败或者超时了,那么就从Spout重新发送消息。

而要做到这一点,Storm需要在系统里引入一个特殊的Bolt,叫做AckerBolt。Spout发送出去的消息,同时会通知给到AckerBolt。而Bolt一旦处理完根Tuple相关的消息,也会通知给到Acker。

Bolt会告诉AckerBolt两个信息,一个是我已经处理完了某一个Tuple,另一个是这个Tuple衍生往下游的哪些Tuple我也已经发送出去了。这样,Acker就有了一开始Spout发出的Tuple的整棵树的完整信息。等到最后一层的Bolt处理完对应的Tuple,然后发送了对应的通知给到AckerBolt,并且告诉它后面没有新的Tuple了,那么AckerBolt就知道,整棵Tuple树已经处理完成了。

相信看到这里,你会和我有一个同样的疑问,那就是如果这样,AckerBolt的开销岂不是会非常大?

因为看起来,我们要在AckerBolt里面,存上整棵Tuple树。更准确地说,AckerBolt不是要存一棵Tuple树,而是要把所有还在处理中的Tuple都存下来。这就相当于一个AckerBolt,需要存下所有Spout和Bolt在整个处理过程中的内存占用,这样我们的集群哪里抗得住?

别着急,Storm采用了一个很巧妙的办法,那就是利用位运算里的异或(XOR)。Storm给每一个发送出去的Tuple都会分配一个64位的message id。当消息从Spout被发送出去的时候,Storm会给AckerBolt发送这个message-id,告诉它,你要开始追踪这个Tuple树了。Acker里呢,则会维护一个message-id到校验码(checksum)的映射关系。这个校验码,一开始就是拿0和message-id去异或(XOR)一下。

而下游的每一个Bolt,会处理完这个Tuple相关的消息,并且向外发送新的Tuple。每个新发送的Tuple里,都需要带上根Tuple的message-id。在新Tuple发送出去之后,Bolt会通知AckerBolt,通知的内容也很简单,也是一个根message-id到校验码的映射关系。

这里的校验码,就是把当前对外发送的所有消息的message-id,和已经处理完的消息的message-id做一下异或。然后AckerBolt收到这个消息,会把收到的校验码,和本地的校验码也做一下异或,更新成最新的校验码。

因为异或操作,就是当两个数字完全相同的时候,会变成0,也就是A XOR A = 0。而在其他情况下,最后的结果一定不会每一位都是0。我们发送一次消息,并且acking一次消息,相当于在这个校验码上执行了一次A XOR A。所以,只要有Tuple还没有被acking,我们的校验码就不会是0,但是一旦所有的Tuple树上的Tuple都被acking了,那么这个校验码必然就是0。

通过message-id加上校验码,Storm只需要16 bytes就能在AckerBolt里维护一个Tuple树是否已经都处理完了。这样,即使你每秒需要处理10万条消息,AckerBolt里需要维护30秒的Tuple,也只需要48MB的内存空间,这即使对于10年前的服务器来说也是绰绰有余的了。

而且,所有的Bolt通知Acker最新的执行情况,也只需要发送16 bytes的messsage-id和校验码,既不需要发送Tuple的原始内容,也不需要为向下游发送的每一个Tuple都单独发送一条消息,占用的性能消耗也不会太大。

不过,需要注意的是,这个机制只能保障,Spout发出来的Tuple至少被处理一次,也就是At Least Once,但是它避免不了Tuple可能被重复处理

比如,拿我们的Top K排序的Topology来说,任何一个单词在某个Bolt里没有被正确处理,就需要重新处理整个句子。这也意味着,其他单词会被重复统计。所以,这个通过AckerBolt进行容错重发的机制,并不适合所有的应用场景。你需要根据自己的实际业务需求,来决定要不要启用这个机制。

有了At Least Once,那你自然会想到还有At Most Once,也就是一条消息最多发一次。这个要求其实在Storm里很容易做到,我们只要关掉这里的acking机制就好了。其实也就是上游只需要把消息发出去,下游有没有收到、有没有处理成功,上游就不管了。

然而,无论是At Least Once还是At Most Once,都不是我们最理想的进行流式数据处理的方式。我们真正希望的,是每个消息“恰好”被处理一次,也就是“Exactly Once”。所以,Storm还不是流式数据处理最终极的解决方案。在Storm之后,整个流式数据处理系统还会不断进化。

小结

好了,到这里,我们对于Storm的剖析就已经告一段落了。今天,我们一起梳理了Storm里的系统概念、整体的框架设计,以及它是如何巧妙地通过异或运算,来追踪整个Tuple的生命周期的。

在系统设计层面,Storm其实和我们之前看过的MapReduce/Bigtable之类的系统,有很多相似之处。Storm通过Nimbus这个主节点进行任务调度,通过Zookeeper存储所有的元数据,通过把工作节点拆分成Supervisor和Worker,来提升系统稳定性。可以说,太阳底下没有什么新鲜事儿。

而且,这样的系统架构,也的确比无中心的S4更加合理。不同于S4把数据和处理逻辑绑定成PE,并且连整个系统的并行度,也要依靠逻辑代码中选取的Key来决定。Storm完成遵循了“单一职责”这个模式,并且让分布式的并行度、数据的有向无环图,彻底和逻辑代码拆分开来了。

进一步地,Storm还开始针对业务层面的容错做出了更多考虑。相比于S4只是简单地在高压力下进行降级,Storm开始深入考虑一个分布式流式系统,应该怎样对于没能成功处理的消息进行错误恢复。通过巧妙地利用异或运算,Storm可以用很小的内存和网络开销,来追踪整个Tuple处理的生命周期,并通过重发消息实现“至少一次”的消息处理保障。

那么,在接下来的课程中,我们还会再深入其他的流式数据处理系统,一起去看一看流式数据处理会如何逐步走向完善。

推荐阅读

随着Storm登上历史舞台,如何把Storm这样的流式计算和MapReduce这样的批处理系统统一起来,也成为了一个热点话题。而Storm的作者南森本人,也提出了一个叫做Lambda架构的解决方案,Twitter后续也围绕这个架构,开发了Summingbird这个系统。

对于Lambda架构,南森·马茨本人在他写的《大数据系统构建》这本书里,做了非常详尽的讲解,如果有时间的话,我推荐你可以去快速翻阅一下这本书。

虽然在2021年的今天,你可能已经用上了Kafka、Flink,甚至觉得Lambda架构已经有些过时了。但是在整个系统开发中,不只是考虑系统硬件的故障,而是考虑到开发人员本身也会出错,也会写Bug,这是Lambda架构中最重要的核心思想。从这一点来说,Lambda架构在今天仍然有着它独特的价值。

另外,我们也会在后续讲解Kafka的过程中,来一起看看Kafka作者杰伊·克雷普斯(Jay Kreps)提出的Kappa架构,和南森·马茨提出的Lambda架构之间的对比。

思考题

Storm里,我们需要在Topology提交的时候,就设定好各个Bolt/Spout的数量,也就是整个系统的并行度。那么,如果遇到突然增加的流量,我们是否会遇到和S4一样的问题,不得不随机丢掉一些消息呢?我们可以做哪些工作,确保整个Storm系统在出现流量陡增的情况下,仍然可以通过添加硬件进行水平扩展呢?

欢迎在留言区分享你的答案和思考,也欢迎你把今天的内容分享给更多的朋友。感谢收听,咱们下节课再见。

评论