如何将PostgreSQL WAL记录注入Kafka

2020-11-02 00:00:00 连接 配置 信息 容器 插件

作者简介

钟硕

现供职于迪卡侬,PostgreSQL & Oracle DBA


背景

一年前,我曾发文介绍过PostgreSQL的Decoding工具wal2json。其目的在于将PG的WAL中的变更记录(主要指DML涉及的数据变更信息)转化为可读信息,提供给运维人员做后续数据的处理。文章中虽提及Debezium项目,但并没有介绍Debezium项目实践的内容。
Debezium是基于Apache Kafka 项目建立,并为 Kafka Connect 提供兼容性插件,用于将数据库系统中的预写式日志中的事件记录转化为事件流,并将其推送至Kafka中。当应用(数据库)出现不可期的异常中断时,作为数据的消费者仍可凭借kafka中保留的相关改变事件流对数据进行准确和完整的处理。
 

容器化部署Kafka Connector

为了简化文档的可读性,我这里使用容器化方式部署,达到快速实践的目的,更关注Zookeeper,Kafka或者正在使用非容器化部署方式的童鞋,可以在理解该解决方案后做相应的调整。
           

部署Kafka Connector前的准备工作

●Zookeeper
https://debezium.io/documentation/reference/tutorial.html#starting-zookeeper
 
●Kafka
https://debezium.io/documentation/reference/tutorial.html#starting-kafka
 
●Kafka connector
https://debezium.io/documentation/reference/tutorial.html#starting-kafka-connect
注意:tutorial中未将容器中的配置文件映射出来,Kafka connector 容器化安装后需额外配置填加数据库连接器插件,请将相应目录映射到宿主机中:
-v /opt/kafka/logs:/kafka/logs
/opt/kafka/data:/kafka/data
/opt/kafka/config:/kafka/config
/opt/kafka/connect:/kafka/connect
connector中配置文件/kafka/config/connect-distributed.properties中默认plugin.path=/kafka/connect
 
●MySQL
https://debezium.io/documentation/reference/tutorial.html#starting-mysql-database
如果使用MySQL DB的可以参考tutorial,不再赘述。
 
●PostgreSQL
https://hub.docker.com/r/debezium/example-postgres
这里使用容器化的方式安装样本PostgreSQL,方便做快速验证。可以从样本里面看到各数据库的配置内容(如 my.cnf和postgresql.conf)。如WAL级别,解码插件等相关信息
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4


部署Debezium Connector插件

下载 Debezium Connector Plugin
https://debezium.io/documentation/reference/install.html
解压到映射的插件目录/opt/kafka/connect下,检查是否被加载到容器中,以PostgreSQL为例:
 
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.0.Final/debezium-connector-postgres-1.3.0.Final-plugin.tar.gz
 
# tar zxvf debezium-connector-postgres-1.3.0.Final-plugin.tar.gz -C /opt/kafka/connect/
 

检查Kafka Container中插件的情况

docker ps -f name='debezium.*connect' -l


CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
73dd73bd6872 debezium/connect "/.r/r /docker-ent..." 42 hours ago Up 42 hours r-debezium-connector-kafka-connector-1-a3726ec4


docker exec -it 73dd73bd6872 /bin/bash


[kafka@73dd73bd6872 connect]$ ls
debezium-connector-mysql debezium-connector-postgres
[kafka@73dd73bd6872 connect]$ pwd
/kafka/connect

验证Debezium PostgreSQL connector加载的信息

docker logs -t -f 73dd73bd6872 | more


2020-10-20T07:56:08.452286434Z --- Setting property from CONNECT_PLUGIN_PATH: plugin.path=/kafka/connect
2020-10-20T07:56:09.896637652Z 2020-10-20 07:56:09,889 - INFO [main:DelegatingClassLoader@246] - Loading plugin from: /kafka/connect/debezium-connector-postgres
2020-10-20T07:56:10.574470660Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@269] - Registered loader: PluginClassLoader{pl
uginLocation=file:/kafka/connect/debezium-connector-postgres/}
2020-10-20T07:56:10.574502062Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@198] - Added plugin 'io.debezium.connector.pos
tgresql.PostgresConnector


注册PG连接信息到connector

参考配置项(更多参照https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-example-configuration
vim register-postgresql.json


{
"name": "fulfillment-connector", ①
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", ②
"tasks.max": "1", ③
"plugin.name": "wal2json_streaming", ④
"database.hostname": "postgresql", ⑤
"database.port": "5432", ⑥
"database.user": "postgres", ⑦
"database.password": "debezium", ⑧
"database.dbname" : "postgres", ⑨
"database.server.name": "fulfillment", ⑩
"schema.include.list": "inventory"
}
}

