你好,我是徐文浩。

在上节课里,我们已经了解了Kafka的基本架构。不过,对于基于Kafka的流式数据处理,我们还有两个重要的问题没有回答:

那么,今天这节课,就是要帮助我们回答这两个问题。一方面,今天我们会深入来看一下,Kafka是如何随着Broker的增加和减少,协调上下游的Producer和Consumer去收发消息的。另一方面,我们会从整个大数据系统的全局视角,来看一下在有了Kafka和Storm这样的利器之后,我们的大数据系统的整体架构应该如何搭建。

Kafka的分布式系统的实现

首先,Kafka系统并没有一个Master节点。不过,这一点倒是不让人意外,主要是Kafka的整体架构实在太简单了。我们在上一讲就看到了,单个的Broker节点,底层就是一堆顺序读写的文件。而要能够分布式地分摊压力,只需要用好ZooKeeper 就好了。

每一个Kafka的Broker启动的时候,就会把自己注册到ZooKeeper上,注册信息自然是Broker的主机名和端口。在ZooKeeper上,Kafka还会记录,这个Broker里包含了哪些主题(Topic)和哪些分区(Partition)。

而ZooKeeper本身提供的接口,则和我们之前讲解过的Chubby类似,是一个分布式锁。每一个Kafka的Broker都会把自己的信息像一个文件一样,写在一个ZooKeeper的目录下。另外ZooKeeper本身,也提供了一个监听-通知的机制。

上游的Producer只需要监听Brokers的目录,就能知道下游有哪些Broker。那么,无论是随机发送,还是根据消息中的某些字段进行分区,上游都可以很容易地把消息发送到某一个Broker里。当然,Producer也可以无需关心ZooKeeper,而是直接把消息发送给一个负载均衡,由它去向下游的Broker进行数据分发。

高可用机制

而在Kafka最初的论文里,还没有包括Kafka的高可用机制。在这种情况下,一旦某个Broker节点挂了,它就会从ZooKeeper上消失,对应的分区也就不见了,自然数据我们也就没有办法访问了。

不过,在了解了这么多的分布式高可用方案之后,相信我们要自己实现一个Kafka的高可用方案,自然也不困难。在Kafka发布了0.8版本之后,它就支持了由多副本带来的高可用功能。

在现实中,Kafka是这么做的:

Producer发送数据、Broker接收并存储下来的逻辑就这么简单。不过,下游Consumer去消费数据的逻辑就稍微复杂一点了。主要的挑战,来自于我们可以动态增减Broker和Consumer。

负载均衡机制

Kafka的Consumer一样会把自己“注册”到ZooKeeper上。在同一个Consumer Group下,一个Partition只会被一个Consumer消费,这个Partition和Consumer的映射关系,也会被记录在ZooKeeper里。这部分信息,被称之为“所有权注册表”。

而Consumer会不断处理Partition的数据,一旦某一段的数据被处理完了,对应这个Partition被处理到了哪个Offset的位置,也会被记录到ZooKeeper上。这样,即使我们的Consumer挂掉,由别的Consumer来接手后续的消息处理,它也可以知道从哪里做起。

那么在这个机制下,一旦我们针对Broker或者Consumer进行增减,Kafka就会做一次数据“再平衡(Rebalance)”。所谓再平衡,就是把分区重新按照Consumer的数量进行分配,确保下游的负载是平均的。Kafka的算法也非常简单,就是每当有Broker或者Consumer的数量发生变化的时候,会再平均分配一次。

如果我们有X个分区和Y个Consumer,那么Kafka会计算出 N=X/Y,然后把0到N-1的分区分配给第一个Consumer,N到2N-1的分配给第二个Consumer,依此类推。而因为之前Partition的数据处理到了哪个Offset是有记录的,所以新的Consumer很容易就能知道从哪里开始处理消息。

图片

而和Storm一样,本质上,Kafka对于消息的处理也是“至少一次”的。如果消息成功处理完了,那么我们会通过更新ZooKeeper上记录的Offset,来确认这一点。而如果在消息处理的过程中,Consumer出现了任何故障,我们都需要从上一个Offset重新开始处理。这样,我们自然也就避免不了重复处理消息。

