Flink — TimestampsAndWatermarks
Rather than a long explanation, let the code speak for itself.
Note: when joining multiple streams, `broadcast()` must be added before assigning timestamps and watermarks.
package com.weiye.eventTime;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import java.sql.Timestamp;
import java.util.Optional;
import java.util.Properties;
public class Test {
/**
* proctime 聚合
* @throws Exception
*/
public static void test0KProc() throws Exception {
EnvironmentSettings fsSetting =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//流式查询
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSetting);
//读取数据源
//2、获取输入源数据信息-kafaka
Properties properties = new Properties();
// metricTopic
properties.setProperty("bootstrap.servers", "ch3:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<APMDataModel> consumer = new FlinkKafkaConsumer<APMDataModel>("zxy_test", new KafkaMsgDesSchemaTest(APMDataModel.class), properties);
consumer.setStartFromEarliest();
DataStreamSource<APMDataModel> ds2 = env.addSource(consumer);
ds2.print();
Table table1 = tableEnv.fromDataStream(ds2,"application,cw_biz,uri,otime,http_code, proctime.proctime, rowtime.rowtime");
tableEnv.registerTable("tempTable", table1);
tableEnv.registerFunction("utc2local",new UTC2LocalTimeFun());
Table result2 = tableEnv.sqlQuery("select utc2local(TUMBLE_END(proctime, INTERVAL '10' SECOND)) as optTime, uri , count(*) from tempTable group by TUMBLE(proctime, INTERVAL '10' SECOND),uri ");
/**
*
* 2020-06-08 14:35:10,uri,1
* 2020-06-08 14:36:20,uri,1
*
*/
DataStream<Row> resultDataStream2 = tableEnv.toAppendStream(result2, Row.class);
resultDataStream2.print();
env.execute("hn-testc");
}
/**
* eventTime 聚合--基于事件大时间触发聚合
* @throws Exception
*/
public static void testOk1() throws Exception {
EnvironmentSettings fsSetting =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//流式查询
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSetting);
//读取数据源
//2、获取输入源数据信息-kafaka
Properties properties = new Properties();
// metricTopic
properties.setProperty("bootstrap.servers", "ch3:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<APMDataModel> consumer = new FlinkKafkaConsumer<APMDataModel>("zxy_test", new KafkaMsgDesSchemaTest(APMDataModel.class), properties);
consumer.setStartFromEarliest();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
/*
多流join时,需要添加broadcast
DataStream<APMRequestSimpleModel> nsource = t.union(t2).broadcast().assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<APMRequestSimpleModel>(Time.milliseconds(5000)){
@Override
public long extractTimestamp(APMRequestSimpleModel element) {
return element.getRowtime();
}
});*/
DataStream<APMDataModel> ds2 = env.addSource(consumer).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<APMDataModel>() {
@Override
public long extractTimestamp(APMDataModel element, long previousElementTimestamp) {
long timestamp = element.rowtime;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
private final long maxTimeLag = 5000; // 5 seconds
private long currentMaxTimestamp;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxTimeLag);
}
});
ds2.print();
Table table1 = tableEnv.fromDataStream(ds2,"application,cw_biz,uri,otime,http_code,scount,smaxtime,smintime,ssumtime, proctime.proctime, rowtime.rowtime");
tableEnv.registerTable("tempTable", table1);
tableEnv.registerFunction("utc2local",new UTC2LocalTimeFun());
Table result2 = tableEnv.sqlQuery("select utc2local(TUMBLE_END(rowtime, INTERVAL '10' SECOND)) as optTime, uri , count(*) from tempTable group by TUMBLE(rowtime, INTERVAL '10' SECOND),uri ");
DataStream<Row> resultDataStream2 = tableEnv.toAppendStream(result2, Row.class);
resultDataStream2.print();
/**
*
* 系统当前时间: 2020-06-08 15:30:40
*
* {"application":"","cw_biz":"N3S","http_code":-1,"otime":"2020-06-04 11:24:30","ptime":1591601478140,"rowtime":1591241070000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* {"application":"","cw_biz":"N3S","http_code":-1,"otime":"2020-06-04 11:24:31","ptime":11,"rowtime":11,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:30","ptime":11,"rowtime":1591241070000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:50","ptime":1591601478180,"rowtime":1591241090000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* 2020-06-04 11:24:40,Iuri,3
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:51","ptime":1591601549921,"rowtime":1591241091000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:52","ptime":1591601562814,"rowtime":1591241092000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:25:12","ptime":1591601581774,"rowtime":1591241112000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
* 2020-06-04 11:25:00,Iuri,3
*
*/
env.execute("hn-testc");
}
/**
* eventTime 聚合 --基于系统时间触发聚合 ---会有北京时间8小时的时差的问题
* @throws Exception
*/
public static void testOk2() throws Exception {
EnvironmentSettings fsSetting =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//流式查询
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSetting);
//读取数据源
//2、获取输入源数据信息-kafaka
Properties properties = new Properties();
// metricTopic
properties.setProperty("bootstrap.servers", "ch3:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<APMDataModel> consumer = new FlinkKafkaConsumer<APMDataModel>("zxy_test", new KafkaMsgDesSchemaTest(APMDataModel.class), properties);
// consumer.setStartFromEarliest();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<APMDataModel> ds2 = env.addSource(consumer).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<APMDataModel>() {
@Override
public long extractTimestamp(APMDataModel element, long previousElementTimestamp) {
long timestamp = element.rowtime;
return timestamp;
}
private final long maxTimeLag = 5000; // 5 seconds
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
});
ds2.print();
Table table1 = tableEnv.fromDataStream(ds2,"application,cw_biz,uri,otime,http_code,scount,smaxtime,smintime,ssumtime, proctime.proctime, rowtime.rowtime");
tableEnv.registerTable("tempTable", table1);
tableEnv.registerFunction("utc2local",new UTC2LocalTimeFun());
Table result2 = tableEnv.sqlQuery("select utc2local(TUMBLE_END(rowtime, INTERVAL '10' SECOND)) as optTime, uri , count(*) from tempTable group by TUMBLE(rowtime, INTERVAL '10' SECOND),uri ");
DataStream<Row> resultDataStream2 = tableEnv.toAppendStream(result2, Row.class);
resultDataStream2.print();
/**
*
* 系统当前时间: 2020-06-08 15:47:30 ---等待系统时间输出(2020-06-08 15:48:22 +10s聚合+ 5s延迟)。
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:25:12","ptime":1591602237223,"rowtime":1591241112000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-08 15:47:02","ptime":1591602461288,"rowtime":1591602422000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-08 15:47:32","ptime":1591602466604,"rowtime":1591602452000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
* {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-08 15:48:22","ptime":1591602473471,"rowtime":1591602502000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
* 2020-06-08 15:48:30,DEFAULT.quartzFouth,1
*
*
*/
env.execute("hin-testc");
}
public static void main(String[] args) throws Exception {
// testOk1();
testOk2();
}
}
======================== APMDataModel — the raw data model class ========================
public class APMDataModel implements Serializable {
private static final long serialVersionUID = 1L;
private String application;
private String cw_biz;
private String uri;
private String otime;
private int http_code;
private long scount;
public long ptime=System.currentTimeMillis();
public long rowtime=0l;
}
public void setOtime(String otime) {
this.otime = otime;
this.rowtime = DateUtil.getTimeLongMisFromSudaLog(otime);
}
Related articles