MillWheel:在网络规模上可容错的流式处理(第三节)

MillWheel:在网络规模上可容错的流式处理(第三节)

4. 核心概念

MillWheel 除了流式系统的基本要素,还提供了简洁的抽象概念。数据通过用户定义的 computations 有向图流过我们的系统(如图3),每个 computation 都可以独立地处理并且下发数据。

enter image description here

图3:在一个 MillWheel 拓扑中定义的一个单独节点。输入流和输出流对应拓扑图中的有向边。

4.1 Computations

应用逻辑存在于 computations 之中,可以封装任意的用户代码。Computation 的代码是针对收到的输入数据调用的,此时用户定义的行为会被触发,包括联络外部系统,操作其他的 MillWheel 原语,或者输出数据。如果外部系统被联络到,就要依赖用户确保他们的代码对于这些系统的影响是幂等的。Computation 的代码是用来在单个 key 的上下文环境中进行操作的,并且它对于分布在不同机器上的 keys 是不可知的。正如在图4中列举的,按单个 key 来处理是顺序的,但是不同 keys 之间的处理可以是并行化的。

enter image description here

图4:按单个 key 的处理是顺序的,这样对于某个 key 一次只能处理一个 record。多个 keys 之间可以并行执行。

4.2 Keys

在 MillWheel 中 keys 是对于聚合以及比较不同 records 的基本抽象概念。对于系统中的每个 record,消费者会指定一个提取 key 的函数,用来为 record 指定一个 key。Computation 的代码运行在某个指定 key 的环境中,并且只被授予访问那个指定 key 的状态的权限。例如,在 Zeitgeist 系统中,为 query records 指定 key 的一个好选择是 query 文本自身,这是因为我们需要按 key 聚合次数以及计算模型。另外,一个作弊探测器可能会选择 cookie 指纹作为 key,目的是阻止恶意的行为。图5展示了不同的消费者从相同的输入流中提取不同的 keys。

enter image description here

图5:多个 computations 可以从相同的流提取不同的 keys。提取 key 的方法是由一条流的消费者指定的。

4.3 流

在 MillWheel 中 流是不同 computations 之间的传递机制。一个 computation 订阅0个或者多个的输入流并且发布一个或者多个输出流,系统可以保证沿着这些通道进行传递。提取 key 的函数是由每个消费者以各条流为基础来指定的,所以多个消费者可以订阅相同的流并且按不同的方式来聚合它的数据。流是通过它们的名字来唯一识别的,没有任何其他的资格证明 – 任何 computation 都可以订阅任何流,并且可以给任何流生成 records(productions)。

4.4 持久化存储

在 MillWheel 中的持久化状态以其最基本的形式,是一个按 key 管理的 byte 字符串。用户提供序列化和反序列的程序(例如将一个丰富的数据结构转换成有线格式),现在有一系列方便的机制可用(例如 Protocol Buffers [13])。持久化状态通过一个有备份的,高可用的数据存储来支撑(例如 Bigtable [7] 或者 Spanner [9]),这使得数据以一种完全对终端用户透明的方式确保完整性。对状态的常见使用包括对 records 在时间窗口下的计数器聚合以及为 join 缓存数据。

4.5 Low Watermarks

对于一个 computation 的 low watermark 提供了将来到达这个 computation 的 records 的时间戳的一个范围。

定义:我们基于一个流水线的数据流,提供了对 low watermarks 的一种递归定义。给定一个 computation A,令在 A 中最老的未完成(处理中的,已存储的,或者是即将传递的)的 record 对应的时间戳作为”oldest work of A”。然后我们将 low watermark of A 定义为:

min(oldest work of A, low watermark of C : C outputs to A)

如果没有输入流,则 low watermark 和 oldest work 的值是相同的。

Low watermark 的源头是由 injectors 生成的,injectors 就是给 MillWheel 发送数据的外部系统。在外部系统对于即将发生的 work 的测量通常是一个估计值,所以在实践中,computations 应该能预料到来自这种系统的很小比例的晚到 records – 晚于 low watermark 的 records。Zeitgeist 会丢弃这种晚到数据,并且会记录丢弃了多少数据(按照经验大概有0.001%的 records)。其他流水线当晚到 records 到达时会回溯修正它们的聚合结果。尽管没有反映在上述的定义中,系统会保证一个 computation 的 low watermark 是单调递增的即使存在晚到数据的情况。

通过等待一个 computation 的 low watermark 前进经过某个特定的值,用户可以确定他们具有直到那个时间点的完整的数据视图,正如前面列举的 Zeitgeist 的低谷探测。当给新的或者聚合的 records 设置时间戳时,这依赖于用户选择一个不比任何来源 records 小的时间戳。由 MillWheel 架构汇报的 low watermark 测量了系统中已知的 work,如图6所示。

enter image description here

图6:low watermark 随着 records 流过系统而向前推进。在每一个快照上,即将处理的 records 显示在时间轴的上面,而已经处理完毕的 records 显示在下面。新的 records 作为即将处理的 work 出现在连续的快照上,这些 records 带有的时间戳比 watermark 更大。数据没必要按顺序处理,low watermark 反映了系统中所有即将处理的 work。

4.6 Timers

Timers 是在某个特定的系统时间或者 low watermark 值触发的按 key 可编程的 hooks。Timers 是在一个 computation 的环境中被创建和运行的,可以相应地运行任何代码。决定使用系统时间还是 low watermark 值依赖于应用 – 一个启发式的监控系统想要推送每个小时的邮件(准点的,不管数据是否延迟了)可能会使用系统时间的 timers,而一个执行按窗口聚合的分析系统就会使用 low watermark timers。一旦设置了,timers 就会确保按递增的时间戳顺序启动。它们会被记录在持久化状态中,可以容忍进程重启和机器故障。当一个 timer 启动之后,它会运行特定的用户函数,并且具有和输入 records 相同的 exactly-once 保证。在 Zeitgeist 中对探测波谷的一种简单实现就是为一个给定的时间桶的末端设置一个 low watermark timer,如果观测到的流量下降到低于模型预测的值,则报告一个波谷。

对 timers 的使用是可选的 – 对于基于时间的同步语义没有需求的应用可以跳过它们。例如,Zeitgeist 不用 timers 就可以探测波峰 queries,这是因为即使没有数据的完整视图,一个波峰可能也是很明显的。如果观测到的流量已经超过了模型的预测值,那些延迟的数据只会加入到总量并增加波峰的大小。

发表评论

电子邮件地址不会被公开。 必填项已用*标注