Spark编程之Spark SQL

2022-04-11 00:00:00 创建 模式 支持 样本 可以通过

Spark SQL

Spark SQL 允许 Spark 执行用SQLHiveQL或者Scala表示的关系查询。这个模块的核心是一个新类型的 RDD-SchemaRDD。

SchemaRDDs 由@Row:org.apache.spark.sql.catalyst.expressions.Row.type">行对象组成, 行对象拥有一个模式( scheme)来描述行中每一列的数据类型。SchemaRDD 与关系型数据库中的表很相似。可以通过存在的 RDD、 一个 Parquet 文件、 一个 JSON 数据库或者对存储在 Apache Hive 中的数据执行 HiveSQL 查询中创建

本章节的所有例子都利用了 Spark 分布式系统中的样本数据, 可以在 spark-shell 中运行它们。

让我们开始了解步吧

Spark 中所有相关功能的入口点是SQLContext 类或者它的子类,创建一个 SQLContext 的所有需要仅仅是一个 SparkContext。

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
5. import sqlContext.createSchemaRDD

除了一个基本的 SQLContext, 我们也能够创建一个 HiveContext, 它支持基本 SQLContext 所支持功能的一个超集。

它的额外的功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启, SQLContext 可用的数据源对 HiveContext 也可用。

HiveContext 分开打包是为了避免在 Spark 构建时包含了所有的 Hive 依赖。如果对你的应用程序来说, 这些依赖不存在问题, Spark 1.2 推荐使用 HiveContext。以后的稳定版本将专注于为 SQLContext 提供与 HiveContext 等价的功能。

用来解析查询语句的特定 SQL 变种语言可以通过 spark.sql.dialect 选项来选择。这个参数可以通过两种方式改变:

  1. 一种方式是通过 setConf 方法设定
  2. 另一种方式是在 SQL 命令中通过 SETkey=value 来设定。对于 SQLContext, 可用的方言是“sql”, 它是 Spark SQL 提供的一个简单的 SQL 解析器。在 HiveContext 中, 虽然也支持”sql”, 但默认的方言是“hiveql”。这是因为 HiveQL 解析器更完整。

在很多用例中推荐使用“hiveql”。

Spark SQL 能操作什么数据源呢

Spark SQL 支持通过SchemaRDD接口操作各种数据源。一个 SchemaRDD 能够作为一个一般的 RDD 被操作, 也可以被注册为一个临时的表。注册一个 SchemaRDD 为一个表就可以允许你在其数据上运行 SQL 查询。

加载数据为 SchemaRDD 的多种方法

RDDs

Spark 支持两种方法将存在的 RDDs 转换为 SchemaRDDs。

  1. 种方法使用反射来推断包含特定对象类型的 RDD 的模式(schema)。在你写 spark 程序的同时, 当你已经知道了模式, 这种基于反射的方法可以使代码更简洁并且程序工作得更好。

  2. 创建 SchemaRDDs 的第二种方法是通过一个编程接口来实现, 这个接口允许你构造一个模式, 然后在存在的 RDDs 上使用它。虽然这种方法更冗长, 但是它允许你在运行期之前不知道列以及列的类型的情况下构造 SchemaRDDs。

    Spark SQL 的 Scala 接口支持将包含样本类的 RDDs 自动转换为 SchemaRDD。这个样本类定义了表的模式。给样本类的参数名字通过反射来读取, 然后作为列的名字。样本类可以嵌套或者包含复杂的类型如序列或者数组。这个 RDD 可以隐式转化为一个 SchemaRDD, 然后注册为一个表。表可以在后续的 sql 语句中使用。

    • 利用反射推断模式
// sc is an existing SparkContext.
2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
3. // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
4. import sqlContext.createSchemaRDD

5. // Define the schema using a case class.
6. // Note: Case classes in Scala 2.10 can support only up to22 fields. To work around this limit,
7. // you can use custom classes that implement the Product interface.
8. case class Person(name: String, age: Int)
9. // Create an RDD of Person objects and register it as a table.
10. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p =>
Person(p(0), p(1).trim.toInt))
11. people.registerTempTable("people")

12 // SQL statements can be run by using the sql methods provided by sqlContext.
13 val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
14 The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
15// The columns of a row in the result can be accessed by ordinal.
16. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

  • 编程指定模式

当样本类不能提前确定( 例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 SchemaRDD 可以通过三步来创建。

  1. 原来的 RDD 创建一个行的 RDD
  2. 创建由一个 StructType 表示的模式与步创建的 RDD 的行结构相匹配
  3. 在行 RDD 上通过 applySchema 方法应用模式
1.// sc is an existing SparkContext.
2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
3.
4. // Create an RDD
5. val people = sc.textFile("examples/src/main/resources/people.txt")
6.
7. // The schema is encoded in a string
8. val schemaString = "name age"
9.
10. // Import Spark SQL data * and Row.
11. import org.apache.spark.sql._
12.
13. // Generate the schema based on the string of schema
14. val schema =
15. StructType(
16. schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
17.
18. // Convert records of the RDD (people) to Rows.
19. val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
20.
21. // Apply the schema to the RDD.
22. val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
23.
24. // Register the SchemaRDD as a table.
25. peopleSchemaRDD.registerTempTable("people")
26.
27. // SQL statements can be run by using the sql methods provided by sqlContext.
28. val results = sqlContext.sql("SELECT name FROM people")
29.
30. // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
31. // The columns of a row in the result can be accessed by ordinal.
32. results.map(t => "Name: " + t(0)).collect().foreach(println)

相关文章