① name:注册到connector服务中的名称。
② connector.class:连接器类型,plugin正确安装的前提下,可以通过下面的方式获取到class的内容。
curl -H "Accept:application/json" -sS localhost:8083/connector-plugins
③ tasks.max:每个connector预启动的大的任务数,PG总是使用一个任务,因此不是一个必要的参数。
④ plugin.name:这个是在PG中的shared_preload_libraries里面配置的Decoding的插件(decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput)。使用wal2json时,事务比较大情况下(1GB),解码出来的Json中包含所有的事务流,被buffer到内存会影响数据处理的效率。因此,这种情况下配置wal2json_streaming,将一个事务中的每次改变分成一个消息条目进行流式处理。
⑤,⑥,⑦,⑧,⑨ database.[ hostname| port | user | password | dbname ]: DB的连接信息。因为,在connector服务中做了链路 --link postgresql:postgresql。所以这里PG容器的主机名称是postgresql。
⑩ database.server.name:是在Kafka的Topics的昵称。
⑪ schema.include.list:设定要消费的schema名,多个schema名用逗号分隔,默认为非系统的schema。

启动注册到Debezium PostgreSQL Connector中的连接服务

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgresql.json

验证注册的信息

curl -H "Accept:application/json" localhost:8083/connectors/
["fulfillment-connector"]
同样的在kafka connector容器中应该也能看到如下信息:
2020-10-21T03:49:22.632581000Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - TaskConfig values:2020-10-21T03:49:22.632592801Z task.class = class io.debezium.connector.postgresql.PostgresConnectorTask2020-10-21T03:49:22.632596701Z 2020-10-21T03:49:22.632948722Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:Worker@524] - Instantiated task fulfillment-connector-0 with version 1.3.0.Final of type io.debezium.connector.postgresql.PostgresConnectorTask2020-10-21T03:49:22.633286743Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - JsonConverterConfig values:2020-10-21T03:49:22.634988447Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@543] - Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task fulfillment-connector-0 using the worker config2020-10-21T03:49:22.634994347Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@550] - Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task fulfillment-connector-0 using the worker config

停止并删除注册的连接

curl -X DELETE localhost:8083/connectors/fulfillment-connector
删除数据库中的slot,否则下次再启动连接时,会报slot已经存在的错误(连接配置中默认配hislot.name为debezium的插槽)
# select pg_drop_replication_slot('debezium')
docker exec -it a5d73008228a /bin/bashroot@a5d73008228a:/# psql -d postgres -U postgrespostgres=# select * from pg_replication_slots;-[ RECORD 1 ]-------+----------slot_name | debeziumplugin | wal2jsonslot_type | logicaldatoid | 13067database | postgrestemporary | factive | tactive_pid | 1681xmin | catalog_xmin | 605restart_lsn | /209B200confirmed_flush_lsn | /209B238
postgres=# set search_path to inventory ;SET
postgres=# \dt List of relations Schema | Name | Type | Owner -----------+------------------+-------+---------- inventory | customers | table | postgres inventory | geom | table | postgres inventory | orders | table | postgres inventory | products | table | postgres inventory | products_on_hand | table | postgres inventory | spatial_ref_sys | table | postgres(6 rows)
postgres=# \d customers Table "inventory.customers" Column | Type | Collation | Nullable | Default ------------+------------------------+-----------+----------+--------------------------------------- id | integer | | not null | nextval('customers_id_seq'::regclass) first_name | character varying(255) | | not null | last_name | character varying(255) | | not null | email | character varying(255) | | not null |

通过Kafka HQ或者类似的工具查看数据库生产的信息
Topic: fulfillment.inventory.customers{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, "payload": { "before": null, "after": { "id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com" }, "source": { "version": "1.3.0.Final", "connector": "postgresql", "name": "fulfillment", "ts_ms": 1603252163493, "snapshot": "true", "db": "postgres", "schema": "inventory", "table": "customers", "txId": 602, "lsn": 34078720, "xmin": null }, "op": "r", "ts_ms": 1603252163495, "transaction": null }}

参考:
Debezium tutorial
https://debezium.io/documentation/reference/tutorial.html
 
Debezium
Confluent for debezium-connect-postgres
https://docs.confluent.io/current/connect/debezium-connect-postgres/index.html

相关文章