Simple counter example using mapreduce in Google App Engine
Question
I'm somewhat confused about the current state of mapreduce support in GAE. According to the docs (http://code.google.com/p/appengine-mapreduce/) the reduce phase isn't supported yet, but the description of the I/O 2011 session (http://www.youtube.com/watch?v=EIxelKcyCC0) says "It is now possible to run full Map Reduce jobs on App Engine". I wonder if I can use mapreduce for this task:
What I want to do:
I have a model Car with a field color:
class Car(db.Model):
    color = db.StringProperty()
I want to run a mapreduce process (from time to time, cron-defined) that computes how many cars there are of each color and stores the result in the datastore. This seems like a job well suited to mapreduce (correct me if I'm wrong): the "map" phase will yield a pair (, 1) for each Car entity, and the "reduce" phase should merge this data by color_name, giving me the expected results. The final result I want is entities with the computed data stored in the datastore, something like this:
class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()
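To make the intended computation concrete, here is a minimal plain-Python sketch of the counting logic, independent of App Engine. The names map_car, reduce_counts and count_cars_by_color are hypothetical, cars are represented as plain dicts, and emitting the color as the key of each pair is an assumption; none of this is appengine-mapreduce API.

```python
from collections import defaultdict


def map_car(car):
    """Map phase: emit a (color, 1) pair for a car that has a color."""
    color = car.get("color")
    if color:
        yield (color, 1)


def reduce_counts(pairs):
    """Reduce phase: sum the emitted values per color."""
    totals = defaultdict(int)
    for color, count in pairs:
        totals[color] += count
    return dict(totals)


def count_cars_by_color(cars):
    """Run the map phase over all cars, then reduce the emitted pairs."""
    pairs = (pair for car in cars for pair in map_car(car))
    return reduce_counts(pairs)


# Hypothetical sample data standing in for Car entities.
cars = [{"color": "red"}, {"color": "blue"}, {"color": "red"}, {"color": None}]
print(count_cars_by_color(cars))
```

Each entry of the resulting dictionary corresponds to one CarsByColor entity that would be written to the datastore.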
Problem: I don't know how to implement this in App Engine. The video shows examples with defined map and reduce functions, but they seem to be very general examples not related to the datastore. All the other examples I found use a single function to process the data from DatastoreInputReader, but they seem to cover only the "map" phase; there is no example of how to do the "reduce" (or how to store the reduce results in the datastore).
Solution
Here is the solution I eventually worked out using mapreduce on GAE (without a reduce phase). If I had started from scratch, I probably would have used the solution provided by Drew Sears.
It works in GAE Python 1.5.0.
In app.yaml I added the handler for mapreduce:
- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py
and the handler for my own mapreduce code (I'm using the URL /mapred_update to gather the results produced by mapreduce):
- url: /mapred_.*
  script: mapred.py
I created mapreduce.yaml for processing Car entities:
mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car
Explanation: done_callback is a URL that is called after mapreduce finishes its operations. mapred.process is a function that processes an individual entity and updates counters (it's defined in the mapred.py file). The model Car is defined in models.py.
mapred.py:
from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app


def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)


class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data
    gathered by mapreduce counters"""

    def post(self):
        """Called after mapreduce operations are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                   name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
    [('/mapred_update', UpdateCounters)],
    debug=True)


def main():
    run_wsgi_app(application)


if __name__ == "__main__":
    main()
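The counter names produced by process carry the car_color_ prefix, and the callback stores them verbatim as key names. If plain color names are preferred, the counters could be post-processed before the entities are built. The sketch below is plain Python with a hypothetical helper colors_from_counters; it assumes the counters are available as an ordinary dict mapping counter name to integer, which is how state.counters_map.counters is used above.

```python
PREFIX = "car_color_"


def colors_from_counters(counters):
    """Return {color: count}, keeping only counters that carry the color prefix."""
    result = {}
    for name, value in counters.items():
        if name.startswith(PREFIX):
            # Strip the prefix so the key is the bare color name.
            result[name[len(PREFIX):]] = value
    return result


# Hypothetical counter values, shaped like the ones a finished job might report.
sample = {"mapper_calls": 3, "car_color_red": 2, "car_color_blue": 1}
print(colors_from_counters(sample))
```

Filtering by prefix also skips bookkeeping counters such as mapper_calls without having to delete them explicitly.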
The definition of the CarsByColor model is slightly changed compared to the question: the handler above sets the properties name and value instead of color_name and cars_num.
You can start the mapreduce job manually from the URL http://yourapp/mapreduce/ and, hopefully, from cron (I haven't tested cron yet).