Flink Checkpoint原理解析

2020-06-22 00:00:00 数据 状态 算子 触发 快照

Flink是一个有状态的分布式流式计算引擎,flink中的每个function或者是operator都可以是有状态的,有状态的function在处理流数据或事件的的同时会存储一部分用户自定义的数据,这使得flink的状态可以作为任何更精细操作的基础。然而总会有一些原因使流任务出现异常(如网络故障、代码bug等),为了使得状态可以容错,flink引入了checkpoint机制。checkpoint使得flink能够恢复流任务的状态和位置,从而为流任务提供与无故障执行相同的语义。下面对flink的checkpoint机制进行总结,并分享一些使用经验。

Paper

Flink的checkpoint的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇paper中被提出。理解了这篇paper也就明白了flink的chekpoint机制。paper整体来说比较简单易懂,下面简单介绍下paper的大体内容和核心的算法。

  • 概览

有状态的分布式流式计算使得大规模的部署和云计算成为可能,并且有着两个目标--低延迟和高吞吐。一个基础的挑战就是如何在计算或任务可能失败的情况下,提供数据一致性的保证。之前的通常的方法是依赖周期性的全局快照,然而这些方法有两个缺点。,他们通常需要暂停整个计算逻辑,这通常会影响数据的摄取。第二,他们通常需要持久化所有在传输中的数据,这会导致生成的快照比实际需要的大很多。

这篇paper主要介绍了一个轻量级分布式异步快照,只需要保存较少的数据即可。并且将这种轻量级的算法在flink中实现,之后通过验证表明,这种算法对数据的处理和计算影响很小,而且拥有线性的可扩展性,并且在快照比较频繁的时候性能依旧良好。接下来paper首先介绍了现存的几种流计算种使用的快照算法,以及存在的问题,并且简单的介绍了flink的流计算架构,主要是flink的一些基本概念,因此不再展开介绍。之后就是这篇paper的重点,分布式异步快照的实现以及状态的恢复。后是性能评估和总结。

  • Asynchronous Barrier Snapshotting(ABS)

为了保证输出结果的一致性,分布式流处理系统通常要能处理任务失败的情况。一种常用的做法是周期性的产生快照,在任务失败的情况下能后从快照恢复运行。快照是任务计算图的一个全局的状态,包含所有能够恢复任务的必要的数据。对于一个快照而言,从终性(Termination)与可行性(feasibility)两个方面来阐述如何保障结果的正确性。

ABS将计算逻辑通过有向无环图划分为几个阶段(stage),每个stage将所有的输入数据和输出结果划分为一系列的处理中间状态,对于每一个stage来说,所有先前的输入和输出结果都已经完全处理。所以ABS的核心思想就是创建分阶段的快照,同时保持连续的数据摄取。

接下来介绍周期性的barrier如何起作用。

1)中央协调者周期性的从源头注入barrier。

2)当source接受到barrier后,立即产生一个快照,之后将barrier广播到所有的输出端。

3)当非source算子收到其中一个输入端的barrier后,立刻阻塞这个channel;这个channel中被阻塞的数据将会被缓存起来;直到改算子收到了所有的上游输入的barrier。

4)当非source算子接收到所有的上游输入的barrier之后,则会立即生成快照,保存当前算子的状态,之后将barrier广播到所有的输出端。

5)这个算子解除对上游输入channel的阻塞,继续计算新的输入数据。当数据后完成sink,一个完整的检查点才算完成。

如何证明终性?数据channel的可靠性保证了每一个barrier都会被接收,只要任务还存活。另外的有向无环图中,数据的流动路径是确定的,每个算子终都会收到所有上游算子的 barrier并触发生成快照。

如何证明一致性?由于输入通道的先进先出原则和barrier的阻塞功能,确保了在快照完成之前,下游算子不会计算阻塞的数据。

  • 失败恢复

1)每个算子从持久化存储中恢复对应的状态作为其初始状态。

2)还原并处理备份的数据。

3)从输入通道重新摄取数据

如上图所示,任务只能恢复部分算子的情况也是有可能发生的,为了保证计算不重复,可以通过重放上游数据的方式进行处理。比如通过对比数据的offset,将offset小于已经处理过的大的offset的数据丢弃。

以上就是这篇paper的大概内容和核心算法。

Flink Checkpoint

Flink整体来说是一个主从架构,JobManager负责资源管理、任务调度、checkpoint触发等等协调性的工作,TaskManager负责按照代码逻辑处理数据,真正执行任务的角色。由于Flink的slot-sharing机制,一个TaskManager可能有多个子任务。

checkpoint流程

在checkpoint过程中,JobManager会按照代码配置的规则,定期触发checkpoint,在所有的source的子任务中注入checkpointBarrier,TaskManager在收到所有上游广播的CheckpointBarrier 后,触发checkpoint。当整个DAG图的子任务的checkpoint都做完之后,会汇报给JobManager,JobManager则认为这个checkpoint已经完成。整个流程与上述paper中的流程几乎一致。

Flink Checkpoint支持两种语义,Exactly_Once 和 At_Least_Once。这两种语义的区别主要在于对barrier 对齐方式的处理,Exactly_Once的算子在接受到个barrier后会阻塞上游输入通道,将输入数据缓存起来,知道所有上游的barrier都收到之后才会继续处理这些数据。At_Least_Once则不会阻塞上游数据,即使没有对齐barrier也会处理后续的数据。

源码分析