如果你希望能够避免这一点,你需要在实际的消息体内,有类似message-id这样的字段,并且要通过其他的去重机制来解决,但是这并不容易做到。

顺序保障机制

不过,Kafka虽然有很强的性能,也在发布之后很快提供了基于多副本的高可用机制。但是Kafka本身,其实也是有很多限制的。

首先,是Kafka很难提供针对单条消息的事务机制。因为我们在ZooKeeper上保存的,是最新处理完的消息的一个Offset,而不是哪些消息被处理完了、哪些消息没有被处理完这样的message-id => status的映射关系。所以,Consumer没法说,我有一条新消息已经处理完了,但是还有一条旧消息还在处理中。而是只能按照消息在Partition中的偏移量,来顺序处理。

其次,是Kafka里,对于消息是没有严格的“顺序”定义的。也就是我们无法保障,先从应用服务器发送出来的消息,会先被处理。因为下游是一个分布式的集群,所以先发送的消息X可能被负载均衡发送到Broker A,后发送的消息反而被负载均衡发送到Broker B。但是Broker B里的数据,可能会被下游的Consumer先处理,而Broker A里的数据后被处理。

不过,对于快速统计实时的搜索点击率这样的统计分析类的需求来说,这些问题都不是问题。而Kafka的应用场景也主要在这里,而不是用来作为传统的消息队列,完成业务系统之间的异步通信。

数据处理的Lambda架构

其实,有了Storm和Kafka这样的实时数据处理架构之后,另一个问题也就浮出了水面。既然我们已经可以获得分钟级别的统计数据,那我们还需要MapReduce这样的批处理程序吗?

答案当然还是需要的,因为在目前的框架下,我们的流式计算,还有几个问题没有处理好。

首先,是我们的流式数据处理只能保障“至少一次(At Least Once)”的数据处理模式,而在批处理下,我们做到的是“正好一次(Exactly Once)”。也就意味着,批处理计算出来的数据是准确的,而流式处理计算的结果是有误差的。

其次,是当数据处理程序需要做修改的时候,批处理程序很容易修改,而流式处理程序则没有那么容易。比如,增加一些数据分析的维度和指标。原先我们只计算点击率,现在可能还需要计算转化率;原先我们只需要有分国家的统计数据,现在还要有分省份和分城市的数据。

我们原先的计算结果已经保存在数据库或者HDFS上了。那么对于批处理程序来说,我们的解决方案也很容易,那就是选定一个我们希望新的报表需要覆盖的时间范围,比如过去30天。我们撰写一个新的MapReduce程序,运行出新的计算结果,保存成新的数据表。我们可以把旧的数据表删除,用新的数据表替换就好了。

通常,我们的Hadoop集群不只要承担报表任务,也会承担很多临时的分析任务。所以一般来说,像Hadoop这样的批处理集群的计算资源对于单个报表来说是足够富余的,重跑30天的数据分析,往往也可以在1~2天内完成。

流式数据处理的性能压力

但是对于流式处理,问题就有些麻烦了,特别是在没有Kafka的时候。

我们重新撰写一个新的Storm的Topology,来支持新的分析维度和指标并不困难。困难的地方在于,我们需要在不影响正在线上运行的程序的情况下,进行新版本程序的发布。

一个解决方案是,我们写了一个新的Topology,然后需要重放(Replay)过去30天的日志数据。如果我们用的是Scribe这样的流式日志传输系统,我们会发现日志流都已经上传到了HDFS上,我们还需要有一个程序,从HDFS上把数据拉出来发送到Scribe里。

而即使你用了Kafka,数据都存放在了Kafka Broker的本地硬盘上,重放日志的动作你还是少不了的。这个时候,你会面临的问题是,重放日志需要花费很多时间、或者短时间内会消耗很多计算资源。一般来说,你最多也就为流式数据处理,预留平时日志量3~5倍的计算能力。那如果你需要重放30天的日志,你就需要等上6~10天才能重放完。

这样就意味着每次修改程序,要么你只能更新新的数据产生的报表,要么你就要等上好几天,才能看到最后的计算结果。

