Streaming Systems (三)

2020-06-24 00:00:00 数据 时间 事件 延迟 水印

到目前为止,我们一直在从pipeline开发者的角度研究流处理,第二章介绍了watermarks,回答了计算什么时间范围内的数据以及什么时候将处理结果物化等基本问题。在本章中,我们将从流处理系统的底层机制的角度来看同样的问题。研究这些机制将帮助我们理解和应用有关watermarks的概念。我们将讨论如何在数据进入点创建水印,如何通过数据处理管道传播,以及如何影响输出结果时间戳。我们还演示了如何在处理无界数据时保留必要的保证,以回答event time数据在何处处理以及何时物化的问题。

定义

对于任何连续接收数据并输出结果的pipeline。我们希望解决的一般问题是,什么时候关闭event-time窗口是安全的,这意味着该窗口不再需要任何数据。

解决事件时间窗口问题的一种简单方法是基于当前处理时间作为事件时间窗口。正如我们在第1章中看到的,我们很快就遇到了问题—数据处理和传输不是即时的,因此处理和事件时间几乎从不相等。在pipeline中出现的任何小问题或尖刺都可能导致将消息分配到错误的窗口。终,这个策略失败了,因为我们没有可靠的方法来保证这样的窗口。

另一种直观但终不正确的方法是考虑pipeline处理消息的速率。尽管这是一个有趣的度量标准,但是速率可能随输入的变化、预期结果的可变性、可用于处理的资源等等而任意变化。更重要的是,速率并不能帮助解决完整性的基本问题。具体来说,速率不会告诉我们何时处理了特定时间间隔内的所有消息。在实际的系统中,会出现消息无法在系统中处理的情况。这可能是暂时错误(如崩溃、网络故障、机器停机)的结果,也可能是持久化错误(如应用程序级故障,需要更改应用程序逻辑或其他手动干预来解决)的结果。当然,如果发生了大量的故障,那么处理速率可能是检测这些故障的一个很好的方式。然而,速率不能告诉我们一条消息没有通过我们的管道被处理。然而,即使是一条这样的消息,也会影响输出结果的正确性。

我们需要一种更有力的进步衡量标准。为了实现这个目标,我们对我们的流数据做了一个基本的假设:每个消息都有一个关联的逻辑事件时间戳。这种假设在连续到达无界数据的情况下是合理的,因为这意味着连续生成输入数据。在大多数情况下,我们可以将原始事件发生的时间作为其逻辑事件时间戳。然后我们可以检查这些时间戳在任意pipeline中的分布。这样的pipeline可以分布在多个agent上并行处理,消费读取的消息,但不能保证在各个分片之间的顺序。因此,该管道中活动的待处理消息(in-flight)的事件时间戳集将形成一个分布,如图3-1所示。

消息由pipeline接收、处理并终标记为完成(completed)。每个消息都是“in-flight”(即已经收到但尚未完成),或者是“completed”(即不再需要对该消息进行处理)。如果我们按照事件时间检查消息的分布,它将类似于图3-1。随着时间的推移,更多的信息将被添加到右边的“in-flight”分布中,更多来自分布“in-flight”部分的消息将被完成并转移到“completed”分布中。

在这个分布上有一个关键点,它位于“in-flight”分布的左边,对应于pipeline中未处理消息的oldest event timestamp。我们使用这个值来定义水印:

水印(Watermark)是一个表示早的尚未完成的工作单调递增的时间戳。

这个定义提出了两个基本属性:

完整性Completeness

如果水印已经提前超过了某个时间戳T,它的单调性保证我们在T点或T点之前不会对准时(非延迟的数据)事件进行更多的处理.因此,我们可以正确地在T点或T点之前发出任何聚合。换句话说,水印让我们知道什么时候关闭窗口是正确的。

可见性Visibility

如果一条消息由于任何原因被卡在我们的pipeline中,水印就不能前进。我们需要找到问题的根源,通过检查阻止水印前进的消息。

数据源 Watermark 创建

这些水印如何产生?要为数据源建立水印,必须为从该数据源进入pipeline的每个消息分配一个逻辑事件时间戳。正如第2章告诉我们的,所有水印的创建分为两大类:perfect or heuristic(完美或启发式)。为了了解关于完美和启发式水印之间的区别,让我们看看图3-2。在第二章中给出了窗口求和的例子。

如图所示,完美水印的特点是保证了水印对所有数据的考虑,而启发式水印会出现一些落后数据元素。

在水印被创建为完美或启发式之后,watermark在pipeline的下游一直存在。至于应该创建完美式或启发式,这取决于很大程度的本质是消费的数据源。要了解原因,让我们看看每种类型的水印创建的几个例子。

创建完美式Watermark

完美水印创建为传入消息分配时间戳,在这种情况下,生成的watermark是一个严格的保证,从这个Source中不会看到任何数据的事件时间小于watermark。使用完美水印创建的pipeline从不需要处理延迟数据。也就是说,在水印之后到达的数据已经超过了新到达消息的事件时间。然而,完美的watermark创建需要对输入有全面的了解,因此对于许多现实的分布式输入源是不切实际的。这里有几个可以创建完美水印的用例:


