appengine-mapreduce hitting the memory limit

Problem description

I'm working on the appengine-mapreduce functionality and have modified the demo to fit my purpose. Basically I have over a million lines in the following format: userid, time1, time2. My purpose is to find the difference between time1 and time2 for each userid.

However, when I run this on Google App Engine, I encounter this error message in the logs section:

Exceeded soft private memory limit of 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.

def time_count_map(data):
  """Time count map function."""
  (entry, text_fn) = data
  text = text_fn()

  try:
    q = text.split('\n')
    for m in q:
        reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            # Calculate time elapsed
            sdw = s[1]
            start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
  except IndexError, e:
    logging.debug(e)


def time_count_reduce(key, values):
  """Time count reduce function."""
  total = 0.0  # renamed from `time` to avoid shadowing the time module
  for subtime in values:
    total += float(subtime)
  yield "%s: %d\n" % (key, int(total))

Can anyone suggest how else I can optimize my code better? Thanks!!

Here is the pipeline handler:

class TimeCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Time count demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "time_count",
        "main.time_count_map",
        "main.time_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=32)
    yield StoreOutput("TimeCount", filekey, output)

mapreduce.yaml:

mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4

The rest of the files are exactly the same as the demo.

I've uploaded a copy of my codes on dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip


Solution

It is likely your input file exceeds the soft memory limit in size. For big files use either BlobstoreLineInputReader or BlobstoreZipLineInputReader.

These input readers pass something different to the map function: the start_position in the file and the line of text.

Your map function might look something like:

def time_count_map(data):
    """Time count map function."""
    text = data[1]

    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            # Calculate time elapsed
            sdw = s[1]
            start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)
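The per-line parsing can be sanity-checked independently of the MapReduce machinery (a standalone sketch; `parse_elapsed` is a hypothetical helper, not part of the library, and the date format matches the one used above):

```python
import csv
import time

def parse_elapsed(line):
    # One CSV row of the form "userid, time1, time2" -> (userid, seconds elapsed).
    reader = csv.reader([line.replace('\0', '')], skipinitialspace=True)
    for s in reader:
        start_date = time.strptime(s[1], "%m/%d/%y %I:%M:%S%p")
        end_date = time.strptime(s[2], "%m/%d/%y %I:%M:%S%p")
        return (s[0], time.mktime(end_date) - time.mktime(start_date))

print(parse_elapsed("u42, 01/01/12 01:00:00AM, 01/01/12 02:30:00AM"))
```

Since both timestamps go through `time.mktime` with the same local timezone, the offset cancels out in the subtraction.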

Using BlobstoreLineInputReader will allow the job to run much faster as it can use more than one shard, up to 256, but it means you need to upload your files uncompressed, which can be a pain. I handle it by uploading the compressed files to an EC2 Windows server, then decompressing and uploading from there, since the upstream bandwidth there is so large.
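In the `TimeCountPipeline` above, switching readers is mostly a matter of changing the reader name string. A sketch of the adjusted call, with the caveat that (if I remember the reader interfaces correctly) the line-based readers expect a `blob_keys` parameter rather than `blob_key`, so verify the exact parameter name against your SDK version:

```python
output = yield mapreduce_pipeline.MapreducePipeline(
    "time_count",
    "main.time_count_map",
    "main.time_count_reduce",
    "mapreduce.input_readers.BlobstoreZipLineInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_keys": blobkey,  # parameter name assumed; check your SDK
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=32)
```

With this reader the map function receives positions and lines instead of whole files, so memory use per call stays roughly constant regardless of input size.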
