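How to Stream PostgreSQL WAL Records into Kafka
■ About the author
Zhong Shuo (钟硕)
Currently a PostgreSQL & Oracle DBA at Decathlon
Background
Deploying the Kafka Connector in containers
Preparation before deploying the Kafka Connector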
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
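The settings above can be sanity-checked before the connector is deployed. The following is a minimal sketch, assuming the settings are available as plain `key = value` text. The helper names `parse_settings` and `check_logical_decoding` are hypothetical and are not part of PostgreSQL or Debezium, and the parser deliberately ignores less common postgresql.conf syntax such as `key value` without an equals sign.

```python
def parse_settings(conf_text):
    """Parse `key = value` lines from postgresql.conf-style text into a dict."""
    settings = {}
    for raw in conf_text.splitlines():
        # Simplification: treat everything after '#' as a comment.
        line = raw.split("#", 1)[0].strip()
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip().strip("'")
    return settings


def check_logical_decoding(settings):
    """Return a list of problems that would prevent logical decoding."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for key in ("max_wal_senders", "max_replication_slots"):
        if int(settings.get(key, "0")) < 1:
            problems.append(f"{key} must be at least 1")
    return problems


# The four settings from this article.
CONF = """
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
"""

if __name__ == "__main__":
    print(check_logical_decoding(parse_settings(CONF)) or "settings look fine")
```

An empty problem list only says the text looks right. Changes to `wal_level` and `shared_preload_libraries` still need a PostgreSQL restart to take effect.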
Deploying the Debezium Connector plugin
Checking the plugins inside the Kafka Connect container
docker ps -f name='debezium.*connect' -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
73dd73bd6872 debezium/connect "/.r/r /docker-ent..." 42 hours ago Up 42 hours r-debezium-connector-kafka-connector-1-a3726ec4
docker exec -it 73dd73bd6872 /bin/bash
[kafka@73dd73bd6872 connect]$ ls
debezium-connector-mysql debezium-connector-postgres
[kafka@73dd73bd6872 connect]$ pwd
/kafka/connect
Verifying that the Debezium PostgreSQL connector was loaded
docker logs -t -f 73dd73bd6872 | more
2020-10-20T07:56:08.452286434Z --- Setting property from CONNECT_PLUGIN_PATH: plugin.path=/kafka/connect
2020-10-20T07:56:09.896637652Z 2020-10-20 07:56:09,889 - INFO [main:DelegatingClassLoader@246] - Loading plugin from: /kafka/connect/debezium-connector-postgres
2020-10-20T07:56:10.574470660Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@269] - Registered loader: PluginClassLoader{pluginLocation=file:/kafka/connect/debezium-connector-postgres/}
2020-10-20T07:56:10.574502062Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@198] - Added plugin 'io.debezium.connector.postgresql.PostgresConnector
Registering the PostgreSQL connection details with the connector
vim register-postgresql.json
{
  "name": "fulfillment-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json_streaming",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "debezium",
    "database.dbname": "postgres",
    "database.server.name": "fulfillment",
    "schema.include.list": "inventory"
  }
}
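The registration file can also be generated rather than edited by hand, which helps when several connectors share most of their settings. The sketch below is only an illustration: `build_registration` and `topic_for` are hypothetical helper names, the defaults are the values used in this article, and the `<server name>.<schema>.<table>` topic pattern is inferred from the topic `fulfillment.inventory.customers` shown later in this article.

```python
import json


def build_registration(name, server_name, schema, dbname="postgres",
                       hostname="postgresql", port=5432,
                       user="postgres", password="debezium"):
    """Build the JSON payload that is POSTed to Kafka Connect."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "plugin.name": "wal2json_streaming",
            "database.hostname": hostname,
            "database.port": str(port),  # Connect config values are strings
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            "database.server.name": server_name,
            "schema.include.list": schema,
        },
    }


def topic_for(server_name, schema, table):
    """Topic name a table's change events are expected to land in."""
    return f"{server_name}.{schema}.{table}"


if __name__ == "__main__":
    payload = build_registration("fulfillment-connector", "fulfillment", "inventory")
    print(json.dumps(payload, indent=2))
    print(topic_for("fulfillment", "inventory", "customers"))
```

Redirecting the printed JSON to `register-postgresql.json` gives a file equivalent to the one above.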
Registering and starting the connection in the Debezium PostgreSQL Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgresql.json
Verifying the registration
curl -H "Accept:application/json" localhost:8083/connectors/
["fulfillment-connector"]
2020-10-21T03:49:22.632581000Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - TaskConfig values:
2020-10-21T03:49:22.632592801Z task.class = class io.debezium.connector.postgresql.PostgresConnectorTask
2020-10-21T03:49:22.632596701Z
2020-10-21T03:49:22.632948722Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:Worker@524] - Instantiated task fulfillment-connector-0 with version 1.3.0.Final of type io.debezium.connector.postgresql.PostgresConnectorTask
2020-10-21T03:49:22.633286743Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - JsonConverterConfig values:
2020-10-21T03:49:22.634988447Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@543] - Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task fulfillment-connector-0 using the worker config
2020-10-21T03:49:22.634994347Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@550] - Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task fulfillment-connector-0 using the worker config
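Listing the connector names only shows that the registration was accepted. Whether the task is actually running can be read from the Kafka Connect REST endpoint `GET /connectors/<name>/status`. The following is a rough sketch under a few assumptions: the worker listens on `localhost:8083` as in the curl commands above, `fetch_status` and `is_healthy` are hypothetical helper names, and `SAMPLE` is a hand-written illustration of the response shape, not output captured from this setup.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


# Illustrative response only; the worker_id values are made up.
SAMPLE = {
    "name": "fulfillment-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "source",
}

if __name__ == "__main__":
    print(is_healthy(SAMPLE))
    # Against a live worker: print(is_healthy(fetch_status("fulfillment-connector")))
```

A connector with no tasks is treated as unhealthy here, because a source connector that has not started a task is not streaming anything yet.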
Stopping and deleting the registered connection
curl -X DELETE localhost:8083/connectors/fulfillment-connector
# select pg_drop_replication_slot('debezium')
docker exec -it a5d73008228a /bin/bash
root@a5d73008228a:/# psql -d postgres -U postgres
postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+----------
slot_name | debezium
plugin | wal2json
slot_type | logical
datoid | 13067
database | postgres
temporary | f
active | t
active_pid | 1681
xmin |
catalog_xmin | 605
restart_lsn | /209B200
confirmed_flush_lsn | /209B238
postgres=# set search_path to inventory ;
SET
postgres=# \dt
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
postgres=# \d customers
Table "inventory.customers"
Column | Type | Collation | Nullable | Default
------------+------------------------+-----------+----------+---------------------------------------
id | integer | | not null | nextval('customers_id_seq'::regclass)
first_name | character varying(255) | | not null |
last_name | character varying(255) | | not null |
email | character varying(255) | | not null |
Topic: fulfillment.inventory.customers
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
…
"payload": {
"before": null,
"after": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"source": {
"version": "1.3.0.Final",
"connector": "postgresql",
"name": "fulfillment",
"ts_ms": 1603252163493,
"snapshot": "true",
"db": "postgres",
"schema": "inventory",
"table": "customers",
"txId": 602,
"lsn": 34078720,
"xmin": null
},
"op": "r",
"ts_ms": 1603252163495,
"transaction": null
}
}
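A consumer of this topic usually needs only a few fields from each event. The sketch below, with hypothetical helper names `format_lsn` and `summarize`, pulls them out of the `payload` section and converts the numeric `lsn` into the textual `X/Y` form that PostgreSQL prints. `EVENT` is the sample event above with the `schema` part omitted. The `op` codes `c`, `u`, `d` and `r` follow Debezium's convention, where `r` marks a row read during the initial snapshot.

```python
import json

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def format_lsn(lsn):
    """Render a 64-bit numeric LSN as PostgreSQL's hexadecimal X/Y text."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def summarize(event):
    """Extract table, operation, LSN and row image from a change event."""
    payload = event["payload"]
    source = payload["source"]
    # Deletes carry the row in "before"; all other operations in "after".
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return {
        "table": f'{source["schema"]}.{source["table"]}',
        "op": OP_NAMES.get(payload["op"], payload["op"]),
        "lsn": format_lsn(source["lsn"]),
        "row": row,
    }


EVENT = json.loads("""
{"payload": {
  "before": null,
  "after": {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
            "email": "sally.thomas@acme.com"},
  "source": {"version": "1.3.0.Final", "connector": "postgresql",
             "name": "fulfillment", "ts_ms": 1603252163493, "snapshot": "true",
             "db": "postgres", "schema": "inventory", "table": "customers",
             "txId": 602, "lsn": 34078720, "xmin": null},
  "op": "r", "ts_ms": 1603252163495, "transaction": null}}
""")

if __name__ == "__main__":
    print(summarize(EVENT))
```

For the sample event, `34078720` is `0x2080000`, so `format_lsn` yields `0/2080000`, a value that can be compared directly with the LSN columns in `pg_replication_slots`.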