如何更改记录的时间戳?

2022-04-18 00:00:00 java apache-kafka-streams

我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。

是否有特定于Kafka Streams的解决方案?


我真正感兴趣的时间戳是由fluentd在消息中发送的:

";timestamp";:";1507885936";,&Quot;主机:&Quot;V.X.Y.Z.

以卡夫卡表示的记录:

偏移量=0,时间戳=-1,键=空,值={";timestamp";:";1507885936";,;主机;:V.X.Y.Z.&Quot;}

我希望有这样一张卡夫卡唱片:

OFFSET=0,TIMESTAMP=1507885936,KEY=NULL,VALUE={";timestamp";:";1507885936";,;HOST&QOT;:&QOT;V.X.Y.Z.&QOT;}

我的解决方法如下所示:

  • 编写消费者提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)

  • 编写一个生产者,生成一个时间戳设置为(ProducerRecord(字符串主题,整数分区,长时间戳,K键,V值)的新记录)

我更喜欢KafkaStreams解决方案(如果有)。


解决方案

您可以编写非常简单的Kafka Streams应用程序,如下所示:

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");

并使用从记录中提取时间戳并返回时间戳的自定义TimestampExtractor配置应用程序。

Kafka Streams在将记录写回Kafka时将使用返回的时间戳。

注意:如果您有乱序数据--即时间戳没有严格排序--结果也将包含乱序时间戳。Kafka Streams使用返回的时间戳回写Kafka(即,无论提取程序返回什么,都用作记录元数据时间戳)。请注意,在写入时,当前处理的输入记录中的时间戳用于所有生成的输出记录--这适用于版本1.0,但在将来的版本中可能会更改。)。

更新:

一般来说,您可以通过处理器API修改时间戳。调用context.forward()可以通过To.all().withTimestamp(...)将输出记录时间戳设置为forward()的参数。

相关文章