流式计算系统系列(4):状态
状态需求的来源
现实中的计算可以根据对状态的需求分为无状态的计算和有状态的计算两种。无状态的计算意味着单条输入包含计算出输出的所有信息,不需要其他信息即可得到输出;而有状态的计算意味着单条输入仅包含计算出输出的部分信息,其他信息依赖于之前的输入积累的状态。无状态的计算典型的例子比如从 socket 源将数据导入 HDFS,有状态的计算典型的例子比如经典的 WordCount 就需要累积之前的单词计数。
显而易见,大多数有业务价值的计算场景都是有状态的计算。状态的需求包括状态数据的存储和加载,快照和恢复,以及分布式场景下的划分和伸缩三个方面。批处理和流式计算都有对状态的需求,但是其重要性和实际处理方式并不相同。
对于状态数据的存储和加载,在批处理的场景下,单条输入实际上意味着单批数据。数据被划分为不同的分片,通常,根据具体的操作(如 KeyBy)被划分为彼此相关的若干块。在这种划分方式下,每个任务处理其中一个分片,终聚合输出得到结果。在这个过程中,相互依赖的数据被分在一起,并且每个批次都只有有限的数据,内存状态即可解决有状态计算中数据相互依赖的问题,对于状态的需求并不明显。
但在流式计算中,单条输入就意味着单条数据。由于流式计算的输入理论上是个无限流,我们必须有某种方式来管理先前输入积累的状态。注意到区别于批处理分钟、小时级的处理时间,流式计算的运行时间以天、月甚至年为单位。在这种情况下,我们不能总是依赖内存来管理状态。内存状态的管理方式并不能很好的应对大数据量的状态的存储问题,容错场景下的一致性问题和横向扩展时的伸缩问题。
存储的问题是明显的,批处理每批只处理有限的数据,而流式计算需要应对无限的输入。容错场景下的一致性问题在本系列篇文章中也有提到,在批处理的场景下,单批失败只需要重新计算,而流式计算的场景里上游无法无限缓存中间结果,必须依赖状态的快照和恢复来容错。如果说在批处理的场景下,快照是一个快速恢复的优化,那么在流式计算的场景里 ,这就是一个必须实现的功能了。
另外,状态的划分和应对规模伸缩在流式计算中也需要被特别对待。这是因为在批处理中,状态是和一批有限的数据绑定的,在规模伸缩时,只要把数据按照不同的方式重新划分好,状态自然就跟着数据被划分好了,也就是说,状态是跟着有限的数据走的。对于已完成的计算如果牵扯进状态划分,则可以重算一遍有限数据的批次。但是在流式计算中,我们不可能实际的完整划分无限的数据,也不可能回溯过往的所有数据,规模伸缩实际改变的是网络拓扑,也就是所谓的重分区(repartition),见下图。在这种情况下,我们必须做好状态数据的格式定义,以在重分区的场景下正确地将状态重新划分。当然,批处理也可以做这样的优化避免已计算完的状态重新计算,不过只是一个优化,而且对于正在执行的作业,这样的优化有些没必要。
传统的流式计算对状态管理仅提供有限的支持(Spark Streaming)或者根本不提供支持(Storm),但是有状态的计算是确实存在的业务需求。在这种情况下采取的方法往往是引入另一个系统来辅助实现状态管理。例如经典的 Storm + HBase 架构,把状态数据存放在 HBase 中,计算的时候再从 HBase 中读取状态数据。但是这样的方案显然有不可忽略的性能开销,而且仅仅是解决了流式计算可能产生超大数据量的问题,以及持久化容错但不保证强一致性。同时,这样的方案在伸缩的时候需要用户手动编写代码去重新分配状态,更广泛的说,每一个业务自己都要重新实现一整套状态管理的逻辑。这种重复实现不仅是没必要的,而且显而易见的会在每次实现的时候碰到相同不相同的 BUG。
我们不会要求业务从操作系统开始自己写起来搭建应用,作为流式计算的操作系统,其本身也应该支持这种常见而且有规律的需求,即提供开箱即用的本地状态管理。这里的本地是相对于 Storm + HBase 方案中需要对每个操作都走网络而言的,强调的是状态和计算在同一台机器上,包括但不限于内存状态。
状态种类的划分
新时代的流式计算系统基本都将状态管理考虑在需要支持的核心特性当中,其中 Flink 由于长时间的考验和眼睛,支持的状态种类和兼顾到的应用场景是多的。我们来看到 Flink 中的状态种类划分,其他的系统基本大同小异。
总的来说,Flink 的状态区分为 KeyedState 和 OperatorState 两种。顾名思义,KeyedState 跟每一个 key 相关联,也就只能使用在 KeyedStream 当中;而 OperatorState 与每个算子相关,可以使用在任意算子里。
首先说 KeyedState,我们先看一段 Flink 代码,感受一下实际使用的情况。
// KeyedState
public class WordCount extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
private transient ValueState<Integer> state;
@Override
public void open(Configuration conf) {
final ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("C", Integer.class, );
state = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(String word, Collector<Tuple2<String, Integer>> collector) {
int c = state.value();
c += 1;
state.update(c);
collector.collect(new Tuple2<>(word, count));
}
}
相关文章