你好,我是徐文浩。

到Spanner为止,我们已经把大数据里,关于数据存储和在线服务的重要论文解读完了。从这一讲开始,我们就要开始讲解另一个重要的主题,也就是大数据的流式处理。今天我们解读的第一篇论文,来自一个曾经辉煌但是今天已经逐渐销声匿迹的公司Yahoo。这篇论文就是《S4:Distributed Stream Computing Platform》,伴随着这篇论文的,同样是一个开源系统Apache S4。和同样孵化自Yahoo的Hadoop不同,S4虽然是最早发布的开源分布式流式数据处理系统,但是在市场上最终却没有占有一席之地。

不过,学习S4的论文本身还是很有价值的。一方面,你可以看到大数据流式处理的基础结构是怎么样的,你会发现它和批处理的MapReduce是非常相似的;另一方面,它又会面临比批处理的MapReduce多得多的挑战和困难。

那在学完这一讲之后,你能有这样几点收获:

好了,那接下来就请和我一起进入流式计算的世界吧。

实时计算到底有多“实时”?

在MapReduce出现之后,整个大数据处理领域就红火了起来,Hadoop这样的开源项目也很快深入到各大互联网公司。一方面,MapReduce帮我们解决了海量数据处理的问题;但是另一方面,我们常常又觉得MapReduce不太够用,因为很多时候,我们希望能够更加“实时”地处理数据。

之所以我们希望能够“实时”进行数据处理,是由于“效益”的原因。MapReduce一开始的主要的应用领域,就是广告和搜索。通过MapReduce程序,我们可以分析海量的用户搜索行为、广告点击行为,来帮助我们优化搜索排序和广告展示。

比如在广告领域,最简单的一个优化的办法,就是统计新广告的点击率,点击率太低的广告,我们不予展示,把展示机会给别的广告就好了。在论文里,Yahoo也举了这样一个例子,通过在线统计点击率,把低质量的广告过滤掉,S4可以帮助他们再提升3%的点击率,并且对广告收入没有任何影响。

这个需求,和我们通过MapReduce生成各种报表是一回事儿。唯一的一个差异在于,我们希望这个数据统计的反馈的时间能够尽量短一些,也就是“实时一点”。而这个差异,就触到了MapReduce的痛点了。

一般来说,我们的MapReduce都是定时执行的,比如每天运行一次,生成一个报表,或者频繁一点,每小时运行一次,计算上一个小时的点击率数据。但是,这个获得反馈数据的频率还是太慢了。每小时运行一次MapReduce程序,意味着我们的统计数据,平均要晚上半个小时。而半个小时里,低质量的广告或者搜索结果已经曝光了很多次了。

举个例子,现在无论是什么样的社会热点新闻,很容易在微博热搜上出现。往往在新闻发生后的一两分钟,就已经有大量的搜索出现在微博热搜里了。如果我们要等待半个小时,才能统计到这些搜索,那么热搜功能就可以说形同虚设了。

所以,我们希望能在尽可能短的时间内,就得到这样的反馈数据。那么,你可能要问了,我们能不能直接更频繁地运行MapReduce程序呢?比如,每分钟运行一次,是不是就解决这个问题了?

如果你仔细学习了之前的课程,相信你自己也会意识到这样是行不通的。采用频繁运行MapReduce程序的办法,我们至少会遇到两个问题。

一个是大量的“额外开销”。

我们之前讲过,MapReduce的额外开销不小,再小的任务也需要个十几秒到一分钟的运行时间。如果我们高频率每分钟运行MapReduce任务,那么“额外开销”占的时间比重和硬件资源会非常高,也很浪费。

二个是我们不得不让输入文件变得极其“碎片化”。

无论是GFS还是HDFS,都是把文件变成一个个64MB大小的Block,然后MapReduce通过分布式并行读取来进行快速分析。

但是如果我们需要每分钟都处理数据,那么对于输入的数据,就要按照分钟进行分割。每分钟我们都需要有很多个文件,分布到GFS/HDFS上不同的数据节点。这样,我们的文件都会变得很小,也就丧失了顺序读取大文件的性能优势。