入口时间戳(Ingress timestamping)

将进入系统的时间指定为数据的event time的数据源可以创建一个完美的水印。在这种情况下,源水印只是简单地跟踪pipeline所观察到的当前处理时间。在2016年之前,几乎所有支持窗口的流式系统都使用了这种方法。

由于事件时间是由单一的、单调递增的数据源(实际处理时间)分配的,因此系统对于数据流中的下一个时间戳具有很好的了解。因此,事件时间进度和窗口语义变得非常容易推理。当然,缺点是水印与数据本身的事件时间没有相关性。这些事件时间被丢弃,而水印只是跟踪相对于其到达系统的数据的进度。

按时间排序的静态日志集(Static sets of time-ordered logs)

一个静态的按时间排序的输入源,在上面创建一个完美的水印是相对简单的(例如,带有一组静态partition的Apache Kafka topic,其中源的每个partition都包含单调递增的事件时间)。为此,源只需跨已知的和静态的源分区集跟踪未处理数据的小事件时间(每个partition中近一次读取记录的小事件时间)


与前面提到的入口时间戳类似,由于静态分区集合中的事件时间是单调增加的,所以系统完全知道下一个时间戳是哪个。这实际上是一种有界无序处理.已知的一组分区的无序程度受到这些分区之间观察到的小事件时间的限制。


从逻辑上讲,保证分区内时间戳单调递增的方法是将分区内的时间戳指定为写入数据的时间戳;例如,通过web前端将事件直接记录到Kafka中。虽然这仍然是一个有局限的用例,但它肯定比到达数据处理系统时的入口时间戳更有用。


创建启发式Watermark


启发式水印的创建,创建的水印仅仅是一个估计值,即事件时间小于watermark的数据将不会再被看到。使用启发式水印创建的pipeline可能需要处理一些落后的数据。落后数据是指当水印已经过了该数据的事件时间之后到达的任何数据。落后数据只能用启发式水印创建。如果启发式是合理的,落后的数据量可能是非常小的,水印仍然是高效的估计。如果要支持需要正确性的用例(例如,计费之类的事情),系统仍然需要为用户提供一种方法来处理延迟的数据。


对于许多实际的分布式输入源,构造一个完美的水印在计算或操作上都是不现实的,但利用输入数据源的结构特征,仍有可能构建出高精度的启发式水印。以下是两个例子,其中启发式水印(不同的特性)是可行的:


按时间排序的动态日志集(Dynamic sets of time-ordered logs)

考虑一组动态的结构化日志文件(每个文件包含的记录相对于同一文件中的其他记录具有单调递增的事件时间,但文件之间没有固定的事件时间关系),在运行时不知道完整的日志文件集(用Kafka的说法就是partition)。这种输入通常出现在由许多独立团队构建和管理的全球范围的服务中。在这样的用例中,在输入上创建一个完美的水印是很难处理的,但是创建一个的启发式水印是很有可能的。

通过跟踪现有日志文件集中未处理数据的小事件时间,数据增长率,并利用网络拓扑和可用带宽等外部信息,您可以创建一个非常准确的水印,即使缺乏所有输入的完美认知。这种类型的输入源是在谷歌中发现的常见的无界数据集类型之一。因此,我们在为这种场景创建和分析水印质量方面拥有丰富的经验,并看到它们在许多用例中得到了良好的效果。


Google Cloud Pub/Sub


云发布/订阅是一个有趣的用例,发布/订阅目前不保证按顺序交付;即使单个发布者按顺序发布两条消息,有可能(通常是很小的可能性)他们可能是无序的。这是由于底层架构的动态性(它允许透明地扩展到非常高的吞吐量级别,而无需用户干预).因此,Cloud Pub/Sub没有办法保证完美的水印。作为本章后面的一个案例研究,详细讨论了这种启发式方法的实现

考虑一个用户玩手机游戏的例子,他们的分数被发送到我们的pipeline进行处理:你通常可以认为,对于任何使用移动设备作为输入的源,通常不可能提供完美的水印。由于设备长时间离线等原因,对于这样的数据源,没有办法提供任何合理的完整性评估。然而,您可以想象构建一个可以准确跟踪当前在线设备的输入完整性的水印,类似于刚刚描述的Google Pub/Sub水印。无论如何,从提供低延迟结果的角度来看,活跃在线的用户可能是相关的用户子集。这通常并不像你初想象的那么严重。

一般来说,使用启发式水印创建时,对数据源的了解越多,启发式越好,看到的落后数据项越少。没有一个统一的解决方案,因为源的类型、事件的分布和使用模式会有很大的不同。但是在任何一种情况下(完美或启发式),在输入源处生成水印后,系统都可以通过管道完美地传播水印。这意味着水印将在下游一直存在,启发式水印将保持严格的启发式,因为他们是建立时决定的。这就是水印方法的好处:您可以将跟踪管道完整性的复杂性完全降低到在Source中创建水印的问题。

Watermark传播


