How to ingest 250 tables from MS SQL into Kafka using Debezium


Hi, I have tried to build a Kafka Connect pipeline with PostgreSQL as the source and SQL Server as the destination. I used 3 Kafka brokers and need to consume 252 topics (one topic per PostgreSQL table). After running for more than an hour, it could only pull 218 out of 252 tables. The error I found is that there is a deadlock in SQL Server, which holds the transaction to SQL Server and keeps retrying it; the Debezium replication slot also already exists.


I use distributed connectors with at most 3 workers on the sink, but maybe that is not enough. I also tried a higher offset.time_out.ms of 60000 and a higher offset partition count (100). I'm afraid this is not the production level I want. Can anyone give a suggestion about this case? Is there any calculation to decide the best number of workers I need?
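Since the sink-side failures are deadlock-victim transactions that SQL Server asks to be rerun, one knob worth checking is the retry behaviour of the Confluent JDBC sink connector itself: it has `max.retries` and `retry.backoff.ms` settings, so a deadlocked batch can be retried instead of eventually failing the task. A minimal sketch of such a sink config (the connection URL, topic, and chosen values are placeholders, not taken from the question):

```json
{
  "name": "sql_server_sink_example",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:sqlserver://sqlserver:1433;databaseName=mydb",
    "topics": "my_table_topic",
    "tasks.max": "1",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "max.retries": "20",
    "retry.backoff.ms": "5000"
  }
}
```

A longer backoff spreads the rerun attempts out, which also reduces the chance that the retry itself collides with the other writers that caused the deadlock.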

Update


Here are some errors I get. I see some connectors are killed. One tells me that a deadlock happened in SQL Server:

[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

Update April 14, 2020


I still have this problem. I forgot to mention how I deploy the connectors. Now I use 2 workers, one for the source and one for the sink. I list all of my tables and primary keys in a CSV and loop through the rows to create the connectors, without any sleep or wait between them. I also use a single topic partition and 3 replicas for each topic. But I still get the SQL Server connection deadlock.
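The deployment loop described above can be sketched as below; adding a pause between connector creations gives the Connect cluster time to finish each rebalance before the next connector arrives. The Connect URL, connector template, and CSV contents are assumptions for illustration, not taken from the question:

```shell
#!/usr/bin/env bash
# Sketch: create one sink connector per "table,pk" row of a CSV, pausing
# between creations. CONNECT_URL and the config template are placeholders.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

create_connector() {
  local table="$1" pk="$2" payload
  # Build the connector JSON; in a real deployment, POST it with:
  #   curl -s -X POST -H 'Content-Type: application/json' \
  #        --data "$payload" "$CONNECT_URL/connectors"
  payload=$(printf '{"name":"sink_%s","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","topics":"%s","pk.mode":"record_key","pk.fields":"%s","tasks.max":"1"}}' \
    "$table" "$table" "$pk")
  echo "$payload"
}

# Loop over the CSV rows (inline here as a stand-in for the real file).
while IFS=, read -r table pk; do
  create_connector "$table" "$pk"
  sleep 1   # let the Connect cluster settle between connector creations
done <<'CSV'
customers,id
orders,order_id
CSV
```

Creating 252 connectors back-to-back with no delay triggers a rebalance storm in the Connect cluster, which is a separate source of killed tasks on top of the SQL Server deadlocks.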

Answer


The problem may be that multiple tasks access the same SQL table at the same time, causing synchronization problems such as the deadlocks you mentioned.
Since you already have a large number of topics, and your connector may access them in parallel, I would suggest reducing the number of partitions for every topic to just 1 (reducing the number of partitions is not supported in Kafka, so you should delete and recreate every topic with the new partition count).
This way, every topic has only one partition, and each partition is consumed by only a single thread (task/consumer), so there is no chance of parallel SQL transactions against the same table.
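The delete-and-recreate step can be scripted with the stock `kafka-topics.sh` tool. The sketch below only prints the commands to run (a dry run), since deleting topics is destructive; the broker address and topic names are placeholders:

```shell
#!/usr/bin/env bash
# Sketch: Kafka cannot shrink a topic's partition count in place, so each
# topic is deleted and recreated with partitions=1. This prints the commands
# rather than executing them; BOOTSTRAP and the topic list are placeholders.
BOOTSTRAP="localhost:9092"

recreate_with_one_partition() {
  local topic="$1"
  printf 'kafka-topics.sh --bootstrap-server %s --delete --topic %s\n' \
    "$BOOTSTRAP" "$topic"
  printf 'kafka-topics.sh --bootstrap-server %s --create --topic %s --partitions 1 --replication-factor 3\n' \
    "$BOOTSTRAP" "$topic"
}

# Emit the command pair for every table topic:
for topic in customers orders payments; do
  recreate_with_one_partition "$topic"
done
```

Replication factor 3 is kept, matching the 3-broker setup described in the question; note that deleting a topic also discards any unconsumed data in it, so this is only safe before or between full snapshot loads.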


Alternatively, a better approach is to create a single topic with 3 partitions (the same as the number of tasks/consumers you have) and make the producer use the SQL table name as the message key.
Kafka guarantees that messages with the same key always go to the same partition, so all messages for the same table will reside on a single partition (consumed by a single thread).


If you find it useful, I can attach more information about how to create a Kafka producer and send keyed messages.
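For a quick look at keyed production without writing a client, the console producer can parse a key out of each input line. The sketch below only builds and prints the keyed lines; the commented command shows how they would be piped to a live broker (topic name, broker address, and sample records are all placeholders):

```shell
#!/usr/bin/env bash
# Sketch: table-name-keyed messages. With Kafka's default partitioner,
# records with equal keys always land on the same partition, so each
# table's rows are consumed by a single thread.
messages='customers:{"id":1,"name":"alice"}
orders:{"order_id":9,"total":42.5}'

# Against a live broker, pipe them in with the key parsed from each line:
#   printf '%s\n' "$messages" | kafka-console-producer.sh \
#     --bootstrap-server localhost:9092 --topic all_tables \
#     --property parse.key=true --property key.separator=:
printf '%s\n' "$messages"
```

In a real Debezium/Connect pipeline the same effect is usually achieved with a transformation or a custom partitioner rather than a hand-run producer; the console producer is just the simplest way to observe the same-key-same-partition guarantee.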
