appengine-mapreduce hitting the memory limit
Problem description
I'm working on an appengine-mapreduce function and have modified the demo to fit my purpose. Basically I have over a million lines in the following format: userid, time1, time2. My purpose is to find the difference between time1 and time2 for each userid.
However, as I run this on Google App Engine, I encountered this error message in the logs section:
Exceeded soft private memory limit of 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
import csv
import logging
import time

def time_count_map(data):
    """Time count map function."""
    (entry, text_fn) = data
    text = text_fn()
    try:
        q = text.split('\n')
        for m in q:
            reader = csv.reader([m.replace('', '')], skipinitialspace=True)
            for s in reader:
                # Calculate time elapsed
                sdw = s[1]
                start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
                edw = s[2]
                end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
                time_difference = time.mktime(end_date) - time.mktime(start_date)
                yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)
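As a quick sanity check of the parsing arithmetic above, the same strptime/mktime logic can be run on a single row outside of App Engine (the sample line and its values are made up for illustration):

```python
import csv
import time

# Hypothetical sample row in the question's format: userid, time1, time2.
line = "user42, 01/15/11 09:30:00AM, 01/15/11 11:00:00AM"

s = next(csv.reader([line], skipinitialspace=True))
start_date = time.strptime(s[1], "%m/%d/%y %I:%M:%S%p")
end_date = time.strptime(s[2], "%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
print(s[0], time_difference)  # 1.5 hours elapsed -> 5400.0 seconds
```

Note that `skipinitialspace=True` strips the space after each comma, and the `%I:%M:%S%p` directive expects the 12-hour time to be immediately followed by AM/PM, matching the format in the data.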
def time_count_reduce(key, values):
    """Time count reduce function."""
    time = 0.0
    for subtime in values:
        time += float(subtime)
    realtime = int(time)
    yield "%s: %d\n" % (key, realtime)
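For illustration, the shuffle phase groups every (userid, time_difference) pair emitted by the mapper under its key, so the reducer only ever sees one userid and a list of stringified floats. A standalone sketch of the same summing logic, with made-up input values:

```python
def sum_times(key, values):
    """Mirror of time_count_reduce, returning instead of yielding."""
    total = 0.0
    for subtime in values:
        total += float(subtime)
    return "%s: %d\n" % (key, int(total))

# Hypothetical grouped input, as the shuffle phase would deliver it.
print(sum_times("user42", ["5400.0", "1800.0"]))  # user42: 7200
```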
Can anyone suggest how else I can optimize my code? Thanks!!
Edit: here is the pipeline handler:
class TimeCountPipeline(base_handler.PipelineBase):
    """A pipeline to run Time count demo.

    Args:
        blobkey: blobkey to process as string. Should be a zip archive with
            text files inside.
    """

    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "time_count",
            "main.time_count_map",
            "main.time_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_key": blobkey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=32)
        yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
The rest of the files are exactly the same as the demo.
I've uploaded a copy of my codes on dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Solution
It is likely your input file exceeds the soft memory limit in size. For big files, use either BlobstoreLineInputReader or BlobstoreZipLineInputReader.
These input readers pass something different to the map function: they pass the start_position in the file and the line of text.
Your map function might look something like:
def time_count_map(data):
    """Time count map function."""
    text = data[1]
    try:
        reader = csv.reader([text.replace('', '')], skipinitialspace=True)
        for s in reader:
            # Calculate time elapsed
            sdw = s[1]
            start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)
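With a line-based reader the mapper receives a (start_position, line) tuple rather than a whole file, so memory use stays flat no matter how large the blob is. A local sketch of how the reader would drive such a per-line function, using a made-up line (this stand-in omits the App Engine machinery):

```python
import csv
import time

def per_line_map(data):
    """Simplified stand-in for the per-line mapper above."""
    text = data[1]  # data[0] is the byte offset of the line in the blob
    for s in csv.reader([text], skipinitialspace=True):
        start = time.mktime(time.strptime(s[1], "%m/%d/%y %I:%M:%S%p"))
        end = time.mktime(time.strptime(s[2], "%m/%d/%y %I:%M:%S%p"))
        yield (s[0], end - start)

# The input reader calls the mapper once per line:
results = list(per_line_map((0, "user42, 01/15/11 09:30:00AM, 01/15/11 10:30:00AM")))
print(results)  # [('user42', 3600.0)]
```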
Using BlobstoreLineInputReader will allow the job to run much faster, as it can use more than one shard (up to 256), but it means you need to upload your files uncompressed, which can be a pain. I handle it by uploading the compressed files to an EC2 Windows server, then decompressing and uploading from there, since its upstream bandwidth is so large.