其实,最常会发生的变更,既不来自于硬件故障导致的数据重复处理,也不是来自于业务需求变更导致我们需要修改程序。最常会发生的变更,来自于解决分析程序里的各种Bug。在这种场景下,我们的输入数据不会发生变化,输出的表结构也不会发生变化。但是,我们可能需要反复修改数据处理程序,并且反复在同一份日志数据集上运行这个程序。

这样的程序运行场景,对于大数据的批处理来说,压力并不大,但是对于流式数据处理,一样会有大量重放日志的工作量。

Lambda架构的基本思想

有鉴于此,Storm的作者南森·马茨(Nathan Marz)提出了Lamda架构,把大数据的批处理和实时数据结合在一起,变成一个统一的架构

Nathan的思路是这样的,我们先不去看具体数据是通过什么计算框架来处理的,而是把整个数据处理流程抽象成 View = Query(Data) 这样一个函数。我们所有的报表、看板、数据分析结果,都是来自对于原始日志的分析。

所以,原始日志就是我们的主数据(Master Data),不管是MapReduce这样的大数据批处理,还是Storm这样的大数据流式处理,都是对于原始数据的一次查询(Query)。而这些计算结果,其实就是一个基于特定查询的视图(View)。

当我们的程序有Bug,其实就是查询写错了,我们的主数据没有变,我们视图的含义也没有变,我们只需要重新写一个查询就好了。而如果我们有需求层面的变更,就是我们需要一个新的视图,以及对应的新的查询了。

而对于实际数据分析系统的用户来说,其实他关心的既不是Query也不是Master Data,而是一个个View。那么,我们在系统的整体架构上,就只需要对这些用户暴露出View,而不需要告诉他们,具体下面的Query和Master Data的细节就好了。这样,我们可以按照Hadoop和Storm本身合适的场景进行选择。

一方面,我们可以通过Storm进行实时的数据处理,能够尽快获得想要的报表和数据分析结果。另一方面,我们同样会定时运行MapReduce程序,获得更准确的数据结果。在MapReduce程序运行完之前,我们的分析决策基于Storm的实时计算结果;但是当MapReduce更准确的计算结果出来了,我们就可以拿这个结果替换掉之前的实时计算结果。

而对于外部用户来说,他们看到的始终是同一个视图,只是这个视图,会随着时间的变化不断修正数据结果罢了。

所以,Nathan Marz总结的Lambda结构,是由这样几部分组成的:

对于外部的用户来说,他不需要和批处理层以及实时处理层打交道,而只需要通过像SQL这样的查询语言,直接去查询服务层就好了。

数据处理的Kappa架构

可以看到,Lambda架构很好地结合了MapReduce和Storm的优点。而这个Lambda结构,最终也变成了Twitter的一个开源项目SummingBird。但是,这个Lambda架构也有一个显著的缺点,也就是什么事情都需要做两遍。

这个做两遍,体现在两个方面:

而且,因为批处理层和实时处理层的代码不同,我们还不得不解决,两遍对于同样视图的理解不同,采用了不同的数据处理逻辑,引入新的Bug的问题。

不过,在Kafka还没有成熟的时候,把数据分成批处理层和实时处理层是很难避免的。主要问题在于,我们重放实时处理层的日志是个开销很大的动作。通过Scribe这样的日志收集器,我们的Master Data最终是以一个个固定文件落地到HDFS的文件系统上。一旦我们想要重放日志,我们就需要把日志从HDFS上,分片拉到不同的服务器上,再搭建起多个Scribe的集群,去重放日志。

但是,在有了Kafka之后,重放日志一下子变得简单了。因为我们所有的日志,都会在Kafka集群的本地硬盘上。而通过重放日志来重新进行数据计算,也只是设定一下新的分析程序在ZooKeeper上的Offset就好了。

有鉴于此,Kafka的作者杰伊·克雷普斯(Jay Kreps)就提出了一个新的数据计算框架,称之为Kappa架构。Kappa架构在View = Query(Data)这个基本的抽象理念上,和Lambda架构没有变化。但是相比于Lambda架构,Kappa架构去掉了Lambda架构的批处理层,而是在实时处理层,支持了多个视图版本

