当Elasticsearch遇见Kafka

2020-05-27 00:00:00 创建 数据 腾讯 数据源 插件

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

本文由michelmu发表于云+社区专栏

Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而Elasticsearch强大的数据源兼容能力,主要来源于其核心组件之一的Logstash, Logstash通过插件的形式实现了对多种数据源的输入和输出。Kafka是一种高吞吐量的分布式发布订阅消息系统,是一种常见的数据源,也是Logstash支持的众多输入输出源的其中一个。本文将从实践的角度,研究使用Logstash Kafka Input插件实现将Kafka中数据导入到Elasticsearch的过程。


使用Logstash Kafka插件连接Kafka和Elasticsearch

1 Logstash Kafka input插件简介

Logstash Kafka Input插件使用Kafka API从Kafka topic中读取数据信息,使用时需要注意Kafka的版本及对应的插件版本是否一致。该插件支持通过SSL和Kerveros SASL方式连接Kafka。另外该插件提供了group管理,并使用默认的offset管理策略来操作Kafka topic。

Logstash默认情况下会使用一个单独的group来订阅Kafka消息,每个Logstash Kafka Consumer会使用多个线程来增加吞吐量。当然也可以多个Logstash实例使用同一个group_id,来均衡负载。另外建议把Consumer的个数设置为Kafka分区的大小,以提供更好的性能。

2 测试环境准备

2.1 创建Elasticsearch集群

为了简化搭建过程,本文使用了腾讯云Elasticsearch service。腾讯云Elasticsearch service不仅可以实现Elasticsearch集群的快速搭建,还提供了内置Kibana,集群监控,专用主节点,Ik分词插件等功能,极大的简化了Elasticsearch集群的创建和管理工作。

2.2 创建Kafka服务

Kafka服务的搭建采用腾讯云CKafka来完成。与Elasticsearch Service一样,腾讯云CKafka可以实现Kafka服务的快速创建,兼容开源Kafka API(0.9版本)。

2.3 服务器

除了准备Elasticsearch和Kafka,另外还需要准备一台服务器,用于运行Logstash以连接Elasticsearch和Kafka。本文采用腾讯云CVM服务器

2.4 注意事项

1) 需要将Elasticsearch、Kafka和服务器创建在同一个网络下,以便实现网络互通。由于本文采用的是腾讯云相关的技术服务,因此只需要将Elasticsearch service,CKafka和CVM创建在同一个私有网路(VPC)下即可。

2) 注意获取Elasticsearch serivce,CKafka和CVM的内网地址和端口,以便后续服务使用

本次测试中:

服务ipportElasticsearch service192.168.0.89200Ckafka192.168.13.109092CVM192.168.0.13-

3 使用Logstash连接Elasticsearch和Kafka

3.1 Kafka准备

可以参考[CKafka 使用入门]

按照上面的教程

1) 创建名为kafka_es_test的topic

2) 安装JDK

3) 安装Kafka工具包

4) 创建producer和consumer验证kafka功能

3.2 安装Logstash

Logstash的安装和使用可以参考[一文快速上手Logstash]

3.3 配置Logstash Kafka input插件

创建kafka_test_pipeline.conf文件内容如下:

input{
        kafka{
                bootstrap_servers=>"192.168.13.10:9092"
                topics=>["kafka_es_test"]
                group_id=>"logstash_kafka_test"
        }
}
output{
        elasticsearch{
                hosts=>["192.168.0.8:9200"]
        }
}

相关文章