Spark SQL/Hive query takes forever with join

2021-11-14 00:00:00 apache-spark apache-spark-sql mysql

So I'm doing something that should be simple, but apparently it isn't in Spark SQL.

If I run the following query in MySQL, the query finishes in a fraction of a second:

SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;

However, running the same query in HiveContext under Spark (1.5.1) takes more than 13 seconds. Adding more joins makes the query run for a very very long time (over 10 minutes). I'm not sure what I'm doing wrong here and how I can speed things up.

The tables are MySQL tables that are loaded into the Hive context as temporary tables. This is running on a single instance, with the database on a remote machine.

  • The user table has approximately 4.8 million rows.
  • The user_address table has 350,000 rows.

The tables have foreign key fields, but no explicit FK relationships are defined in the database. I'm using InnoDB.
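For context, the loading step described above might look roughly like this (a sketch against the Spark 1.5 DataFrameReader API; the host, database name, and credentials are placeholders, not values from the question):

```scala
// Sketch (Spark 1.5 API): registering the MySQL tables as temporary
// tables in a HiveContext. Connection details below are placeholders.
val user = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://remote-host/db",
  "dbtable"  -> "user",
  "user"     -> "dbuser",
  "password" -> "dbpass"
)).load()
user.registerTempTable("user")

val userAddress = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://remote-host/db",
  "dbtable"  -> "user_address",
  "user"     -> "dbuser",
  "password" -> "dbpass"
)).load()
userAddress.registerTempTable("user_address")
```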

The execution plan in Spark:

== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc,{user=, password=, url=jdbc:mysql://, dbtable=user})[address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address,[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]

Recommended Answer

First of all, the type of query you perform is extremely inefficient. As of now (Spark 1.5.0*), to perform a join like this, both tables have to be shuffled / hash-partitioned each time the query is executed. It shouldn't be a problem in the case of the user table, where the user_id = 123 predicate is most likely pushed down, but a full shuffle is still required on user_address.
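One way to make sure the filtering happens on the MySQL side, so that only the matching rows ever reach Spark, is to register a subquery as the JDBC dbtable (a sketch; the connection details and the sqlContext name are assumptions, not from the question):

```scala
// Sketch: push the predicate into MySQL by wrapping a subquery in the
// "dbtable" option; only the filtered rows are fetched into Spark.
val filteredUser = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://remote-host/db",
  "dbtable"  -> "(SELECT user_address_id FROM user WHERE user_id = 123) AS u",
  "user"     -> "dbuser",
  "password" -> "dbpass"
)).load()
```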

Moreover, if the tables are only registered and not cached, then every execution of this query will fetch the whole user_address table from MySQL into Spark.
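If the same tables are queried repeatedly, caching them keeps the data in Spark's memory after the first fetch (assuming the temporary tables were registered under the names user and user_address):

```scala
// Cache the registered temporary tables; subsequent queries read from
// Spark's in-memory columnar store instead of re-fetching from MySQL.
sqlContext.cacheTable("user")
sqlContext.cacheTable("user_address")
```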

I'm not sure what I'm doing wrong here and how I can speed things up.

It is not exactly clear why you want to use Spark for this application, but the single-machine setup, small data, and the type of queries suggest that Spark is not a good fit here.

Generally speaking, if the application logic requires single-record access, then Spark SQL won't perform well. It is designed for analytical queries, not as an OLTP database replacement.

If a single table / data frame is much smaller, you could try broadcasting.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val user: DataFrame = ???
val user_address: DataFrame = ???

val userFiltered = user.where(???)

// Broadcast the smaller, filtered side so Spark avoids shuffling both tables
user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")


* This should change in Spark 1.6.0, where SPARK-11410 should enable persistent table partitioning.
