kafka的low-level consumer怎么使用

2023-04-07 02:35:00 Kafka

Kafka的low-level consumer怎么使用

在使用Kafka的low-level consumer API时,首先需要创建一个KafkaConsumer实例,并指定要消费的topic和要连接的broker。然后调用KafkaConsumer.poll()方法来不断地从broker拉取消息。每次调用poll()方法都会返回一个包含多条消息的ConsumerRecord实例的列表。

下面是一个使用Kafka的low-level consumer API的简单示例:

import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaConsumer; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { //配置consumer所需的参数 Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "test-consumer-group"); props.put("zookeeper.session.timeout.ms", "6000"); props.put("zookeeper.connection.timeout.ms", "6000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); //创建KafkaConsumer实例 KafkaConsumer consumer = new KafkaConsumer(props); //指定要消费的topic consumer.subscribe("test-topic"); //不断地从broker拉取消息 while (true) { ConsumerIterator it = consumer.poll().iterator(); while (it.hasNext()) { System.out.println(it.next().value()); } } } }

相关文章