用Flink SQL实现数据流式导入Elasticsearch
通过上一文我们知道要如果用Elasticsearch实现全文检索,需要先将待检索的语料数据导入Elasticsearch集群。我们可以采用定期批量导入,但这样很多时候并不能满足业务需求, 比如新出来的新闻,或者新上传的文档,商家新上传的商品等,我们希望能实时能被检索到,这就需要这些数据导入Elasticsearch的过程是流式的。
Flink作为当前火热的开源流式计算引擎,具有性能好与使用广泛的优势,尤其是flink sql api,可以使得开发者可以像操作关系型数据一样使用简单的sql创建流式计算任务, 本文将介绍如何用flink sql将数据流式到Elasticsearch.
任务描述
假设我们的数据来自kafka中名为person的topic,为了方便陈述,假设仅包含两个字段,id与doc,类型均为string。 现在需要将其导入Elsaticsearch的某个index下面。 可以用PUT请求创建一个名为test_index的index用于存放导入的数据。
创建Source Connector
flink sql api提供了关键字用于创建kafka connector,相应的kafka配置参数通过DDL语句的with参数传入,示例如下
CREATE TABLE person_kafka_source (
id STRING,
doc STRING
) WITH (
-- declare the external system to connect to
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'topic_name',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- specify the update-mode for streaming tables
'update-mode' = 'append',
-- declare a format for this system
'format.type' = 'avro',
'format.avro-schema' = '{
"namespace": "org.myorganization",
"type": "record",
"name": "person",
"fields": [
{"name": "id", "type": "string"},
{"name": "doc", "type": "string"}
]
}'
)
相关文章