你好,我是徐文浩。

在过去的几讲里,我们看到了大数据的流式处理系统是如何一步一步进化的。从最早出现的S4,到能够做到“至少一次”处理的Storm,最后是能够做到“正好一次”数据处理的MillWheel。你应该能发现,这些流式处理框架,每一个都很相似,它们都采用了有向无环图一样的设计。但是在实现和具体接口上又很不一样,每一个框架都定义了一个属于自己的逻辑。

S4是无中心的架构,一切都是PE;Storm是中心化的架构,定义了发送数据的Spout和处理数据的Bolt;而MillWheel则更加复杂,不仅有Computation、Stream、Key这些有向无环图里的逻辑概念,还引入了Timer、State这些为了持久化状态和处理时钟差异的概念。

和我们在大数据的批处理看到的不同,S4、Storm以及MillWheel其实是某一个数据处理系统,而不是MapReduce这样高度抽象的编程模型。每一个流式数据处理系统各自有各自对于问题的抽象和理解,很多概念不是从模型角度的“该怎么样”抽象出来,而是从实际框架里具体实现的“是怎么样”的角度,抽象出来的

不过,我们也看到了这些系统有很多相似之处,它们都采用了有向无环图模型,也都把同一个Key的数据在逻辑上作为一个单元进行抽象。随着工业界对于流式数据处理系统的不断研发和运用,到了2015年,仍然是Google,发表了今天我们要解读的这一篇《The Dataflow Model》的论文。

那么,在学完这一讲之后,我希望你能够对过去几讲的论文进一步融会贯通,能够做到:

Dataflow的基础模型

Dataflow的核心计算模型非常简单,它只有两个概念,一个叫做ParDo,顾名思义,也就是并行处理的意思。另一个叫做GroupByKey,也就是按照Key进行分组数据处理的问题。

ParDo,地位相当于MapReduce里的Map阶段。所有的输入数据,都会被一个DoFn,也就是处理函数处理。但是这些数据,不是在一台服务器上处理的,而是和MapReduce一样,会在很多台机器上被并行处理。只不过MapReduce里的数据处理,只有一个Map阶段和一个Reduce阶段。而在Dataflow里,Pardo会和下面的GroupByKey组合起来,可以有很多层,就好像是很多个MapReduce串在一起一样。

而GroupByKey,地位则是MapReduce里的Shuffle操作。在Dataflow里,所有的数据都被抽象成了key-value对。前面的ParDo的输入和Map函数一样,是一个key-value对,输出也是一系列的key-value对。而GroupByKey,则是把相同的Key汇总到一起,然后再通过一个ParDo下的DoFn进行处理。

比如,我们有一个不断输入的日志流,想要统计所有广告展示次数超过100万次的广告。那么,我们可以先通过一个Pardo解析日志,然后输出(广告ID,1)这样的key-value对,通过GroupByKey,把相同的广告ID的数据分组到一起。然后再通过一个ParDo,并行统计每一个广告ID下的展示次数。最后再通过一个ParDo,过滤掉所有展示次数少于100万次的广告就好了。

图片

理解流批一体

那么这样看起来,Dataflow不就是个MapReduce吗?它无非是可以把多个MapReduce的过程串接在一起就是了。当然,答案并没有那么简单,因为在Dataflow里,我们还有一个很重要的维度没有加入进来,这个维度就是时间

Dataflow里的GroupByKey,会把相同Key的数据Shuffle到一起供后续处理,但是它并没有定义在什么时间,这些数据会被Shuffle到一起。

在MapReduce的计算模型下,会有哪些输入数据,是在MapReduce的任务开始之前就确定的。这意味着数据从Map端被Shuffle到Reduce端,只依赖于我们的CPU、网络这些硬件处理能力。而在Dataflow里,输入的数据集是无边界的,随着时间的推移,不断会有新的输入数据加入进来。

如果从这个角度来思考,那么我们之前把大数据处理分成批处理和流式处理,其实并没有找到两种数据处理的核心差异。因为,对于一份预先确定、边界明确的数据,我们一样可以使用流式处理。比如,我们可以把一份固定大小日志,放到Kakfa里,重放一遍给一个Storm的Topology来处理,那也是流式处理,但这是处理的有边界的数据。

而对于不断增长的实时数据,我们一样可以不断定时执行MapReduce这样的批处理任务,或者通过Spark Streaming这样看起来是流式处理,其实是微批(Mini-Batch)的处理方式。

事实上,即使是所谓的“流式”数据处理系统,往往也会为了性能考虑,通过微批的方式来提升性能。一个典型的例子,就是上一讲我们看过的MillWheel里的Checkpoint,就会在等待多条记录处理完之后批量进行。

一旦从这个视角来观察,那么批和流本身是一回事儿。当我们把“批(Batch)”的记录数限制到了每批一条,那么它就是所谓的流了。进一步地,MapReduce的“有边界(Bounded)”的数据集,也只是Dataflow的“无边界(Unbounded)”的数据集的一种特殊情况。所以,Jay Kreps才会在2014年提出流批一体的Kappa架构,而到了2015年的Dataflow,我们就看到了批处理本来就是流处理的一种特殊情况。