到目前为止,我们只考虑了单个operator或stage上下文中输入的水印。然而,大多数实际的pipeline都包含多个stage。理解watermark如何跨越独立的stage传播对于理解它们如何影响整个pipeline及其结果延迟的观察非常重要。

PIPELINE STAGES

每次pipeline将数据按某个新维度分组时,通常都产生不同的stage。例如,如果您有一个使用原始数据的管道,计算每个用户的聚合,然后使用这些每个用户的聚合来计算每个团队的聚合,那么您可能会得到一个分为三个阶段的pipeline:

  • 一个使用未分组的原始数据
  • 一个是按用户分组数据,并按用户计算聚合
  • 一个是按团队对数据进行分组并计算每个团队的聚合


我们将在第6章了解更多关于分组对pipeline的影响。


如前一节所述,水印是在输入源处创建的。然后,随着数据的流入,它们流经系统。您可以跟踪不同粒度级别的水印。对于包含多个不同stage的管道,每个阶段都可能跟踪自己的水印,其值是之前所有输入和stage的function。因此,管道后面的stage将有更早以前的水印(因为他们看到的总体输入较少)

我们可以在管道中任何单个operator或stage的边界处定义水印。这不仅有助于理解pipeline中的每个stage的进度,而且有助于独立地、尽可能快地调度每个stage的结果。我们对各stage边界的水印给出如下定义:


  • 输入watermark:它捕获该stage上游数据处理的进度(该stage的输入有多完整)。对于源,输入水印是为输入数据创建水印的特定于源的函数。对于非源stage,输入水印被定义为其所有上游源和stage的所有分片/分区/实例的输出水印的小值。
  • 输出watermark:它捕获该stage本身的进度,本质上定义为该stage输入水印的小值和stage内所有非延迟数据的event time。“active”所包含的内容在某种程度上取决于给定stage实际执行的操作和流处理系统的实现。它通常包括为聚合而缓冲但尚未物化到下游的数据、传递到下游阶段的pending输出数据等等。


定义特定stage的输入和输出水印的一个优势是,我们可以使用它们来计算stage带来的事件时间延迟。从一个stage的输出水印的值减去它的输入水印的值,就得到了这个阶段引入的事件时间延迟。这种延迟是指每个stage的输出与实际时间之间的延迟程度。例如,执行10秒窗口聚合的stage将有10秒或更长时间的延迟,这意味着阶段的输出至少要比输入延迟那么久。输入和输出水印的定义提供了整个pipeline中水印的递归关系。pipeline中的每个后续阶段都根据需要延迟水印,这是基于该stage的事件时间延迟


每个stage的处理也不是单一的。我们可以将一个stage内的处理过程分割成具有几个概念组件的流,每一个都有输出水印。如前所述,这些组件的特性取决于stage执行的操作和系统的实现。从概念上讲,每个这样的组件充当一个缓冲区,active消息可以驻留在其中,直到某个操作完成。例如,当数据到达时,它被缓冲以进行处理。处理可能会将数据写入state,以便以后进行延迟聚合。当触发延迟聚合时,可能会将结果写入输出缓冲区,等待下游阶段的消费,如图3-3所示。


我们可以跟踪每个这样的缓冲区与自己的watermark,跨越各级缓冲区的小水印构成该级的输出水印。因此,输出水印可以是下列小值:

  • 每个源的水印(Per-source watermark)-每个发送stage。
  • 每个外部输入的水印(Per-external input watermark)-管道外部的源
  • 每个状态组件水印(Per-state component watermark)-可以写入的每种状态类型
  • 每个输出缓冲水印(Per-output buffer watermark)-每个接收stage

使水印在这个粒度级别上可用,能够更好的描述系统内部状态。水印跟踪消息在系统中不同缓冲区之间的流转状态,以便更容易地诊断阻塞。


理解 Watermark 传播


为了更好地理解输入和输出水印之间的关系以及它们如何影响水印的传播,我们来看一个示例。让我们考虑游戏分数,而不是计算团队分数的总和。尝试分析用户的参与度。我们首先计算每个用户的会话长度,假设用户参与游戏的时间代表用户参与度。在计算会话长度之后,我们还将计算固定时间段内的平均会话长度。


为了让我们的示例更加有趣,我们假设我们使用两个数据集,一个是移动分数,另一个是pc端分数。我们执行相同的分数计算逻辑,通过并行在这两个独立的数据集上计算。一种是计算用户在移动设备上玩游戏的分数,另一种是计算用户pc端上玩游戏的分数,这可能是因为不同平台采用的数据收集策略不同。重要的是,这两个阶段执行的是相同的操作,但是数据不同,因此输出水印也有很大的不同。


首先,让我们看一下示例3-1,看看这个pipeline部分的代码是什么样的。

//Example 3-1. Calculating session lengths
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))) 
    .triggering(AtWatermark())
    .discardingFiredPanes())
.apply(CalculateWindowLength());

PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
     .triggering(AtWatermark())
     .discardingFiredPanes())
.apply(CalculateWindowLength());

相关文章