Checkpoint的主要流程可以CheckpointCoordinator类中看到。

	public PendingCheckpoint triggerCheckpoint(
			long timestamp,
			CheckpointProperties props,
			@Nullable String externalSavepointLocation,
			boolean isPeriodic,
			boolean advanceToEndOfTime) throws CheckpointException {

		if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
			throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
		}

		// 做一些预先的检查
		synchronized (lock) {
			// 若此时coordinator已经被关闭则checkpoint失败
			if (shutdown) {
				throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
			}

			// 若是周期性执行而调度器已经关闭则checkpoint失败
			if (isPeriodic && !periodicScheduling) {
				throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
			}

			// 验证checkpoint是否达到大并发数,验证checkpoint之间的小时间间隔
			// 默认大并发度为1,小间隔为0,这些参数都可以提前自定义配置
			// 这些检查都和savepoint没有关系
			if (!props.forceCheckpoint()) {
				// 检查是否只有一个触发checkpoint的请求
				if (triggerRequestQueued) {
					LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
					throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
				}

				// 检查当前有多少checkpoint在pending
				checkConcurrentCheckpoints();
				// 检查小时间间隔
				checkMinPauseBetweenCheckpoints();
			}
		}

		// 检查所有子任务的状态,若任务不在运行则直接抛弃这次checkpoint
		Execution[] executions = new Execution[tasksToTrigger.length];
		for (int i = ; i < tasksToTrigger.length; i++) {
			Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
			if (ee == null) {
				LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
						job);
				throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
			} else if (ee.getState() == ExecutionState.RUNNING) {
				executions[i] = ee;
			} else {
				LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
						job,
						ExecutionState.RUNNING,
						ee.getState());
				throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
			}
		}


		// 检查所有待确认算子的运行状态,若不在运行中则直接抛弃这次checkpoint
		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

		for (ExecutionVertex ev : tasksToWaitFor) {
			Execution ee = ev.getCurrentExecutionAttempt();
			if (ee != null) {
				ackTasks.put(ee.getAttemptId(), ev);
			} else {
				LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
						ev.getTaskNameWithSubtaskIndex(),
						job);
				throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
			}
		}


		// 做完上述的一系列检查后开始真正触发checkpoint
		// 通过加特殊的锁来确保触发请求不会并发
		synchronized (triggerLock) {

			// checkpoint保存路径
			final CheckpointStorageLocation checkpointStorageLocation;
			final long checkpointID;

			try {
				// 这个必须使用全局的锁,因为与外部的系统交互可能会导致阻塞一段时间
				// checkpointID通常是一个递增的时间戳
				checkpointID = checkpointIdCounter.getAndIncrement();

				checkpointStorageLocation = props.isSavepoint() ?
						checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
						checkpointStorage.initializeLocationForCheckpoint(checkpointID);
			}
			catch (Throwable t) {
				// 若失败则记录失败次数,这些指标都可以通过metrics收集起来,用于checkpoint监控
				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
				LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
						job,
						numUnsuccessful,
						t);
				throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
			}

			// 创建一个等待执行的Checkpoint
			final PendingCheckpoint checkpoint = new PendingCheckpoint(
				job,
				checkpointID,
				timestamp,
				ackTasks,
				props,
				checkpointStorageLocation,
				executor);

			if (statsTracker != null) {
				PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
					checkpointID,
					timestamp,
					props);

				checkpoint.setStatsCallback(callback);
			}

			// 调度器会清理掉过期的checkpoint
			final Runnable canceller = () -> {
				synchronized (lock) {
					// 若checkpoint已经过期则直接丢弃
					if (!checkpoint.isDiscarded()) {
						LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

						failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
						pendingCheckpoints.remove(checkpointID);
						rememberRecentCheckpointId(checkpointID);

						triggerQueuedRequests();
					}
				}
			};

			try {
				// 重新获取coordinator锁
				synchronized (lock) {
					// 因为之前做完预检查后释放了锁,所以需要再检查下上述条件
					if (shutdown) {
						throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
					}
					else if (!props.forceCheckpoint()) {
						if (triggerRequestQueued) {
							LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
							throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
						}

						checkConcurrentCheckpoints();

						checkMinPauseBetweenCheckpoints();
					}

					LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

					pendingCheckpoints.put(checkpointID, checkpoint);

					// 创建超时取消线程
					ScheduledFuture<?> cancellerHandle = timer.schedule(
							canceller,
							checkpointTimeout, TimeUnit.MILLISECONDS);

					if (!checkpoint.setCancellerHandle(cancellerHandle)) {
						// 若checkpoint超时则会被取消
						cancellerHandle.cancel(false);
					}

					// 提前创建钩子等待所有子任务触发checkpoint
					final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
							checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
					for (MasterState s : masterStates) {
						checkpoint.addMasterState(s);
					}
				}
				// end of lock scope

				final CheckpointOptions checkpointOptions = new CheckpointOptions(
						props.getCheckpointType(),
						checkpointStorageLocation.getLocationReference());

				// 向所有子任务发送消息触发checkpoint
				for (Execution execution: executions) {
					if (props.isSynchronous()) {
						execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
					} else {
						// 触发checkpoint,从source算子广播CheckBarrier,当所有算子都完成Checkpoint后,jobManager认为Checkpoint已完成
						execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
					}
				}
				// 将失败的触发器数量归0
				numUnsuccessfulCheckpointsTriggers.set();
				return checkpoint;
			}
			catch (Throwable t) {
				// checkpoint出现异常则则从pending缓存中去除
				synchronized (lock) {
					pendingCheckpoints.remove(checkpointID);
				}
				// 记录checkpoint失败次数
				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
				LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
						checkpointID, job, numUnsuccessful, t);

				if (!checkpoint.isDiscarded()) {
					failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
				}

				try {
					// 失败后清除这次checkpoint的保存路径和该路径下的所有数据
					checkpointStorageLocation.disposeOnFailure();
				}
				catch (Throwable t2) {
					LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
				}

				throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
			}

		} // end trigger lock
	}

相关文章