MillWheel:在网络规模上可容错的流式处理(第二节)

MillWheel:在网络规模上可容错的流式处理(第二节)

2. 动机和需求

Google 的Zeitgeist流水线被用来在检索 query 中追踪趋势。为了证明 MillWheel 的特征集的功能,我们将会查看Zeitgeist系统的需求。这个流水线获取一个连续的检索 query 的输入,并且执行异常探测, 尽快输出波峰或者波谷的 query。这个系统构建了每个 query 的一个历史模型,所以流量中预期内的改变不会引起误报(例如傍晚的 query:”television listings”)。尽快识别出波峰或者波谷的 query 是非常重要的。例如,Zeitgeist帮助驱动了 Google 的 Hot Trends 服务,这个是依赖于新鲜资讯的。这个流水线的基本拓扑如图1所示。

enter image description here

图1: 输入数据(检索 query)经过一系列 MillWheel 的computations,表现为分布式的进程。系统的输出被一个外部的异常通知系统所消费。

为了实现这个Zeitgeist系统,我们的方法是将 records 分桶到二分之一区间,并且比较每次分桶的实际流量与模型预测的预期内流量。如果这些数量相对于大部分分桶都是不同的,我们就有很高的置信度认为一个 query 是波峰或者波谷的。并行地,我们会用最新收到的数据更新这个模型并且存储起来将来使用。

持久化存储:很重要的一点是这个实现既需要短期存储也需要长期存储。一个波峰可能只会持续几秒钟,因此就要依赖于一个小时间窗口的状态,反之模型数据可能对应数月的持续更新。

LowWatermarks:一些Zeitgeist用户对探测流量中的波谷感兴趣,这种情况下一个 query 的数量异常的低(例如如果埃及政府关闭了互联网)。 在一个从全世界接收输入的分布式系统中,数据到达时间无法严格与它的生成时间(这个 case 里就是检索时间)相对应,所以能够分辨出来一批在 t = 1296167641 的阿拉伯语的 query 只是延迟在线路上了,或者还是实际上就没有,是很重要的。MillWheel 通过为每个处理阶段(例如 Window Counter, Model Calculator)到来的数据提供一个 low watermark,low watermark 表示所有这个给定时间戳之前的数据都已经收到了。 low watermark 追踪在分布式系统中的所有行将发生的事件。使用 low watermark,我们可以辨别这两个例子 – 如果 low watermark前进超过了时间 t 而没有 query 到达,那么我们可以有很高的置信度认为这些 query 没有被记录,并不是被延迟了。这种语义也为输入避免了严格有序的需求 – 乱序流才是常态。

防止重复:对于Zeitgeist,重复 record 传递可能会造成假的波峰。将来,exactly-once 处理对于 MillWheel 的许多处理收入的客户是一个需求,这些客户可以依赖这个架构实现的正确性,而不是改造他们自己的去重机制。用户不需要写代码来手动回滚状态更新或者处理一系列失败场景来保证正确性。

鉴于上述情况,我们为 Google 的流式处理架构提出了一些需求,这些都反映到了 MillWheel:

  • 数据一旦产出就应该对消费者可用(也就是说,没有系统固有的同步机制来获取输入以及提供输出数据)。
  • 持久化状态抽象应该对用户代码可用,并且应该被整合到系统的整个一致性模型中。
  • 乱序数据应该被系统优雅地处理。
  • 一个单调递增的数据的时间戳 low watermark 应该由系统计算得到。
  • 随着系统扩展到更多机器,延迟应该保持恒定。
  • 系统应该提供对传递 records 的 exactly-once语义。

3. 系统概述

总的来说,MillWheel 是一个用户定义的转换图,输入数据按这个转换图处理并生成输出数据。我们把这些转换叫做computations,下面会更广泛的定义它们。这些转换中的每一个都可以并行化地运行在任意数目的机器上,所以用户不需要自己关心细粒度的负载均衡。在 Zeitgeist 的 case 中,如图1所示,我们的输入是连续到达的检索 query 集合,输出是波峰或者波谷的 query 集合。

抽象地,在 MillWheel 中的输入和输出都表示为三元组(key, value, timestamp)。key 是一个在系统中带有语义含义的元数据字段,value 可以是一个任意的 byte 字符串,对应于整个 record。用户代码运行的上下文环境被限定到一个特定的 key 上,每个 computation 可以根据它的逻辑需要为每个输入源定义 key 的生成方式。例如,在 Zeitgeist 的特定 computations 可能会选择检索词(例如“cat videos”)作为 key,目的是为每个 query 计算统计信息,而其他 computations 可能会选择地理来源作为 key,目的是根据位置进行聚合。三元组中的 timestamp 可以由 MillWheel 用户指定为任意值(不过它们一般都与事件发生的绝对时间相接近),MillWheel 将会通过这些值计算 low watermarks。如果一个用户聚合了每秒内检索词的次数(正如在 Zeitgeist 中,如图2所示),然后就会给 timestamp 指定一个与执行检索时间相对应的值。

enter image description here

图2:把 web 检索聚合到一秒的桶内并且使用持久化的每个 key 的状态更新模型。每个 computation 都访问了它自己每个 key 的状态,并根据输入 records 对其进行更新。

用户 computations 组成的流水线会形成一个数据流图,一个 computation 的输出作为另一个 computation 的输入。用户可以从一个拓扑中动态地增加或者删除 computations,而不需要重启整个系统。对于处理数据并输出 records,一个 computation 可以任意地合并,修改,创建以及丢弃 records。

MillWheel 使得该架构的 API 对于 record 处理都是幂等的。只要应用使用这个系统提供的状态和通信抽象,失败和重试就会隐藏于用户代码之中。这使得用户代码简单易懂,并且允许用户专注于他们的应用逻辑。在一个 computation 的上下文环境中,用户代码可以访问每个 key,每个 computation 的持久化存储,这样就可以支持强大的按 key 聚合,正如 Zeitgeist 例子中所列举的那样。MillWheel 提供了如下的基础的保证:

传递保证:MillWheel 架构通过 record 处理造成的的所有内部更新都会按 key 原子地进行 checkpoint 操作,并且 records 都会精确地被传递一次。这个保证不能扩展至外部系统。

现在具备了对这个系统的整体概念,我们将会在下一节详细阐述组成 MillWheel 的各个概念。

上一篇:MillWheel:在网络规模上可容错的流式处理(第一节)

下一篇:MillWheel:在网络规模上可容错的流式处理(第三节)

发表评论

电子邮件地址不会被公开。 必填项已用*标注