其实,高频率地执行MapReduce还会有很多问题。而归根到底,是这两点:

所以,我们需要一个全新的流式数据处理系统,这也是S4这个系统的出发点。

流式计算的逻辑模型

我们先来看一看S4,是怎么抽象我们的流式计算的。S4把所有的计算过程,都变成了一个个处理元素(Processing Element)对象,简称为PE对象。我这里特地加上了对象,就是因为在实现上,PE就是一个面向对象编程里面一个实际的对象。

每一个PE对象,都有四部分要素组成,分别是:

对于流式的数据处理,就是由一个个PE组成的有向无环图(DAG)。有向无环图的起点,是一些特殊的被称为无键PE(Keyless PE)的对象。这些对象的作用,其实就是接收外部发送来的事件流,这些外部发送过来的事件流,其实就是一条条的消息。

这些无键PE会解析对应的消息,变成一个个事件。然后给每个事件打上三个信息,分别是:

然后可以把事件给发送出去。接着下游的其他PE对象,会根据自己定义的事件类型,和能处理的键来接收对应的消息,并且处理这个消息。如果当前系统里,没有对应的键的PE,那么系统会创建一个新的PE对象。

处理数据的PE对象,可以选择处理完之后立刻发送一个新的事件出去;也可以选择在对象内部来维护一个状态,然后当处理了一定数量的消息之后,或者过了一个固定的事件间隔之后把消息发送出去。

最后,在整个有向无环图的终点,会有一系列的PE对象。这些对象,会把最终的计算结果发布(Publish)。这个发布的频率,也和其他PE发送消息的逻辑类似,可以在每收到一个事件就发送,也可以要求接收到一定数量的事件,或者每隔一个特定的时间间隔发送。

图片

这么来描述,可能整个过程有点过于抽象,我们还是来一起看一看论文里图一的示例。这个例子,是用来统计整个系统里,出现得最多的K个单词,也就是Top K,它的整个DAG的结构是这样的:

S4这个把整个数据处理流程,变成一个有向无环图的设计,也是后续所有流式处理系统都采用的一个解决方案。所有的数据,变成了事件流,而开发人员只需要做两件事情:

而开发人员,不需要关心数据是在哪里被处理的。这些,都由S4这个分布式系统自己来决定。

师从MapReduce的设计理念

其实S4的系统架构,和我们之前看过的MapReduce这样的框架一脉相承。PE其实和Map/Reduce函数一样,只是一个抽象的概念。不过S4的系统设计,要更加激进一点,那就是 S4选择了一个无中心的,完全对称的架构

S4和我们之前看过的所有系统都不一样,没有所谓的Master节点。如果一定要说有一个中心化的地方的话,S4依赖于Zookeeper,也就是一个类似于Chubby这样的分布式锁系统。S4的所有服务器,都会作为一个处理节点(ProcessingNode),简称PN注册在Zookeeper上。具体如何分配负载,是由各个节点协商决定的,而不是由一个中心化的Master统一分配

每一个处理节点,都是相同的,它由上下两部分组成。

上面,是实际的业务处理逻辑模块:

业务处理模块里,只会确定对应的消息发送,应该发送给哪一个逻辑上的PE,实际具体发送到哪一台物理节点,则是由下面的通信层模块来决定的。这个模块主要解决这样几个问题:

图片

你可以看到,这个其实和我们看MapReduce的框架是类似的,开发人员的关注点,只需要在PE这个纯粹的业务逻辑层面。至于计算在哪一台服务器上发生,各个节点之间是怎么通信的,开发人员完全不需要关心。

稍显“过时”的伸缩和容错能力

不过,看到这里,相信你也会有一些疑问。以单词计数为例,看起来一个S4在线上的有向无环图就需要有海量的对象,这个数量级可能是数万乃至数十万。而不像之前我们看过的MapReduce那样,只需要有少数Map和Reduce就好了。

没错,S4的设计其实还有粗糙,也还有着很多的问题。

首先就是这里的海量对象的问题。由于每一个处理数据的Key都要是一个对象,系统里就会有海量的对象。而一个Key如果只出现一次,之后再也不出现了,也要占用内存。S4对此的解决办法,是给Key设定TTL,定期清理掉不需要的Key。

