Flink(三)-Time-&-Watermark
1. Time
Flink中的时间(Time)主要分为三种:
- Event Time:每条数据真实的产生时间,这就要求每条进入Flink应用的数据都要自己带有时间戳,标明数据产生时间;
- Ingestion Time:是介于Event time和 Processing Time之间的时间。在数据通过Source Function 进入Flink应用之后,他就会获取Source Operator的本地时间作为时间戳;
- Processing Time:即数据被处理的时间。当我们的Flink程序是使用这个时间进行处理的时候,所有基于时间的操作都会使用当前机器的系统时钟来做为时间戳。
在应用中指定时间类型
在Flink中默认情况下使用的是Processing Time,如果我们使用了Event time或者Ingestion time那么就需要在创建StreamExecutionEnvironment之后调用setStreamTimeCharacteristic来设置基准时间。这个设置指定了数据的时间分配,以及窗口操作所使用的时间类型。
下面的这段代码就指定数据的时间类型为Processing Time,窗口大小为1小时。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)
相关文章