Kafka的架构原理,你真的理解吗?

2020-05-28 00:00:00 数据 多个 消息 消费 副本

Apache Kafka 早是由 LinkedIn 开源出来的分布式消息系统,现在是 Apache 旗下的一个子项目,并且已经成为开源领域应用广泛的消息系统之一。




Kafka 社区非常活跃,从 0.9 版本开始,Kafka 的标语已经从“一个高吞吐量,分布式的消息系统”改为"一个分布式流平台"。


Kafka 和传统的消息系统不同在于:

  • Kafka是一个分布式系统,易于向外扩展。
  • 它同时为发布和订阅提供高吞吐量。
  • 它支持多订阅者,当失败时能自动平衡消费者。
  • 消息的持久化。


Kafka 和其他消息队列的对比:



入门实例


生产者


代码如下:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class UserKafkaProducer extends Thread
{
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();
    public UserKafkaProducer(String topic)
    {
        props.put("metadata.broker.list", "localhost:9092");
        props.put("bootstrap.servers", "master2:6667");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
    }
@Override
    public void run() {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            //返回的是Future<RecordMetadata>,异步发送
            producer.send(new ProducerRecord<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

相关文章