使用 Apache Spark 将键值对缩减为键列表对
问题描述
我正在编写一个 Spark 应用程序,并希望将一组键值对 (K, V1), (K, V2), ..., (K, Vn)
组合成一个键-多值对(K, [V1, V2, ..., Vn])
.我觉得我应该能够使用具有某种风味的 reduceByKey
函数来做到这一点:
I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn)
into one Key-Multivalue pair (K, [V1, V2, ..., Vn])
. I feel like I should be able to do this using the reduceByKey
function with something of the flavor:
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
发生这种情况时我得到的错误是:
The error that I get when this occurs is:
NoneType"对象没有附加"属性.
'NoneType' object has no attribue 'append'.
我的键是整数,值 V1,...,Vn 是元组.我的目标是使用键和值列表(元组)创建一对.
My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).
解决方案
Map和ReduceByKey
reduce
的输入类型和输出类型必须相同,所以如果你想聚合一个列表,你必须map
输入到列表.然后将这些列表合并为一个列表.
Input type and output type of reduce
must be the same, therefore if you want to aggregate a list, you have to map
the input to lists. Afterwards you combine the lists into one list.
组合列表
您需要一种将列表合并为一个列表的方法.Python 提供了一些组合列表的方法.
You'll need a method to combine lists into one list. Python provides some methods to combine lists.
append
修改第一个列表,并且总是返回 None
.
append
modifies the first list and will always return None
.
x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]
extend
做同样的事情,但解开列表:
extend
does the same, but unwraps lists:
x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]
这两种方法都返回 None
,但您需要一个返回组合列表的方法,因此只需 使用加号.
Both methods return None
, but you'll need a method that returns the combined list, therefore just use the plus sign.
x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]
火花
file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" "))
.map(lambda actor: (actor.split(",")[0], actor))
# transform each value into a list
.map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ]))
# combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
.reduceByKey(lambda a, b: a + b)
<小时>
组合键
也可以使用 combineByKey
来解决这个问题,它在内部用于实现 reduceByKey
,但它更复杂并且 在 Spark 中使用一种专门的按键组合器可以更快"一个>.对于上面的解决方案,您的用例已经足够简单了.
It's also possible to solve this with combineByKey
, which is used internally to implement reduceByKey
, but it's more complex and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the upper solution.
GroupByKey
也可以使用 groupByKey
、但它会减少并行化,因此对于大数据集可能会慢得多.
It's also possible to solve this with groupByKey
, but it reduces parallelization and therefore could be much slower for big data sets.
相关文章