如何更改记录的时间戳?
我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。
是否有特定于Kafka Streams的解决方案?
我真正感兴趣的时间戳是由fluentd在消息中发送的:
";timestamp";:";1507885936";,&Quot;主机:&Quot;V.X.Y.Z.
以卡夫卡表示的记录:
偏移量=0,时间戳=-1,键=空,值={";timestamp";:";1507885936";,;主机;:V.X.Y.Z.&Quot;}
我希望有这样一张卡夫卡唱片:
OFFSET=0,TIMESTAMP=1507885936,KEY=NULL,VALUE={";timestamp";:";1507885936";,;HOST&QOT;:&QOT;V.X.Y.Z.&QOT;}
我的解决方法如下所示:
编写消费者提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
编写一个生产者,生成一个时间戳设置为(ProducerRecord(字符串主题,整数分区,长时间戳,K键,V值)的新记录)
我更喜欢KafkaStreams解决方案(如果有)。
解决方案
您可以编写非常简单的Kafka Streams应用程序,如下所示:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
并使用从记录中提取时间戳并返回时间戳的自定义TimestampExtractor
配置应用程序。
Kafka Streams在将记录写回Kafka时将使用返回的时间戳。
注意:如果您有乱序数据--即时间戳没有严格排序--结果也将包含乱序时间戳。Kafka Streams使用返回的时间戳回写Kafka(即,无论提取程序返回什么,都用作记录元数据时间戳)。请注意,在写入时,当前处理的输入记录中的时间戳用于所有生成的输出记录--这适用于版本1.0,但在将来的版本中可能会更改。)。
更新:
一般来说,您可以通过处理器API修改时间戳。调用context.forward()
可以通过To.all().withTimestamp(...)
将输出记录时间戳设置为forward()
的参数。
相关文章