MongoDB: streaming inserted data out in real time (or near real time)
I have a number of MongoDB collections that receive JSON documents from various streaming sources. In other words, there are a number of processes continually inserting data into a set of MongoDB collections.
I need a way to stream the data out of MongoDB into downstream applications, so I want a system that conceptually looks like this:
App Stream1 -->
App Stream2 --> MONGODB ---> Aggregated Stream
App Stream3 -->
Or this:
App Stream1 -->             ---> MongoD Stream1
App Stream2 -->   MONGODB   ---> MongoD Stream2
App Stream3 -->             ---> MongoD Stream3
The question is: how do I stream data out of Mongo without having to continually poll/query the database?
The obvious answer would be: "Why don't you change those app streaming processes to send messages to a queue like Rabbit, Zero or ActiveMQ, which then sends them to your Mongo streaming processes and to Mongo at once, like this":
                  MONGODB
                    /|\
                     |
App Stream1 -->      |        ---> MongoD Stream1
App Stream2 -->  SomeMQqueue  ---> MongoD Stream2
App Stream3 -->               ---> MongoD Stream3
In an ideal world, yes, that would be good, but we need Mongo to ensure that messages are saved first, to avoid duplicates, and to ensure that IDs are all generated, etc. Mongo has to sit in the middle as the persistence layer.
So how do I stream messages out of a Mongo collection (not using GridFS etc.) into these downstream apps? The basic school of thought has been to just poll for new documents and, for each document collected, update it by adding another field to the JSON document stored in the database, much like a process flag in a SQL table that stores a processed timestamp. I.e. every 1 second, poll for documents where processed == null, add processed = now(), and update the document.
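To make the poll-and-flag idea concrete, here is a minimal sketch of a single polling pass. It is only an illustration: an in-memory list of maps stands in for the Mongo collection, and the `PollSketch` class and `pollOnce` method are hypothetical names, not part of any driver.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PollSketch {
    // One polling pass: collect every document whose "processed" field is
    // missing (null) and stamp it with the given time so that the next
    // pass skips it.
    static List<Map<String, Object>> pollOnce(List<Map<String, Object>> collection, Instant now) {
        List<Map<String, Object>> batch = new ArrayList<>();
        for (Map<String, Object> doc : collection) {
            if (doc.get("processed") == null) {
                doc.put("processed", now);
                batch.add(doc);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        // In-memory stand-in for a collection holding three new documents.
        List<Map<String, Object>> collection = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("_id", i);
            collection.add(doc);
        }
        System.out.println(pollOnce(collection, Instant.now()).size()); // prints 3
        System.out.println(pollOnce(collection, Instant.now()).size()); // prints 0
    }
}
```

Against a real database, every pass costs a query plus one update per document, which is the overhead I would like to avoid.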
Is there a neater/more computationally efficient method?
FYI - These are all Java processes.
Cheers!
Recommended answer
If you are writing to a capped collection (or collections), you can use a tailable cursor to push new data onto the stream, or onto a message queue from which it can be streamed out. However, this will not work for a non-capped collection.
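As a rough illustration, a tailable cursor might be opened like this with the MongoDB Java sync driver (`mongodb-driver-sync`). The connection string, the `streams` database, the `events` collection and the 64 MB cap are all assumptions made for the sketch, and the `println` stands in for whatever forwards the document downstream.

```java
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import org.bson.Document;

class TailSketch {
    // Options for creating a capped collection of the given size.
    // Tailable cursors are only supported on capped collections.
    static CreateCollectionOptions cappedOptions(long sizeInBytes) {
        return new CreateCollectionOptions().capped(true).sizeInBytes(sizeInBytes);
    }

    public static void main(String[] args) {
        // Assumed local server; adjust the connection string as needed.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoDatabase db = client.getDatabase("streams");

            // One-time setup (throws if the collection already exists):
            // db.createCollection("events", cappedOptions(64L * 1024 * 1024));
            MongoCollection<Document> events = db.getCollection("events");

            // TailableAwait keeps the cursor open after the last document and
            // makes the server wait briefly for new inserts before returning.
            try (MongoCursor<Document> cursor = events.find()
                    .cursorType(CursorType.TailableAwait)
                    .iterator()) {
                while (cursor.hasNext()) {
                    Document doc = cursor.next();
                    System.out.println(doc.toJson()); // placeholder for the downstream push
                }
            }
        }
    }
}
```

Two caveats apply to this sketch. The cursor starts at the oldest document still in the capped collection, so a consumer that restarts will see earlier documents again unless it filters them out. The loop also ends if the cursor dies, for example when the collection is empty at the time the cursor is opened, so a long-running consumer would need to re-open it.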