我们之所以要有View = Query(Data)这么一个抽象,是因为我们的原始日志,也就是Data是不会变化的,而我们想要的View也不会变化。但是具体的Query,可能会因为程序有Bug而比较频繁地被修改。

在Kappa架构下,如果要对Query进行修改,我们原来的实时处理层的代码可以先不用动,而是可以先部署一个新版本的代码,比如一个新的Topology上去。然后,我们会对这个Topology进行对应日志的重放,在服务层生成一份新的数据结果表,也就是视图的一个新的版本。

在日志重放完成之前,外部的用户仍然会查询旧的实时处理层产生的结果。而一旦日志重放完成,新的Topology能够赶上进度,处理到最新产生的日志,那么我们就可以让查询,切换到新的视图版本上来,旧的实时处理层的代码也就可以停掉了。

而随着Kappa架构的提出,大数据处理又开始迈入了一个新的阶段,也就是“流批一体”逐步进入主流的阶段。而这个,也是我们接下来的课程中要探讨的主题了。

小结

好了,到这里,我们对于Kafka、Lambda架构,以及Kappa架构也就学习完了。在今天这节课里,我们看到,Kafka的分布式架构其实非常简单。

Kafka本身没有Master,每一个Broker节点都会把自己注册到ZooKeeper上。所有的Broker本身也不维护任何状态,对应的状态信息也是放在ZooKeeper上,而下游的Consumer也是一样。对应Consumer处理数据到哪里了,就是简单地在ZooKeeper上,为每一个分区维护了一个最新的Offset。而对应的数据分区分配给哪一个Consumer,也是通过ZooKeeper里的“所有权注册表”记录下来的,分配的逻辑也非常简单,也就是按照分区和Consumer的数量,均匀地根据ID顺序分配。

而有了Storm和Kafka之后,工程界开始思考如何将数据的批处理和流式处理统一起来。Storm的作者Nathan Marz提出了一个抽象概念,那就是 View = Query(Data)。我们的数据处理程序,只是针对数据的一个抽象函数,本身没有状态,这个函数运行在一个主数据集上,可以拿到一个对应的结果视图。所以,基于这个概念,他把自己设计的架构称之为Lambda架构。

在Lambda架构里,我们的数据处理程序,被分成批处理层实时处理层,以及服务层。批处理层的结果会不断替换掉实时处理层的计算结果,以不断给出更准确的视图。而外部的应用,只会查询服务层,并不需要关心底层的数据处理的实现是怎么样的。

不过,在Lambda架构下,我们的数据需要处理两遍,我们的程序代码也要在不同的计算框架下实现两遍。为了减少这样的双倍开销,Jay Kreps提出了Kappa架构。

Kappa架构利用了Kafka把日志数据保留在Broker本地硬盘,重放非常容易这样一个特点,提出了放弃批处理层,转而在实时处理层提供多个程序版本的思路。而这个思路,也会是接下来几年里,大数据处理进一步进化的主要方向。

推荐阅读

这节课,我要给你推荐的阅读材料,就是关于Kappa架构的。在Storm发布之后,很快Lambda架构就成为了大数据处理的一个主流的架构设计方案。直到2014年,Kafka的作者Jay Kreps,撰写了“Questioning the Lambda Architecture”这样一篇文章,讲述了他对于Lambda架构不足之处的思考,以及他认为的流式数据处理的Kappa架构应该是什么样的。

我推荐你去读一下原文,有助于你理解现代的流式数据处理模式,是怎么样一步步进化过来的。

思考题

在最初的论文里,无论是Kafka还是Storm,都在设计上,只保障了“至少一次”的数据处理模式。那么,我们是否有可能在Kafka和Storm上,实现“正好一次”的数据处理机制呢?如果要实现这样的机制,整个数据处理流程会是怎么样的?以及它会对Kafka和Storm带来哪些负面影响呢?

欢迎在留言区分享你的答案和思考,也欢迎你把今天的内容分享给更多的朋友。

评论