Spark Structured Streaming with a RabbitMQ source
I am trying to write a custom receiver for Structured Streaming that will consume messages from RabbitMQ. Spark recently released the DataSource V2 API, which seems very promising. Since it abstracts away many details, I want to use this API for the sake of both simplicity and performance. However, since it's quite new, there are not many resources available. I need some clarification from experienced Spark people, since they will grasp the key points more easily. Here we go:
My starting point is this blog post series, with the first part here. It shows how to implement a data source without streaming capability. To make a streaming source, I changed it slightly, since I need to implement MicroBatchReadSupport instead of (or in addition to) DataSourceV2.
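For context, a minimal sketch of the kind of entry point involved, assuming Spark 2.3's V2 interfaces; the class names `RMQSourceProvider` and `RMQMicroBatchReader` are placeholders, not code from the blog series:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

public class RMQSourceProvider implements DataSourceV2, MicroBatchReadSupport {

    // Spark calls this once per streaming query, on the driver.
    @Override
    public MicroBatchReader createMicroBatchReader(
            Optional<StructType> schema,
            String checkpointLocation,
            DataSourceOptions options) {
        // RMQMicroBatchReader is a hypothetical implementation of MicroBatchReader.
        return new RMQMicroBatchReader(options, checkpointLocation);
    }
}
```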
To be efficient, it's wise to have multiple Spark executors consuming from RabbitMQ concurrently, i.e. from the same queue. If I'm not confused, every partition of the input (in Spark's terminology) corresponds to a consumer of the queue (in RabbitMQ terminology). Thus, we need to have multiple partitions for the input stream, right?
Similarly to part 4 of the series, I implemented MicroBatchReader as follows:
```java
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    // Create one factory (and thus one reader) per requested partition.
    int partition = options.getInt(RMQ.PARTITION, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}
```
I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will also be a consumer. Is that approach correct?
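For reference, here is a hypothetical sketch of what such a factory/reader pair could look like, with each reader opening its own RabbitMQ channel on the executor it runs on. This is not the actual implementation from the question; the option names, class bodies, and the use of `basicGet` are all assumptions for illustration only:

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

// The factory is created on the driver and serialized to an executor.
public class RMQDataReaderFactory implements DataReaderFactory<Row> {
    private final String host;
    private final String queue;

    public RMQDataReaderFactory(DataSourceOptions options) {
        // Option names are illustrative.
        this.host = options.get("host").orElse("localhost");
        this.queue = options.get("queue").orElse("spark");
    }

    @Override
    public DataReader<Row> createDataReader() {
        return new RMQDataReader(host, queue); // runs on the executor
    }
}

// One reader == one RabbitMQ consumer, living on an executor.
class RMQDataReader implements DataReader<Row> {
    private final Connection connection;
    private final Channel channel;
    private final String queue;
    private GetResponse current;

    RMQDataReader(String host, String queue) {
        this.queue = queue;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            this.connection = factory.newConnection();
            this.channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean next() throws IOException {
        current = channel.basicGet(queue, false); // manual ack mode
        return current != null;
    }

    @Override
    public Row get() {
        return RowFactory.create(new String(current.getBody()));
    }

    @Override
    public void close() throws IOException {
        try {
            channel.close();
            connection.close();
        } catch (TimeoutException e) {
            throw new IOException(e);
        }
    }
}
```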
I want my receiver to be reliable, i.e. after every processed message (or at least after it is written to the checkpoint directory for further processing), I need to ack it back to RabbitMQ. The problem starts here: these factories are created at the driver, and the actual reading takes place at the executors through DataReaders. However, the commit method is part of MicroBatchReader, not DataReader. Since I have many DataReaders per MicroBatchReader, how should I ack these messages back to RabbitMQ? Or should I ack when the next method is called on DataReader? Is it safe? If so, what is the purpose of the commit function then?
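To illustrate the RabbitMQ side of this dilemma: a manual ack must be issued on the same channel that delivered the message. The minimal standalone example below only shows the `basicGet`/`basicAck` pairing, not a recommended place to put the ack:

```java
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;

public final class AckExample {
    private AckExample() {}

    // Pull one message and ack it on the channel that delivered it.
    static void consumeOne(Channel channel, String queue) throws IOException {
        GetResponse response = channel.basicGet(queue, false); // autoAck = false
        if (response != null) {
            // ...process response.getBody() here...
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        }
    }
}
```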
~~CLARIFICATION~~ OBFUSCATION: The link provided in the answer about the renaming of some classes/functions (in addition to the explanations there) made everything ~~much more clear~~ worse than ever. Quoting from there:
> Renames:
>
> - DataReaderFactory to InputPartition
> - DataReader to InputPartitionReader
> - ...
> InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.
However, the docs clearly say that "the reader factory will be serialized and sent to executors, then the data reader will be created on executors and do the actual reading."
To make the consumer reliable, I have to ACK a particular message only after it has been committed on the Spark side. Note that messages have to be ACKed on the same connection they were delivered through, but the commit function is called at the driver node. How can I commit at the worker/executor node?
Answer
> I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will also be a consumer. Is that approach correct?

The [socket][1] source implementation has a single thread pushing messages into an internal ListBuffer. In other words, there is one consumer (the thread) filling up the internal ListBuffer, which is **then** divided into partitions by `planInputPartitions` (`createDataReaderFactories` got [renamed][2] to `planInputPartitions`).

Also, according to the Javadoc of [MicroBatchReadSupport][3]:

> The execution engine will create a micro-batch reader at the start of a streaming query, alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and then call stop() when the execution is finished. Note that a single query may have multiple executions due to restart or failure recovery.

In other words, `createDataReaderFactories` should be called **multiple** times, which, as far as I understand, suggests that each `DataReader` is responsible for a static input partition, which implies that a DataReader should not be a consumer.

----------

> However, the commit method is a part of MicroBatchReader, not DataReader ... If so, what is the purpose of the commit function then?

Part of the rationale for the commit function may be to keep the internal buffer of the MicroBatchReader from growing large. By committing an offset, you effectively remove elements below that offset from the buffer, because you are promising not to process them anymore. You can see this happening in the socket source code with `batches.trimStart(offsetDiff)`.

----------

~~I'm not sure about implementing a reliable receiver, so I hope a more experienced Spark person comes along and tackles your question, as I'm interested in that too!~~ Hope this helps!
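To make the socket-source pattern described above concrete, here is a minimal Java sketch (the real socket source is Scala) of one consumer thread filling a driver-side buffer that is sliced into partitions per micro-batch and trimmed on commit. All class and method names here are purely illustrative, none of this is Spark API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class BufferedSourceSketch {

    // Driver-side buffer filled by a single consumer thread
    // (the socket source's internal ListBuffer plays this role).
    private final List<String> batches = new CopyOnWriteArrayList<>();
    private long firstOffsetInBuffer = 0;

    // Called by the background consumer thread for every received message.
    public void onMessage(String message) {
        batches.add(message);
    }

    // Per micro-batch: slice the buffered range [start, end) into N partitions.
    public List<List<String>> planPartitions(long start, long end, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (long offset = start; offset < end; offset++) {
            int index = (int) (offset - firstOffsetInBuffer);
            partitions.get((int) (offset % numPartitions)).add(batches.get(index));
        }
        return partitions;
    }

    // commit(): drop everything below the committed offset, mirroring
    // batches.trimStart(offsetDiff) in the socket source.
    public void commit(long committedOffset) {
        long offsetDiff = committedOffset - firstOffsetInBuffer;
        for (long i = 0; i < offsetDiff; i++) {
            batches.remove(0);
        }
        firstOffsetInBuffer = committedOffset;
    }
}
```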
EDIT
I had only studied the socket and wiki-edit sources. Those sources are not production ready, which is not what the question was looking for. Instead, the Kafka source is a better starting point: unlike the aforementioned sources, it has multiple consumers, as the author was looking for.
However, if an unreliable source is acceptable, the socket and wiki-edit sources above provide a less complicated solution.