用Flink SQL实现数据流式导入Elasticsearch

2020-05-29 00:00:00 创建 数据 语句 参数 导入

通过上一文我们知道要如果用Elasticsearch实现全文检索,需要先将待检索的语料数据导入Elasticsearch集群。我们可以采用定期批量导入,但这样很多时候并不能满足业务需求, 比如新出来的新闻,或者新上传的文档,商家新上传的商品等,我们希望能实时能被检索到,这就需要这些数据导入Elasticsearch的过程是流式的。
Flink作为当前火热的开源流式计算引擎,具有性能好与使用广泛的优势,尤其是flink sql api,可以使得开发者可以像操作关系型数据一样使用简单的sql创建流式计算任务, 本文将介绍如何用flink sql将数据流式到Elasticsearch.

任务描述

假设我们的数据来自kafka中名为person的topic,为了方便陈述,假设仅包含两个字段,id与doc,类型均为string。 现在需要将其导入Elsaticsearch的某个index下面。 可以用PUT请求创建一个名为test_index的index用于存放导入的数据。

创建Source Connector

flink sql api提供了关键字用于创建kafka connector,相应的kafka配置参数通过DDL语句的with参数传入,示例如下

CREATE TABLE person_kafka_source (
  id STRING,
  doc STRING
) WITH (
  -- declare the external system to connect to
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'topic_name',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',

  -- specify the update-mode for streaming tables
  'update-mode' = 'append',

  -- declare a format for this system
  'format.type' = 'avro',
  'format.avro-schema' = '{
                            "namespace": "org.myorganization",
                            "type": "record",
                            "name": "person",
                            "fields": [
                                {"name": "id", "type": "string"},
                                {"name": "doc", "type": "string"}
                            ]
                         }'
)

相关文章