其次,是S4里,没有时间窗口的概念。在我们进行实时数据处理的时候,我们需要统计的,常常是“过去一分钟的热搜”,或者“过去一小时的热搜”,这样有一个时间范围的数据。

但是在S4的设计里,我们并没有地方可以设定这个时间窗口。所以类似的需求,需要我们自己在PE的代码里面去维护或者实现,一下子大大增加了开发的难度和复杂度。

第三,是S4的容错处理非常简单。S4能够做到的容错,其实就是某一个计算节点挂掉了,我们重新再起一个计算节点承担它的工作。但是,原先节点里,所有PE维护的状态信息就都丢失了。我们既不知道目前的统计信息是什么,也不知道目前处理到哪些事件了。

Yahoo给出的答案是退回到离线批处理计算的数据上,但是这个显然就不满足流式处理一开始的需求了。只能算是个聊胜于无的方案。

最后一个问题,则是S4虽然是一个分布式系统,但是并不支持真正的动态扩容。在一开始论文的假设部分,就假设了运行中的集群不会增加或者减少节点。

这样带来的问题,就是当负载快速上升的时候,S4的策略是随机丢弃一些数据,本质上是对数据进行了采样,而不是能够通过简单增加硬件来解决问题。

不过不管怎么说,S4还是让大数据的的流式处理迈出了第一步。而这些S4并没有回答好的问题,也会为接下来的流式数据处理系统的兴旺拉开了帷幕。

小结

好了,对于S4的论文,我们到这里也就解读完了。我们看到,随着大数据的价值深入人心,MapReduce这样定期定时进行数据处理的方式,逐渐难以满足业务需求。于是,大数据的流式计算登上了历史舞台。

Yahoo通过S4系统,进行低延时的“实时”数据处理。整个系统的设计理念类似于MapReduce,开发人员只需要实现Processing Element这样的业务处理逻辑,而不需要关心“分布式”是怎么运行的。整个框架会完成数据的分发、计算节点的调度,以及容错之后的恢复。

通过S4,Yahoo能够及时地获取广告和搜索数据的反馈,以及进行在线的A/B测试。整个S4内部的设计,也将业务逻辑层和网络协议、数据路由、负载均衡等拆分开来,做成了一个可插拔(Pluggable)的系统架构。

在整个的流式数据处理框架里,S4采用了一个典型的Actor模式。整个数据处理的流程,可以被画成一个有向无环图,图里的每一个点都是一个处理元素,每一条边都是一条消息传递的路径,而每一个处理元素都会被托管在某一个处理节点里。

处理元素负责实现业务逻辑,并且可以保存计算结果在内存。同时,S4支持你定时地将对应的结果发布到外部的存储系统里,使得计算结果对外可用。

但是,S4的设计显然也是很粗糙的。S4采用了一个完全对称、没有中心节点的分布式架构,虽然看起来这个解决了“单点故障”问题,但是也因此放弃了动态扩容,而只能在大量流量进入的时候,选择服务降级的解决方案。

而在业务层面,S4的容错,也只是考虑“计算节点”层面的容错。容错只是将挂掉的节点能够在其他的硬件上重新运行起来,但是已经处理的历史数据都已经丢失了。而对于节点之间的数据传输,S4也没有作出全链路的传输保障

这些问题,也是后面的Storm、Kafka、Flink这些系统出现的出发点。而S4自己,却在2014年就从Apache孵化的项目中“退役(retried)”了。

推荐阅读

S4的论文是流式计算的起点,所有S4设计上的各种缺陷,都成为后来的系统的改进点。而它的论文本身也非常简短,读一读论文原文非常有助于你认识到流式计算最原始和粗糙的想法,是从哪里开始的。

思考题

除了我在这节课里所说的问题之外,每分钟定时运行MapReduce来进行实时数据处理,还可能会遇到哪些问题呢?我们是否可以通过优化改造MapReduce,来解决这些问题呢?请你想一想,在留言区中分享你的思考和答案。

评论