时间窗口的分配与合并

在MillWheel的论文里,我们已经看到了一个非常完善的流式数据处理系统了。不过,在这个流式处理系统里,对于“时间”的处理还非常粗糙。MillWheel的确已经开始区分事件的处理时间(Processing Time)和事件的发生时间(Event Time)了,也引入了时间窗口的概念。但是,对于计算结果何时输出,它仍然采用的是一个简单的定时器(Timer)的方案。而到了Dataflow论文里,对这些概念的梳理和抽象就变成了重中之重。

我们先来看一看时间窗口的概念,在流式数据处理里,我们需要的往往不是“统计所有的广告展示数量”,而往往是“每5分钟统计一次广告展示数量”,或者“统计过去5分钟的广告展示数量”。我们常用的时间窗口,也会分成好几种:

图片

既然引入了时间窗口这个概念,相信你很容易理解,我们在Dataflow模型里,需要的不只是GroupByKey,实际在统计数据的时候,往往需要的是GroupByKeyAndWindow。统计一个不考虑任何时间窗口的数据,往往是没有意义的,1分钟内广告展示了100万次,和1个月内展示了100万次代表着完全不同的广告投放力度。我们需要根据特定的时间窗口,来进行数据统计。

而在实际的逻辑实现层面,Dataflow最重要的两个函数,也就是AssignWindows函数和MergeWindows函数。每一个原始的事件,在我们的业务处理函数之前,其实都是(key, value, event_time)这样一个三元组。而AssignWindows要做的,就是把这个三元组,根据我们的处理逻辑,变成(key, value, event_time, window)这样的四元组。

需要注意,一个事件不只可以分配给一个时间窗口,而是可以分配给多个时间窗口。比如,我们有一个广告在12:01展示给了用户,但是我们统计的是“过去2分钟的广告展示”,那么这个事件,就会被分配给[12:00, 12:02)和[12:01, 12:03)两个时间窗口,我们原先一条的事件就可以变成多条记录。

而在有了Window的信息之后,如果我们想要按照固定窗口或者滑动窗口统计数据,我们可以很容易地根据Key+Window进行聚合,完成相应的计算。

图片

但是,有些窗口函数的计算并不容易,比如我们前面讲过的第三种会话窗口,每个事件的发生时间都是不一样的。那么这个时间窗口就很难定义。

而Dataflow里的做法,是通过AssignWindows+MergeWindows的组合,来进行相应的数据统计。我们还是以前面说的,客服30分钟没有互动就算作超时的例子来看看。

比如同一个用户下,有三个事件,发生的时间分别是13:02、13:14、13:57。那么分配窗口的时候,三个窗口会是$[13:02,13:32)$,$[13:14,13:44)$以及$[13:57,14:27)$。前两个时间窗口是有重叠部分的,但是第三个时间窗口并没有重叠,对应的窗口会合并成$[13:02,13:44)$以及$[13:57,14:27)$这样两个时间窗口。

窗口的分配和合并功能,就使得Dataflow可以处理乱序数据。相同的数据以不同的顺序到达我们的计算节点,计算的结果仍然是相同的。并且在这个过程里,我们可以把上一次计算完的结果作为状态持久化下来,然后每一个新进入的事件,都按照AssignWindows和MergeWindows的方式不断对数据进行化简。

图片

你可以来看下论文里的图5,这个图有助于你去理解Dataflow是如何通过它的一些基础操作,来完成对应的数据化简和统计的。

触发器和增量数据处理

这样一来,有了对应的窗口函数逻辑,如果我们的输入数据是确定的,能够一次性都给出来,我们就很容易统计会话数这样的数据了,即使数据是乱序的也没有关系。但是,在实际情况里,我们的输入数据是以流的形式传输到每个计算节点的。并且,我们会遇到延时、容错等情况,所以我们还需要有一个机制告诉我们,在什么时候数据都已经到了,我们可以把计算结果向下游输出了。

在MillWheel的论文里,我们是通过计算一个低水位(Low Watermark)来解决这个问题的。我们会根据获取到的低水位信息,判断是否该处理的事件都已经处理完了,可以把计算结果向下游发送。

但是,这个基于水位的方法在实践中,必然会遇到这样两个问题:

那么,Dataflow里,是怎么解决这个问题的呢?答案是Lamdba架构。

这里的Lambda架构,并不是需要去搭建一个数据的批处理层,而是利用Nathan Marz的Lambda架构的核心思想,就是我们可以尽快给出一个计算结果,但是在后续根据获得的新的数据,不断去修正这个计算结果。而这个思路,在Dataflow里,就体现为触发器(Trigger)机制。

