Spark SQL简单介绍

2022-04-11 00:00:00 视图 创建 数据 函数 操作

Spark SQL概述

Spark SQL是用于结构化数据处理的Spark模块。与Spark RDD API不同,Spark SQL提供的接口可以为Spark提供更多的数据及其计算时的结构信息。Spark使用这些额外的信息实现一些优化。用户可以使用SQL或者Dataset API对SparkSQL进行操作。由于它和用户使用的API或者开发语言无关,这意味着开发人员可以在多个API中轻松地切换

SQL

SparkSQL可以用来执行SQL查询,从一个Hive实例中查询数据。可以从Hive Tables - Spark 3.1.1 Documentation (apache.org)中了解更多的配置。

当使用另外一种编程语言中运行SQL时,结果将作为Dataset/DataFrame
返回。也能使用命令行或者JDBC对SQL进行操作

Datasets和DataFrames

dataset是分布式的数据集。Dataset在Spark1.6之后加入,结合了RDD的优点(强类型,lambda函数的能力),和Spark SQL的优化执行引擎的优点。数据集能从JVM对象中创建,并使用transformation
对其进行操作。python暂不支持Dataset API。但是由于python的动态特性,Dataset API的很多优点实际在python中可以体现。

DataFrame是按列为名称构造的Dataset。它在概念上等价于关系型数据库或者R/Python中的视图,不过在引擎层面有着更加丰富的优化。DataFrame可以从各种各样的数据源中构建,例如:结构数据文件,Hive表,第三方数据库,或者是一个存在的RDD。DataDrameAPI可以供Scala、Java、Python、R使用。在Java或者Scala中,DataFrame是由Rows的Dataset来表示

起步

SparkSession

Spark中所有功能的入口都是SparkSession
类。我们使用SparkSession.builder()
创建一个基础的SparkSession

scala> import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.
     | builder().
     | appName("Spark SQL basic example").
     | config("spark.some.config.option""some-value").
     | getOrCreate


SparkSession是Spark2.0开始为Hive特性提供的内部支持,包括使用HiveQL进行查询,访问Hive UDFs以及读取Hive table的数据。

创建DataFrame

当Spark Session创建完成之后,我们就能使用RDD、Hive table或者是其他Spark数据源来创建一个DataFrame。

scala> val df = spark.read.json("examples/src/main/resources/people.json")

scala> df.show

// people.json
{"name":"Michael"}
{"name":"Andy""age":30},
{"name":"Justin""age":19}

Untyped Dataset操作(又名DataFrame操作)

如上文所述,DataFrame是Java和Scala API中Rows类型的Dataset。和Java/Scala中的强类型Dataset不同,这些操作以一种无类型transformation的形式引用。

这里举出了一些使用Dataset的结构化数据的例子:

import spark.implicits._

// 以树形的格式打印视图(元数据)
df.printScheme()

// 选择名称为"name"的一列
df.select("name").show()

// 选择"name",和"age",并将age的数值加一
df.select($"name", $"age" + 1).show()

// 选择age > 21的成员
df.filter($"age" > 21).show()

// 以年龄为分组
df.groupBy("age").count().show()

完整的操作见API Documentation

除了简单的列引用和表达式,Dataset还提供了丰富的类库,包括:字符串操作,日期算术,常见的数学运算等等。参考DataFrame Function Reference

SQL编程

Spark的sql
函数能够让SQL以编程形式运行并将一个DataFrame作为结果返回

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show

全局临时视图

Spark SQL的临时视图作用域是会话域,如果创建视图的会话终止,那么视图也会消失。如果希望能够在所有会话中-共享一个临时视图,并让其保持活动状态直到Spark应用程序终止,我们可以使用global temporary view
。全局临时视图域系统保留的数据库global_temp
绑定。我们需要使用正确的名称创建它,如SELECT * FROM global_temp.view1

// 注册一个全局临时视图
df.createGlobalTempView("people")

// 全局临时视图与global_temp关联
spark.sql("SELECT * FROM global_temp.people").show()

// 全局临时视图的作用域
spark.newSession().sql("SELECT * FROM global_temp.people")

创建Datasets

Datasets类似于RDDs,不过没有使用Java serialization或者Kryo,而是使用专门的编码器来序列化对象,以便在网络进行处理或传输。

编码器和标准序列化都能将一个对象转化成一个字符数组,但是编码器是动态生成的,而且不需要进行阶码就可以支持Spark的很多操作,例如filtering
sorting
hashing

case class Person(name: String, age: Long)
// 为case class创建编码器
val caseClassDs = Seq(Person("Any"32)).toDS()

// 大多数常见类型的编码器是由`spark.implicits._`库自动导入
val primitiveDS = Seq(123).toDS()
primitiveDS.map(_ + 1).collect()

// 通过传递一个类,可以将DataFrames转换成Dataset。映射根据名称进行
val path = "src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

RDD互操作

Spark SQL支持两种不同的方法,将现有RDD转换为数据集。个方法是通过反射来推断RDD的schema。基于反射的方式在你了解Spark应用程序的所有schema时,可以让程序变得更加简洁。

第二种方式是允许用户通过一个动态接口创建一个schema并将其应用到一个现有的RDD中。这种方式更加啰嗦,但是它允许你在程序不知道列名或者类型的情况下创建datasets

使用反射进行推断

如前文所述,Spark SQL的Scala接口支持将包含case class
的RDD转换成DataFrame。这个case class定义了表的视图。case class参数的名称被映射为列名称。case class也是能够嵌套的,它可以包含复杂类型例如Seqs
Arrays
。这个RDD能够也能隐式转换成DataFrame,然后注册成一个表。

import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager()).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[StringAny]]
// Primitive * and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name""age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

# people.txt
# Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo
Michael, 29
Andy, 30
Justin, 19

编程方式创建schema

当无法提前定义case类的时候(比如编码为字符串或者对文本数据集进行解析,并为不同的用户投影不同的字段),需要用来以下三步以编程的方式创建DataFrame

  1. 从原始RDD创建一个Rows类型的RDD
  2. 创建一个代表schema的StructType,与步骤一的RDD进行匹配
  3. 通过SparkSession中的createDataFrame
    方法将schema应用成RDD
import org.apache.spark.sql.Row

import org.apache.spark.sql.*._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes()).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

标量函数

与聚合函数多行得到一个值返回不同,标量函数是每行返回一个值。Spark SQL支持多种Built-in Scalar Functions,也支持User Defined Scalar Functions

聚合函数

Aggregate function
返回多行的一个值,Built-in Aggregation Functions提供了多种聚合函数——count()
avg()
等等。用户也能自己创建聚合函数,更多细节参考User Defined Aggregate Functions。

相关文章