What is the most efficient way to do a sorted reduce in PySpark?

Problem description

I am analyzing on-time performance records of US domestic flights from 2015. I need to group by tail number, and store a date sorted list of all the flights for each tail number in a database, to be retrieved by my application. I am not sure which of two options for achieving this is the best one.

# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')

# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x: 
  (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
  )

I can achieve this in a reduce sort...

# Group flights by tail number, sorted by date, then flight number,
# then origin/dest
flights_per_airplane = (flights
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))
  .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1], x[2], x[3], x[4]))))

Or I can achieve it in a subsequent map job...

# Do the same in a subsequent map step -- is this more efficient, or does
# PySpark know how to optimize the above?
flights_per_airplane = (flights
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))
  .reduceByKey(lambda a, b: a + b)
  .map(lambda kv: (kv[0], sorted(kv[1], key=lambda x: (x[1], x[2], x[3], x[4])))))

Doing this in the reduce seems really inefficient, but in fact both are very slow. sorted() looks like the way to do this in the PySpark docs, so I'm wondering if PySpark doesn't make this kosher internally? Which option is the most efficient or the best choice for some other reason?

My code is also here: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

If you're curious about the data, it is from the Bureau of Transportation Statistics, here: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time


Solution

Unfortunately, both approaches are wrong before you even start sorting, and there is no efficient and simple way of doing this in Spark. Still, the first one is significantly worse than the other.

Why are both ways wrong? Because each of them is effectively just another groupByKey, and that is simply an expensive operation. There are some things you can try in order to improve matters (especially avoiding the map-side reduction), but at the end of the day you still have to pay the price of a full shuffle, and if you don't see any failures it is probably not worth all the fuss.

Still, the second approach is much better algorithmically*. If you want to keep a sorted structure all the way through, as in the first attempt, you should use dedicated tools (aggregateByKey with bisect.insort would be a good choice), but there is really nothing to gain here.
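For illustration only, here is a rough sketch of what that aggregateByKey-with-bisect.insort variant could look like; the helper names (sort_key, insert_sorted, merge_sorted) are assumptions for illustration, not part of the original answer:

import bisect

# Hypothetical sketch: keep each tail number's list sorted by
# (FlightDate, FlightNum, Origin, Dest) while it is being aggregated.
def sort_key(flight):
    return (flight[1], flight[2], flight[3], flight[4])

def insert_sorted(acc, flight):
    # Insert a single flight into an already-sorted accumulator
    bisect.insort(acc, (sort_key(flight), flight))
    return acc

def merge_sorted(left, right):
    # Merge two sorted accumulators coming from different partitions
    for item in right:
        bisect.insort(left, item)
    return left

flights_per_airplane = (flights
  .keyBy(lambda x: x[5])
  .aggregateByKey([], insert_sorted, merge_sorted)
  .mapValues(lambda pairs: [flight for _, flight in pairs]))

As the answer says, this keeps the values sorted while they are built, but it still pays for the same shuffle, so there is little to gain over sorting once at the end.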

If the grouped output is a hard requirement, the best thing you can do is keyBy, groupByKey and sort. It won't improve performance over the second solution, but it will arguably improve readability:

(flights
    .keyBy(lambda x: x[5])
    .groupByKey()
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))

* Even if you assume the best-case scenario for Timsort, the first approach is N times O(N), i.e. roughly O(N²) overall, while the second one is O(N log N) even in the worst-case scenario.
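As a back-of-envelope illustration of that gap (a sketch, not part of the answer), counting the elements touched by each strategy for a single tail number with N flights:

import math

# Repeated sorted(a + b) merges inside reduceByKey: the k-th merge re-touches
# about k elements even in Timsort's best case, so the total is ~N^2 / 2.
def repeated_merge_work(n):
    return sum(range(1, n + 1))

# A single sort at the end touches about N log N elements in the worst case.
def single_sort_work(n):
    return n * math.log2(n) if n > 1 else n

for n in (10, 1000, 100000):
    print(n, repeated_merge_work(n), round(single_sort_work(n)))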
