带 MRJob 的多个输入

2022-01-13 00:00:00 python mapreduce mrjob

问题描述

我正在尝试学习将 Yelp 的 Python API 用于 MapReduce,MRJob.他们简单的单词计数器示例很有意义,但我很好奇如何处理涉及多个输入的应用程序.例如,不是简单地计算文档中的单词,而是将向量乘以矩阵.我想出了这个解决方案,它可以工作,但感觉很傻:

I'm trying to learn to use Yelp's Python API for MapReduce, MRJob. Their simple word counter example makes sense, but I'm curious how one would handle an application involving multiple inputs. For instance, rather than simply counting the words in a document, multiplying a vector by a matrix. I came up with this solution, which functions, but feels silly:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

这段代码是运行 ./matrix.py <input.txt 之所以起作用,是因为矩阵按列存储在 input.txt 中,相应的向量值位于行尾.

This code is run ./matrix.py < input.txt and the reason it works is that the matrix stored in input.txt by columns, with the corresponding vector value at the end of the line.

所以,下面的矩阵和向量:

So, the following matrix and vector:

表示为 input.txt 为:

are represented as input.txt as:

简而言之,我将如何将矩阵和向量更自然地存储在单独的文件中并将它们都传递到 MRJob 中?

In short, how would I go about storing the matrix and vector more naturally in separate files and passing them both into MRJob?


解决方案

如果您需要针对另一个(或相同的 row_i、row_j)数据集处理原始数据,您可以:

If you're in need of processing your raw data against another (or same row_i, row_j) data set, you can either:

1) 创建一个 S3 存储桶来存储数据的副本.将此副本的位置传递给您的任务类,例如下面代码中的 self.options.bucket 和 self.options.my_datafile_copy_location .警告:不幸的是,似乎整个文件必须在处理之前下载"到任务机器.如果连接失败或加载时间过长,此作业可能会失败.这是一些执行此操作的 Python/MRJob 代码.

1) Create an S3 bucket to store a copy of your data. Pass the location of this copy to your task class, e.g. self.options.bucket and self.options.my_datafile_copy_location in the code below. Caveat: Unfortunately, it seems that the whole file must get "downloaded" to the task machines before getting processed. If the connections falters or takes too long to load, this job may fail. Here is some Python/MRJob code to do this.

把它放在你的映射器函数中:

Put this in your mapper function:

d1 = line1.split('	', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('
'):
    d2 = line2.split('	', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) 创建一个 SimpleDB 域,并将所有数据存储在其中.在这里阅读 boto 和 SimpleDB:http://code.google.com/p/boto/wiki/SimpleDbIntro

2) Create a SimpleDB domain, and store all of your data in there. Read here on boto and SimpleDB: http://code.google.com/p/boto/wiki/SimpleDbIntro

您的映射器代码如下所示:

Your mapper code would look like this:

dline = dline.strip()
d0 = dline.split('	', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

如果您有大量数据,第二个选项可能会执行得更好,因为它可以对每一行数据而不是一次全部数据进行请求.请记住,SimpleDB 值的长度最多只能为 1024 个字符,因此如果您的数据值长于此,您可能需要通过某种方法进行压缩/解压缩.

This second option may perform better if you have very large amounts of data, since it can make the requests for each row of data rather than the whole amount at once. Keep in mind that SimpleDB values can only be a maximum of 1024 characters long, so you may need to compress/decompress via some method if your data values are longer than that.

相关文章