MongoDB-Elasticsearch 实时数据导入
搜索功能是App必不可少的一部分,我们使用目前比较流行的Elasticsearch进行全文检索。我们的数据主要存储在MongoDB中,如何将这些数据导入到Elasticsearch中,并能一直保持同步呢?做法大致分为两种:
- 在应用层操作,在读写MongoDB的同时读写Elasticsearch,比如mongoosastic,需要修改已有的业务代码。
- 与业务无关,通过读取MongoDB的replica oplog,将MongoDB产生的操作在Elasticsearch上replay,来实现单向同步。
为了减少老代码修改成本,我们选择了第二种方案,使用mongo-connector来进行数据同步。然而用着用着我们发现mongo-connector有一些问题:
- 有些数据需要关联查询,但是mongo-connector并不支持parent-child模型(其实有一个fork是支持的,但已经落后主分支一个版本,并且合进主分支的希望渺茫)。
- mongo-connector支持断点续传,但是恢复速度非常缓慢。
- mongo-connector可以设置每次处理的文档数量,但坑爹的地方在于,到不了设置的数字,它始终不会写入。比如,MongoDB一个表只有100个文档,但是设置了batch的size为1000,于是那100个文档这辈子也同步不到Elasticsearch中了。
- mongo-connector不会限速,直接把Elasticsearch写炸了,但它不会管,接着写,而且中间丢掉的数据就算后面有oplog里面有update操作,也没办法恢复,会报出404错误。
- 在MongoDB里面存了一张meta表,在Elasticsearch里面也存了一个meta索引,里面存了大量的timestamp,直接使Elasticsearch文档总数翻倍。
于是我们开始寻找更好用的工具,却发现没有好用的工具:
- Elasticsearch Rivers,曾经的官方同步工具。但该项目早已废弃。
-
Transporter,IBM旗下的Compose公司出品的同步工具。也不支持parent-child relationship,并且项目进度缓慢。
- elasticsearch-hadoop,先导到hadoop,再导到Elasticsearch。高射炮打蚊子,绕一大圈,不经济。
没办法,只好自己用*cript写一个,取名为mongo-es。
mongo-es导入数据分为两个阶段:
- Scan:扫描整个MongoDB的collection,每条文档都插入到Elasticsearch对应的index里面。使用Bulk API,进行批量写入。在扫描开始前记录当前的时间点,供第二阶段使用。
- Tail:从刚才记录的时间点,或一个指定的时间点开始,将MongoDB的oplog在Elasticsearch上进行replay。使用RxJS的bufferWithTimeOrCount函数,既能批量写入,又能保证同步延迟不会很长(一般是一秒左右)。
mongo-es比mongo-connector进步的地方有:
- 支持parent-child relationship,可以处理需要join的数据。
- 可以逆序Scan,先导入新的数据,这对于出错后重建索引快速恢复非常有用。
- 无需在两边存储多余元数据,只记录oplog的timestamp。只要程序挂的时间不太长,oplog里面还有这个timestamp,就能恢复。
- 遇到缺失文档自动恢复。当因为不可控因素(如网络原因),导致某个本应已经同步了的文档在Elasticsearch中不存在。这时如果oplog里面遇到一个对该文档的update操作,mongo-connector无法处理,打印出404错误。遇到这种情况时,mongo-es会回到MongoDB中,读取到这个文档,进行更新。
- 有限速功能,能够限制每秒钟读取的文档数量,避免把Elasticsearch压垮。
当然了,mongo-connector是一个更加通用的程序,可以把文档导到更多的地方。mongo-es只是把MongoDB的数据导入到Elasticsearch中,这样比较未免有些不公平,但就在MongoDB到Elasticsearch这个使用场景下,还是好用不少的。
开发过程中踩过的坑:
- Scan阶段使用stream,方便控制读取速度。Tail阶段使用cursor,配合noCursorTimeout参数,避免长时间没有oplog时的超时错误。Tail阶段如果用stream,即使是设置了noCursorTimeout,超时了也会报错。
- 对于操作是update的oplog,oplog里面有可能是一个完整的文档,这时候直接就可以写入。也有可能是$set或$unset操作,这时候要去Elasticsearch里面取到旧的,完整的文档,在内存里执行update后再写入回去。好不要直接读MongoDB,以减少MongoDB负担。
- 在内存中执行update时,也要检查变化的字段是否属于我们需要的字段。如果变化的都不是需要的字段,可以忽略这次update操作,如果变化的字段不在我们需要的范围内,则应排除,以减少写入次数。
- 有_parent的文档是不能直接用_id访问到的,因为它的routing是_parent,必须指定_parent的值才行。对于操作是update的oplog,我们只能拿到_id,拿不到_parent对应的字段,所以这时要用es.search代替es.get,访问每个分片,才能拿到文档。
- Timestamp在js代码里表示时low在前,high在后。在mongo shell里面是反过来的。
- Bulk API传入的body长度不能为0,遇到0的情况要跳过,否则会报错。
现已开源至github,并发布到了npm,欢迎大家多多试用,多挑(ti)毛(xu)病(qiu)。
相关文章