MongoDB 4.2 内核解析 – Change Stream
MongoDB 从3.6版本开始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增强),用于订阅 MongoDB 内部的修改操作,change stream 可用于 MongoDB 之间的增量数据迁移、同步,也可以将 MongoDB 的增量订阅应用到其他的关联系统;比如电商场景里,MongoDB 里存储新的订单信息,业务需要根据新增的订单信息去通知库存管理系统发货。
Change Stream 与 Tailing Oplog 对比
在 change stream 功能之前,如果要获取 MongoDB 增量的修改,可以通过不断 tailing oplog
的方式来 拉取增量的 oplog ,然后针对拉取到的 oplog 集合,来过滤满足条件的 oplog。这种方式也能满足绝大部分场景的需求,但存在如下的不足。
- 使用门槛较高,用户需要针对 oplog 集合,打开特殊选项的的 tailable cursor (“tailable”: true, “awaitData” : true)。
- 用户需要自己管理增量续传,当拉取应用 crash 时,用户需要记录上一条拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再继续拉取。
- 结果过滤必须在拉取侧完成,但只需要订阅部分 oplog 时,比如针对某个 DB、某个 Collection、或某种类型的操作,必须要把左右的 oplog 拉取到再进行过滤。
- 对于 update 操作,oplog 只包含操作的部分内容,比如
{$set: {x: 1}}
,而应用经常需要获取到完整的文档内容。 - 不支持 Sharded Cluster 的订阅,用户必须针对每个 shard 进行 tailing oplog,并且这个过程中不能有 moveChunk 操作,否则结果可能乱序。
MongoDB Change Stream 解决了 Tailing oplog 存在的不足
- 简单易用,提供统一的 Change Stream API,一次 API 调用,即可从 MongoDB Server 侧获取增量修改。
- 统一的进度管理,通过 resume token 来标识拉取位置,只需在 API 调用时,带上上次结果的 resume token,即可从上次的位置接着订阅。
- 支持对结果在 Server 端进行 pipeline 过滤,减少网络传输,支持针对 DB、Collection、OperationType 等维度进行结果过滤。
- 支持 fullDocument: “updateLookup” 选项,对于 update,返回当时对应文档的完整内容。
- 支持 Sharded Cluster 的修改订阅,相同的 API 请求发到 mongos ,即可获取集群维度全局有序的修改。
Change Stream 实战
以 Mongo shell 为例,使用 Change Stream 非常简单,mongo shell 封装了针对整个实例、DB、Collection 级别的订阅操作。
db.getMongo().watch() 订阅整个实例的修改
db.watch() 订阅指定DB的修改
db.collection.watch() 订阅指定Collection的修改
相关文章