在MillWheel里,我们向下游输出数据,只能通过定时器(Timer)来触发,本质上也就是通过“时间”这一个维度而已。这个定时器,在Millwheel里其实就被改造成了完成度触发器,我们可以根据当前的水位和时间,来判断日志处理的进度进而决定是否触发向下游输出的动作。而在Dataflow里,除了内置的基于水位信息的完成度触发器,它还能够支持基于处理时间、记录数等多个参数组合触发。而且用户可以实现自定义触发器,完全根据自己的需要来实现触发器逻辑。

PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))
  .triggering(AfterProcessingTime.pastFirstElementInPane()
  .plusDelayOf(Duration.standardMinutes(1)))
  .discardingFiredPanes());

来自Apache Beam的文档教程

我们可以看一下Apache Beam项目里的一段示例代码。可以看到,在这段代码里,先是设立了一个1分钟的固定窗口。然后在触发器层面,则是设置了在对应的窗口的第一条数据被处理之后,延迟一分钟触发。在Apache Beam的文档里,你还能看到更多不同的触发器策略,你也可以根据自己的需要,来撰写专属于你自己的触发器代码。

而除了确定对应的数据计算什么时候触发,你还可以定义触发之后的输出策略是什么样的。

首先是抛弃(Discarding)策略,也就是触发之后,对应窗口内的数据就被抛弃掉了。这意味着后续如果有窗口内的数据到达,也没法和上一次触发时候的结果进行合并计算。但这样做的好处是,每个计算节点的存储空间占用不会太大。一旦触发向下游输出计算结果了,现有的数据我们也就不需要了。比如,一个监控系统,根据本地时间去统计错误日志的数量并告警,使用这种策略就会比较合适。

然后是累积(Accumulating)策略,也就是触发之后,对应窗口内的数据,仍然会持久化作为状态保存下来。当有新的日志过来,我们仍然会计算新的计算结果,并且我们可以再次触发,向下游发送新的计算结果,而下游也会用新的计算结果来覆盖掉老的计算结果。

这个是一个典型的Lambda架构的思路。我们一般的统计数据,都可以采用这个策略。一方面,我们会尽快根据水位信息,把计算结果发送给下游,使得计算结果的延时尽可能得小。另一方面,在有新的数据过来的时候,我们也会重新修正计算结果。

最后是累积并撤回(Accumulating & Retracting)策略,也就是我们除了“修正”计算结果之外,可能还要“撤回”计算结果。还是以前面的客服会话为例:

图片

当然,这只是我们最理想的状况,抛弃和累积这两种策略并不难实现,但是累积并撤回并不容易实现,即使在2021年的今天,Apache Beam也还没有支持撤回(Retraction)功能。不过,即使你从没有使用过对应的功能,你也需要理解为什么我们需要这样的功能。因为没有这个功能的话,我们的计算结果的正确性,在有些情况下是保障不了的。从这个角度来看,Lambda架构仍未彻底过时。

小结

好了,到这里,我们也算是为整个课程里,大数据的流式处理画上一个句号了。随着时代洪流滚滚向前,Google也针对自己发表的Dataflow这个编程模型,孵化出了Apache Beam这个项目。而在这个时间节点之后,像Apache Flink这样的开源流式处理项目,也都向Dataflow的编程模型靠拢,并实现了Apache Beam的接口。

在Dataflow的论文里,Google把整个大数据的流式处理,抽象成了三个概念。第一个,是对于乱序数据,能够按照事件发生时间计算时间窗口的模型。第二个,是根据数据处理的多维度特征,来决定计算结果什么时候输出的触发器模型。第三个,则是能够把数据的更新和撤回,与前面的窗口模型和触发器模型集成的增量处理策略。

Dataflow不是一篇介绍具体系统实现的论文,而是一篇更加高屋建瓴,从模型角度思考无边界的大数据处理应该如何抽象的论文。

像MapReduce一样,Dataflow是一个抽象的计算模型而不是一个具体的系统实现。用MapReduce的时候,你并不需要Google的C++的原版实现,而完全可以用Java写的Hadoop。而在Dataflow这里,Google更进一步,不仅给出了整个的计算模型,后续还推动了Apache Beam这个项目,希望能让流式数据处理的接口统一。无论你的底层实现是什么,只要能够按照Dataflow里的语义实现对应的接口,那么就算是别人来撰写代码,也都可以实现相同的计算结果。

推荐阅读

想要对大数据的流式处理有深入的了解,我们必须要读的一本书就是《Streaming Systems》。这本书的作者泰勒·阿克道(Tyler Akidau)也是这篇Dataflow论文的作者。我在之前的几讲里,已经推荐过这本书了,目前国内也已经出版了影印本。如果你想深入大数据领域的研发,特别是流式数据处理这个领域,这本书你一定要买回来好好研读一下。

思考题

我们说,Dataflow已经是一个流批一体的计算模型了,有边界的数据也只是无边界的流式数据的一种特殊情况。对于有边界的固定数据,我们当然可以通过重放日志把数据给到Dataflow系统。那么在窗口和触发器层面,我们应该用什么窗口和触发器,来得到我们想要的计算结果呢?

欢迎在留言区分享你的答案和思考,也欢迎你把今天的内容分享给更多的朋友。

评论