Flink任务实时获取并更新规则

2020-07-02 00:00:00 数据 规则 获取 算子 数据流

背景

Flink通常被用来处理流式数据,有着众多的应用场景,比方说实时的ETL、检测报警等业务场景。这些场景通常会涉及到规则的更新,比如对解析规则和报警规则进行更改后,流任务应能够实时感知到,并用新的规则继续检测,避免因为规则更改而重启任务造成的开销。一般来说流式任务的重启是比较重的。

方案选择

接下来分别介绍下两种可行的方案与选型

1.广播变量与广播流

广播变量通常被运用到以下场景中:一个流中的一些数据需要被广播到所有的下游任务,被下游任务缓存在本地并用于处理另一个流上的所有传入数据。例如,一个低吞吐量的流包含了一组规则,我们希望根据这些规则对另一个流的所有数据进行检测。因此,广播变量(broadcast state)和其他的state相比有以下不同:(1)目前只支持map格式(2)算子需要同时包含广播流和普通的数据流才可用(3)一个算子可以使用多个广播变量并用名称进行区分

2.异步IO

流计算中经常会需要跟外部存储系统交互,如mysql、redis等。众所周知,在流处理中查询外部存储系统并等结果返回等待时间相对来说是比较长的,若数据量较大则导致会吞吐量会大大降低,使流任务基本处于阻塞不可用状态。

同步处理与异步处理

Flink的异步IO可以使得流任务在不阻塞运算的情况下异步请求外部系统,并且支持超时处理,以及返回结果有序或无序处理等。关于异步IO的详细原理介绍已存在较多资料不在展开。通过异步IO获取规则的原理就是在数据到来之后查询外部系统获取规则,并根据规则检测或解析数据。比方将规则存放在redis、mysql等等。

理论上两种方式都是可以完成从外部系统获取规则并进行更新的。现分析下量两种方式的差别与特点。对于异步IO来说若对于每条数据查一次外部系统,当数据量比较大的时候,外部系统的查询性能会比较容易出现瓶颈,导致流处理能力达到上限。通常的优化方法是设置本地内存缓存规则,并设置过期时间,每次处理数据时判断规则是否过期,若过期则重新查询规则,这也导致了规则的获取可能有一定的延迟性,也就是说需要在数据量较大的情况下需要对处理性能和规则实时性作出一定的平衡,另外若任务运行在分布式架构上,同一算子可能在不同的机器或者容器中运行,则可能导致多个节点查询同一外部数据源的情况。

接下来分析下广播变量和广播流的方法,该方法通常定义一个数据源作为规则流,该数据源可以利用flink已有的connector,如kafka或者其他的消息队列,也可以继承flink的RichSourceFunction自定义数据源,比如利用Mysql作为数据源并设置线程定期获取规则。若将变更的规则放入消息队列作为规则流则可以做到实时更新,若利用外部存储如Mysql、Redis作为数据源定时刷新,可以准实时的更新规则,无论利用那种方式,都可以只设置并行度为1的规则流获取算子,并将获取到的规则广播到下游所有算子。

因为笔者遇到的情况如下:需要更新规则的flink任务数大概在300个以上,并且还会持续增多,另外用户对于规则的更新多可以容忍分钟级别的延迟,但好是可以实时更新,对规则更新的实时性要求不是特别高,另外流任务的数据量比较大,单个任务每秒处理数据条数可能达到几万或几十万。因此使用广播变量并将Mysql作为数据源的方式获取规则,这样可以满足目前的需求,而且利于后期利用消息队列作为规则流实时更新规则扩展。

实践

背景及方案介绍完了,接下来直接上代码吧,talk is cheap, show me the code.

首先是继承RichSourceFunction类并自定义规则流,从msyql定时拉取配置。

public class GetJobConfig extends RichSourceFunction<Map<String,JobConfig>> {


    private volatile boolean isRunning = true;
    /**
     * 规则配置id
     */
    private Long jobHistoryId;

    public GetJobConfig(Long jobHistoryId) {
        this.jobHistoryId = jobHistoryId;
    }

    /**
     * 解析规则查询周期为1分钟
     */
    private Long duration = 1 * 60 * 1000L;

    @Override
    public void run(SourceContext<Map<String,JobConfig>> ctx) throws Exception {
        
        while (isRunning) {
            //从Mysql数据库获取配置
            String jobConfigStr = DBService.getInstance().getJobConfig(jobHistoryId);
            //解析规则与业务逻辑相关请忽略
            if (!StringUtils.isEmpty(jobConfigStr)) {
                JobConfig jobConfig = JsonUtil.string2Object(jobConfigStr, JobConfig.class);
                Map<String,JobConfig> jobConfigMap = new HashMap<>(12);
                jobConfigMap.put("jobConfig",jobConfig);
                //输出规则流
                ctx.collect(jobConfigMap);
            }
            //线程睡眠
            Thread.sleep(duration);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

相关文章