Python中使用MongoDB和Elasticsearch的数据聚合和统计方法

2023-04-15 00:00:00 方法 统计 聚合

MongoDB和Elasticsearch都是非关系型数据库,它们在数据聚合和统计方面都提供了丰富的功能。

用Python连接MongoDB

首先需要安装pymongo库进行连接,安装方法:

pip install pymongo

然后连接数据库:

import pymongo

client = pymongo.MongoClient(host='localhost', port=27017)
db = client['test']

创建collection:

collection = db['students']

插入数据:

student = {
    'name': 'Mike',
    'age': 21,
    'gender': 'male'
}

result = collection.insert_one(student)
print(result.inserted_id)

查询数据:

results = collection.find({'age': 21})
for result in results:
    print(result)

根据条件更新数据:

condition = {'name': 'Mike'}
student = collection.find_one(condition)
student['age'] = 23
result = collection.update_one(condition, {'$set': student})
print(result.matched_count, result.modified_count)

删除数据:

result = collection.delete_one({'name': 'Mike'})
print(result.deleted_count)

用Python连接Elasticsearch

首先需要安装elasticsearch库进行连接,安装方法:

pip install elasticsearch

然后连接数据库:

from elasticsearch import Elasticsearch

es = Elasticsearch(['localhost:9200'])

创建索引:

request_body = {
    'settings': {
        'number_of_shards': 1,
        'number_of_replicas': 0
    },
    'mappings': {
        'properties': {
            'title': {'type': 'text'},
            'content': {'type': 'text'},
            'tags': {'type': 'keyword'},
            'date': {'type': 'date'}
        }
    }
}

es.indices.create(index='blog', body=request_body)

插入数据:

doc = {
    'title': 'Python and Elasticsearch',
    'content': 'Elasticsearch is fun, and Python makes it even more fun!',
    'tags': ['elasticsearch', 'python', 'pandas'],
    'date': '2021-06-01'
}

es.index(index='blog', body=doc)

查询数据:

search_body = {
    'query': {
        'match': {
            'content': 'python'
        }
    }
}

results = es.search(index='blog', body=search_body)
for result in results['hits']['hits']:
    print(result['_source'])

根据条件更新数据:

doc = {'content': 'Elasticsearch is fun, and Python makes it even more fun! And it is very useful in web applications.'}

es.update(index='blog', id='1', body={'doc': doc})

删除数据:

es.delete(index='blog', id='1')

MongoDB和Elasticsearch的聚合和统计方法

MongoDB和Elasticsearch都支持聚合和统计方法,下面进行具体介绍。

MongoDB的聚合方法

  1. 聚合管道(aggregation pipeline)

聚合管道是一种数据聚合和处理的方式,它可以对数据集合进行多个阶段的操作,包括筛选、投影、分组、排序、限制等,每个阶段的输出可以成为下一个阶段的输入,最终得到聚合结果。

例如,统计每个人的平均分数:

pipeline = [
    {'$match': {'score': {'$gt': 60}}},
    {'$group': {'_id': '$name', 'avg_score': {'$avg': '$score'}}},
    {'$sort': {'avg_score': -1}},
    {'$limit': 10}
]

results = collection.aggregate(pipeline)
for result in results:
    print(result)
  1. Map-Reduce

Map-Reduce是一种数据处理和分析方法,它将数据分成多个块进行并行处理,最终将结果合并为一个输出。

例如,统计每个人的分数总和:

map_fn = """
function () {
    emit(this.name, this.score);
}
"""

reduce_fn = """
function (key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i];
    }
    return total;
}
"""

result = collection.map_reduce(map_fn, reduce_fn, 'result')
for doc in result.find():
    print(doc['_id'], doc['value'])

Elasticsearch的聚合方法

  1. 聚合桶(aggregation bucket)

聚合桶是一种数据聚合和处理方式,它可以将数据分成多个桶进行操作,每个桶可以根据不同的条件进行分组、筛选、排序等操作,最终得到聚合结果。

例如,统计每个标签的数量:

agg_body = {
    'aggs': {
        'tag_count': {
            'terms': {'field': 'tags'}
        }
    }
}

result = es.search(index='blog', body=agg_body)
for bucket in result['aggregations']['tag_count']['buckets']:
    print(bucket['key'], bucket['doc_count'])
  1. 指标聚合(metric aggregation)

指标聚合是一种数据聚合和处理方式,它可以对数据进行多种运算,例如求和、平均、最大、最小等,最终得到聚合结果。

例如,统计所有文章的平均长度:

agg_body = {
    'aggs': {
        'avg_length': {
            'avg': {'field': 'content.length'}
        }
    }
}

result = es.search(index='blog', body=agg_body)
print(result['aggregations']['avg_length']['value'])

以上是MongoDB和Elasticsearch的基本用法和聚合方法,可以根据实际情况进行灵活使用。

相关文章