Apache Flink CEP 实战

2020-07-01 00:00:00 模式 状态 事件 匹配 超时

本文根据Apache Flink 实战&进阶篇系列直播课程整理而成,由哈啰出行大数据实时平台开发刘博分享。通过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,希望能够给打算使用或者已经使用的同学一些帮助。

主要的内容分为如下三个部分:

  1. Flink CEP概念以及使用场景。
  2. 如何使用Flink CEP。
  3. 如何扩展Flink CEP。

Flink CEP 概念以及使用场景

什么是 CEP

CEP的意思是复杂事件处理,例如:起床-->洗漱-->吃饭-->上班等一系列串联起来的事件流形成的模式称为CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

下图中列出了几个例子:



  • 个是异常行为检测的例子:假设车辆维修的场景中,当一辆车出现故障时,这辆车会被送往维修点维修,然后被重新投放到市场运行。如果这辆车被投放到市场之后还未被使用就又被报障了,那么就有可能之前的维修是的。
  • 第二个是策略营销的例子:假设打车的场景中,用户在APP上规划了一个行程订单,如果这个行程在下单之后超过一定的时间还没有被司机接单的话,那么就需要将这个订单输出到下游做相关的策略调整。
  • 第三个是运维监控的例子:通常运维会监控服务器的CPU、网络IO等指标超过阈值时产生相应的告警。但是在实际使用中,后台服务的重启、网络抖动等情况都会造成瞬间的流量毛刺,对非关键链路可以忽略这些毛刺而只对频繁发生的异常进行告警以减少误报。

Flink CEP 应用场景

  • 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
  • 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
  • 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

Flink CEP原理



Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态中间状态终状态三种,边分为takeignoreproceed三种。

  • take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
  • ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
  • proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。 也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条proceed边和下游的购买状态相连。

下面以一个打车的例子来展示状态是如何流转的,规则见下图所示。



以乘客制定行程作为开始,匹配乘客的下单事件,如果这个订单超时还没有被司机接单的话,就把行程事件和下单事件作为结果集往下游输出。

假如消息到来顺序为:行程-->其他-->下单-->其他。

状态流转如下:

1、开始时状态处于行程状态,即等待用户制定行程。



2、当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,通过take边将状态往下转移到下单状态



3、由于下单状态上有一条ignore边,所以可以忽略收到的其他事件,直到收到下单事件时将其匹配,放入结果集中,并且将当前状态往下转移到超时未接单状态。这时候结果集当中有两个事件:制定行程事件和下单事件。



4、超时未接单状态时,如果来了一些其他事件,同样可以被ignore边忽略,直到超时事件的触发,将状态往下转移到终状态,这时候整个模式匹配成功,终将结果集中的制定行程事件和下单事件输出到下游。



上面是一个匹配成功的例子,如果是不成功的例子会怎么样?

假如当状态处于超时未接单状态时,收到了一个接单事件,那么就不符合超时未被接单的触发条件,此时整个模式匹配失败,之前放入结果集中的行程事件和下单事件会被清理。



Flink CEP程序开发

本节将详细介绍Flink CEP的程序结构以及API。

Flink CEP 程序结构

主要分为两部分:定义事件模式和匹配结果处理。

官方示例如下:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void select(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

相关文章