flink-TimestampsAndWatermarks


Rather than explain at length, let's go straight to the code.

Note: when joining multiple streams, broadcast() has to be added before assigning timestamps and watermarks (see the commented-out snippet inside testOk1 below).

package com.weiye.eventTime;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;
import java.util.Properties;

public class Test {

    /**
     * proctime aggregation
     * @throws Exception
     */
    public static void test0KProc() throws Exception {
        EnvironmentSettings fsSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // streaming table environment (Blink planner)
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSetting);
        // configure the Kafka input source
        Properties properties = new Properties();
        // metricTopic
        properties.setProperty("bootstrap.servers", "ch3:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer<APMDataModel> consumer = new FlinkKafkaConsumer<APMDataModel>("zxy_test", new KafkaMsgDesSchemaTest(APMDataModel.class), properties);
        consumer.setStartFromEarliest();

        DataStreamSource<APMDataModel> ds2 = env.addSource(consumer);
        ds2.print();
       
        // no timestamps/watermarks are assigned on this stream, so only a proctime
        // attribute is declared; the query groups by processing-time windows
        Table table1 = tableEnv.fromDataStream(ds2, "application,cw_biz,uri,otime,http_code, proctime.proctime");
        tableEnv.registerTable("tempTable", table1);
        tableEnv.registerFunction("utc2local", new UTC2LocalTimeFun());
        Table result2 = tableEnv.sqlQuery("select utc2local(TUMBLE_END(proctime, INTERVAL '10' SECOND)) as optTime, uri, count(*) from tempTable group by TUMBLE(proctime, INTERVAL '10' SECOND), uri");
        /**
         * Sample output:
         * 2020-06-08 14:35:10,uri,1
         * 2020-06-08 14:36:20,uri,1
         */

        DataStream<Row> resultDataStream2 = tableEnv.toAppendStream(result2, Row.class);
        resultDataStream2.print();

        env.execute("hn-testc");

    }


    /**
     * eventTime aggregation -- windows fire based on the maximum event time observed
     * @throws Exception
     */
    public static void testOk1() throws Exception {

        EnvironmentSettings fsSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // streaming table environment (Blink planner)
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSetting);
        // configure the Kafka input source
        Properties properties = new Properties();
        // metricTopic
        properties.setProperty("bootstrap.servers", "ch3:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer<APMDataModel> consumer = new FlinkKafkaConsumer<APMDataModel>("zxy_test", new KafkaMsgDesSchemaTest(APMDataModel.class), properties);
        consumer.setStartFromEarliest();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

       /*
        When joining multiple streams, broadcast() must be added before
        assigning timestamps and watermarks:

       DataStream<APMRequestSimpleModel> nsource = t.union(t2).broadcast().assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<APMRequestSimpleModel>(Time.milliseconds(5000)) {
            @Override
            public long extractTimestamp(APMRequestSimpleModel element) {
                return element.getRowtime();
            }
        });*/

        DataStream<APMDataModel> ds2 = env.addSource(consumer).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<APMDataModel>() {

            private final long maxTimeLag = 5000; // tolerate 5 seconds of out-of-order events
            private long currentMaxTimestamp;

            @Override
            public long extractTimestamp(APMDataModel element, long previousElementTimestamp) {
                // use the record's event time and track the largest timestamp seen so far
                long timestamp = element.rowtime;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // the watermark trails the maximum observed event time by the allowed lag
                return new Watermark(currentMaxTimestamp - maxTimeLag);
            }
        });
        ds2.print();


        Table table1 = tableEnv.fromDataStream(ds2, "application,cw_biz,uri,otime,http_code,scount,smaxtime,smintime,ssumtime, proctime.proctime, rowtime.rowtime");

        tableEnv.registerTable("tempTable", table1);
        tableEnv.registerFunction("utc2local", new UTC2LocalTimeFun());

        Table result2 = tableEnv.sqlQuery("select utc2local(TUMBLE_END(rowtime, INTERVAL '10' SECOND)) as optTime, uri, count(*) from tempTable group by TUMBLE(rowtime, INTERVAL '10' SECOND), uri");

        DataStream<Row> resultDataStream2 = tableEnv.toAppendStream(result2, Row.class);
        resultDataStream2.print();

        /**
         *
         * Current system time: 2020-06-08 15:30:40
         *
         * {"application":"","cw_biz":"N3S","http_code":-1,"otime":"2020-06-04 11:24:30","ptime":1591601478140,"rowtime":1591241070000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * {"application":"","cw_biz":"N3S","http_code":-1,"otime":"2020-06-04 11:24:31","ptime":11,"rowtime":11,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:30","ptime":11,"rowtime":1591241070000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:50","ptime":1591601478180,"rowtime":1591241090000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * 2020-06-04 11:24:40,Iuri,3
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:51","ptime":1591601549921,"rowtime":1591241091000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:24:52","ptime":1591601562814,"rowtime":1591241092000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:25:12","ptime":1591601581774,"rowtime":1591241112000,"scount":3,"smaxtime":6007,"smintime":6004,"ssumtime":18017,"uri":"Iuri"}
         * 2020-06-04 11:25:00,Iuri,3
         *
         */

        env.execute("hn-testc");

    }

    /**
     * eventTime aggregation -- windows fire based on system time; note the
     * resulting 8-hour offset from Beijing time (UTC+8)
     * @throws Exception
     */
    public static void testOk2() throws Exception {

        EnvironmentSettings fsSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // streaming table environment (Blink planner)
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSetting);
        // configure the Kafka input source
        Properties properties = new Properties();
        // metricTopic
        properties.setProperty("bootstrap.servers", "ch3:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer<APMDataModel> consumer = new FlinkKafkaConsumer<APMDataModel>("zxy_test", new KafkaMsgDesSchemaTest(APMDataModel.class), properties);
//        consumer.setStartFromEarliest();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<APMDataModel> ds2 = env.addSource(consumer).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<APMDataModel>() {

            private final long maxTimeLag = 5000; // 5 seconds behind the wall clock

            @Override
            public long extractTimestamp(APMDataModel element, long previousElementTimestamp) {
                // records still carry their own event time ...
                return element.rowtime;
            }

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // ... but the watermark advances with the system clock, so windows
                // fire based on processing time rather than observed event times
                return new Watermark(System.currentTimeMillis() - maxTimeLag);
            }
        });
        ds2.print();

        Table table1 = tableEnv.fromDataStream(ds2, "application,cw_biz,uri,otime,http_code,scount,smaxtime,smintime,ssumtime, proctime.proctime, rowtime.rowtime");
        tableEnv.registerTable("tempTable", table1);
        tableEnv.registerFunction("utc2local", new UTC2LocalTimeFun());

        Table result2 = tableEnv.sqlQuery("select utc2local(TUMBLE_END(rowtime, INTERVAL '10' SECOND)) as optTime, uri, count(*) from tempTable group by TUMBLE(rowtime, INTERVAL '10' SECOND), uri");
        DataStream<Row> resultDataStream2 = tableEnv.toAppendStream(result2, Row.class);
        resultDataStream2.print();

        /**
         *
         * Current system time: 2020-06-08 15:47:30 -- the result is emitted only
         * once the system clock passes the window end plus the lag (last event at
         * 2020-06-08 15:48:22, 10 s window + 5 s lag).
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-04 11:25:12","ptime":1591602237223,"rowtime":1591241112000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-08 15:47:02","ptime":1591602461288,"rowtime":1591602422000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-08 15:47:32","ptime":1591602466604,"rowtime":1591602452000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
         * {"application":"","cw_biz":"beauty","http_code":-1,"otime":"2020-06-08 15:48:22","ptime":1591602473471,"rowtime":1591602502000,"scount":3,"smaxtime":6007,"smaxtimegetSsumtime":18017,"smintime":6004,"ssumtime":18017,"uri":"DEFAULT.quartzFouth"}
         * 2020-06-08 15:48:30,DEFAULT.quartzFouth,1
         *
         *
         */
        env.execute("hin-testc");

    }


    public static void main(String[] args) throws Exception {
//        testOk1();
        testOk2();
    }
}
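
======================== Note: the newer WatermarkStrategy API

AssignerWithPeriodicWatermarks, used above, was deprecated in Flink 1.11 in favor of WatermarkStrategy. As a point of comparison (a sketch, not part of the original post), the hand-written assigner in testOk1 is equivalent to the built-in bounded-out-of-orderness strategy and could be dropped in like this:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// watermark = max observed event time - 5 s, exactly what testOk1 computes by hand
DataStream<APMDataModel> ds2 = env.addSource(consumer)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<APMDataModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, previousTimestamp) -> element.rowtime));

======================== KafkaMsgDesSchemaTest (sketch)

The KafkaMsgDesSchemaTest deserializer is not shown in the original post. A minimal sketch, assuming the Kafka messages are the JSON records from the sample output and that Jackson is available for parsing:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class KafkaMsgDesSchemaTest<T> implements DeserializationSchema<T> {

    // static so the schema instance itself stays serializable
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final Class<T> clazz;

    public KafkaMsgDesSchemaTest(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        // parse each Kafka record value as a JSON object of the target class
        return MAPPER.readValue(message, clazz);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false; // the Kafka topic is an unbounded stream
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}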



======================== APMDataModel: the raw data model class

public class APMDataModel implements Serializable {

    private static final long serialVersionUID = 1L;
    private String application;
    private String cw_biz;
    private String uri;
    private String otime;

    private int http_code;
    private long scount;
    // these three fields appear in the sample JSON and in the table schema above;
    // they were missing from the class as originally posted
    private long smaxtime;
    private long smintime;
    private long ssumtime;

    public long ptime = System.currentTimeMillis();
    public long rowtime = 0L;

    // remaining getters and setters omitted; setOtime also derives the rowtime
    public void setOtime(String otime) {
        this.otime = otime;
        this.rowtime = DateUtil.getTimeLongMisFromSudaLog(otime);
    }
}
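
======================== UTC2LocalTimeFun (sketch)

UTC2LocalTimeFun is registered as "utc2local" in all three queries but is not shown in the original post. A minimal sketch, assuming the UDF's job is to render the UTC window boundary in Beijing time (the 8-hour offset the testOk2 comment mentions):

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class UTC2LocalTimeFun extends ScalarFunction {

    public String eval(Timestamp ts) {
        // format the UTC window timestamp in the UTC+8 (Beijing) time zone
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        return fmt.format(ts);
    }
}

======================== DateUtil.getTimeLongMisFromSudaLog (sketch)

DateUtil is not shown either; setOtime uses it to turn the otime string into the epoch-millisecond rowtime. A minimal sketch, assuming the "yyyy-MM-dd HH:mm:ss" format seen in the sample data (parsing in the JVM's default time zone reproduces the sample rowtime values when the JVM runs in GMT+8):

import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DateUtil {

    public static long getTimeLongMisFromSudaLog(String otime) {
        try {
            // "2020-06-04 11:24:30" -> epoch milliseconds
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(otime).getTime();
        } catch (ParseException e) {
            return 0L; // keep the default rowtime on malformed input
        }
    }
}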
