Python Libraries for Syncing MongoDB Data to Elasticsearch

2023-04-15 00:00:00 python mongodb data synchronization
  1. Install the Python client libraries for Elasticsearch and MongoDB

Install the elasticsearch-py and pymongo libraries:

pip install elasticsearch
pip install pymongo
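The keyword arguments accepted by elasticsearch-py changed between major versions (the 8.x client, for example, takes `document=` in `index()` where older clients took `body=`), so it helps to know which versions pip actually installed. A small sketch, assuming Python 3.8+ for `importlib.metadata`; the helper name `installed_versions` is hypothetical:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("elasticsearch", "pymongo")):
    """Return {package: version string, or None if it is not installed}."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            # The distribution is not installed in this environment
            result[name] = None
    return result


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```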
  2. Connect to MongoDB and Elasticsearch

Connect to MongoDB with pymongo:

import pymongo

mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
mongo_database = mongo_client["mydb"]
mongo_collection = mongo_database["mycollection"]

Connect to Elasticsearch with elasticsearch-py:

from elasticsearch import Elasticsearch

# The 8.x client requires a full URL that includes the scheme (http/https)
es = Elasticsearch("http://localhost:9200")
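Constructing `MongoClient` and `Elasticsearch` does not raise when a server is unreachable, because both clients connect lazily. A minimal sketch of an explicit connectivity check, assuming pymongo's `admin.command("ping")` and elasticsearch-py's `ping()` (which returns `True`/`False`); the helper name `check_connections` is hypothetical:

```python
def check_connections(mongo_client, es):
    """Return (mongo_ok, es_ok) by pinging both servers explicitly."""
    try:
        # pymongo: the "ping" admin command raises if no server is reachable
        mongo_client.admin.command("ping")
        mongo_ok = True
    except Exception:  # broad on purpose: any failure means "not reachable"
        mongo_ok = False
    # elasticsearch-py: ping() returns a boolean instead of raising
    es_ok = bool(es.ping())
    return mongo_ok, es_ok
```

Called as `check_connections(mongo_client, es)` with the clients created above. For a faster failure when MongoDB is down, pass `serverSelectionTimeoutMS=2000` to `pymongo.MongoClient`; otherwise the ping waits for the default 30-second server selection timeout.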
  3. Create the index and mapping

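In Elasticsearch, documents are stored in indices, and indices live in a cluster. Each index has a mapping that declares the type of every field and how text is analyzed. Without an explicit mapping, Elasticsearch infers one from the incoming documents (dynamic mapping), which may not choose the types you want, so it is safer to define it up front.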

Create the index:

es.indices.create(index='myindex')

Create the mapping:

mapping = {
    "properties": {
        "title": {"type": "text"},
        "content": {"type": "text"},
        "created_date": {"type": "date"}
    }
}

# put_mapping takes the field definitions directly, without a "mappings" wrapper
es.indices.put_mapping(index='myindex', properties=mapping["properties"])

In this example, the mapping defines three fields: `title` and `content` as full-text (`text`) fields, and `created_date` as a `date`.
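Calling `indices.create` on an index that already exists raises an error, so re-running the script fails at that step. A minimal sketch, assuming the 8.x elasticsearch-py client (where `indices.create` accepts a `mappings=` keyword), that creates the index together with its mapping only if it is missing; `ensure_index` and `PROPERTIES` are hypothetical names:

```python
# Field types for the three synced fields (same as the mapping above)
PROPERTIES = {
    "title": {"type": "text"},
    "content": {"type": "text"},
    "created_date": {"type": "date"},
}


def ensure_index(es, index="myindex", properties=PROPERTIES):
    """Create `index` with an explicit mapping unless it already exists.

    Returns True if the index was created, False if it was already there.
    """
    if es.indices.exists(index=index):
        return False
    # Passing the mapping at creation time avoids a separate put_mapping call
    es.indices.create(index=index, mappings={"properties": properties})
    return True
```

Setting the mapping at creation time also guarantees that no document is indexed before the field types are in place.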

  4. Sync data to Elasticsearch

Fetch every document from MongoDB and index it into Elasticsearch:

for doc in mongo_collection.find():
    # Use the ObjectId's string form as the Elasticsearch document _id
    es.index(index='myindex', id=str(doc['_id']), document={
        'title': doc.get('title'),
        'content': doc.get('content'),
        'created_date': doc.get('created_date')
    })

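In this example, each document's title, content, and creation date are copied into Elasticsearch. Note that `_id` is the unique identifier MongoDB assigns to every document; reusing it as the Elasticsearch document ID means that re-running the sync overwrites existing documents instead of creating duplicates.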
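Calling `es.index` once per document sends one HTTP request per document, which becomes slow for large collections. A sketch that switches to the `elasticsearch.helpers.bulk` helper instead, assuming the 8.x client: a generator turns each MongoDB document into a bulk "index" action, and `.get()` stores `None` for missing fields instead of raising `KeyError`. The names `mongo_to_actions` and `sync_bulk` are hypothetical:

```python
def mongo_to_actions(cursor, index="myindex", fields=("title", "content", "created_date")):
    """Yield one bulk "index" action per MongoDB document."""
    for doc in cursor:
        yield {
            "_index": index,
            # ObjectId is not a str; store its hex string form as the _id
            "_id": str(doc["_id"]),
            # .get() stores None (JSON null) for fields a document lacks
            "_source": {name: doc.get(name) for name in fields},
        }


def sync_bulk(mongo_collection, es, index="myindex"):
    """Stream all documents into Elasticsearch in batches; returns (ok_count, errors)."""
    # Imported lazily so mongo_to_actions is usable without the client installed
    from elasticsearch import helpers

    # batch_size limits how many documents pymongo fetches per round trip
    cursor = mongo_collection.find({}, batch_size=500)
    return helpers.bulk(es, mongo_to_actions(cursor, index=index))
```

With the collection and client created earlier, `sync_bulk(mongo_collection, es)` replaces the per-document loop. By default `helpers.bulk` raises `BulkIndexError` if any document fails to index.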

Complete code example:

import pymongo
from elasticsearch import Elasticsearch

# Connect to MongoDB
mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
mongo_database = mongo_client["mydb"]
mongo_collection = mongo_database["mycollection"]

# Connect to Elasticsearch (the 8.x client requires a full URL including the scheme)
es = Elasticsearch("http://localhost:9200")

# Create the index and its mapping in one call; skip if the index already exists
mapping = {
    "properties": {
        "title": {"type": "text"},
        "content": {"type": "text"},
        "created_date": {"type": "date"}
    }
}

if not es.indices.exists(index='myindex'):
    es.indices.create(index='myindex', mappings=mapping)

# Sync data from MongoDB to Elasticsearch
for doc in mongo_collection.find():
    # Use the ObjectId's string form as the Elasticsearch document _id
    es.index(index='myindex', id=str(doc['_id']), document={
        'title': doc.get('title'),
        'content': doc.get('content'),
        'created_date': doc.get('created_date')
    })
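After the sync finishes, a quick sanity check is to compare document counts on both sides. A minimal sketch, assuming pymongo 3.7+ (for `count_documents`) and the 8.x elasticsearch-py client; `counts_match` is a hypothetical helper name. Equal counts only suggest the sync is complete (and only if the index holds nothing else); they do not prove the field values match.

```python
def counts_match(mongo_collection, es, index="myindex"):
    """Compare document counts after a sync; returns (mongo_count, es_count, equal)."""
    # Newly indexed documents become visible to count() only after a refresh
    es.indices.refresh(index=index)
    mongo_count = mongo_collection.count_documents({})
    es_count = es.count(index=index)["count"]
    return mongo_count, es_count, mongo_count == es_count
```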
