PYSpark显示最大值(S)和多重排序
问题描述
感谢您在这里提供的帮助。使用Pyspark(请不能使用SQL)。因此,我有一个存储为RDD对的元组列表:
[((‘City1’,‘2020-03-27’,‘X1’),44),
(‘City1’,‘2020-03-28’,‘X1’),44),
(‘City3’,‘2020-03-28’,‘X3’),15),
((‘City4’,‘2020-03-27’,‘X4’),5),
((‘City4’,‘2020-03-26’,‘X4’),4),
(‘City2’,‘2020-03-26’,‘X2’),14),
((‘City2’,‘2020-03-25’,‘X2’),4),
((‘City4’,‘2020-03-25’,‘X4’),1),
((‘City1’,‘2020-03-29’,‘X1’),1),
((‘City5’,‘2020-03-25’,‘X5’),15)]
例如(‘City5’,‘2020-03-25’,‘X5’)作为关键字,15作为最后一对的值。
我希望获得以下结果:
City1,X1,2020-03-27,44
City1,X1,2020-03-28,44
城市5、X3、2020-03-25、15
City3,X3,2020-03-28,15
City2,X2,2020-03-26,14
城市4、X4、2020-03-27、5
请注意,结果显示为:
每个城市的最大值关键字(这是最难的部分,如果在不同日期有相似的最大值,要显示同一城市两次,我假设不能使用ReduceByKey(),因为关键字不唯一,可能是GroupBy()或Filter()?
按以下顺序排序:
- 最大值递减
- 升序日期
- 降序城市名称(例如:City1)
所以我尝试了以下代码:
res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])
虽然它给了我最大值的城市,但如果两个城市在两个不同的日期有相似的最大值,它不会两次返回同一个城市(因为减去了键:City)。
我希望它足够清楚,任何精确度请询问! 非常感谢!
解决方案
要使所有城市的值都等于最大值,您仍然可以使用reduceByKey
,但使用数组而不是Over Value:
- 将行转换为键/值,值是元组数组而不是元组
- 按键减少,如果数组包含相同的值,则合并数组,否则保留具有最大值的数组,
reduceByKey
- 通过
flatMap
平面化Value数组,将键与它们合并
- 最后执行排序
完整代码如下:
def merge(array1, array2):
if array1[0][2] > array2[0][2]:
return array1
elif array1[0][2] == array2[0][2]:
return array1 + array2
else:
return array2
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))
然后您可以打印您的RDD:
[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]
这与您的输入一起提供了以下输出:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
相关文章