Spark 快速入门教程

2022-05-31 00:00:00 数据 操作 代码 分布式 计算

原文链接: www.shiyanlou.com

开 始 体 验 Apache Spark是为大规模数据处理而设计的快速通用的运算框架,初由AMPLab所开发,使用了内存运算技术。相对于Hadoop的MapReduce会在运行完工作后将中介数据存放到磁盘中,Spark能在数据尚未写入硬盘时即在存储器内分析运算。Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍。
  • 软件文档

Spark 快速入门教程

1. Spark 是什么

Apache Spark 是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。Spark 正如其名,大的特点就是快(Lightning-fast),可比 Hadoop MapReduce 的处理速度快 100 倍。如果你熟悉 Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop 使用 HDFS 来解决分布式数据问题,MapReduce 计算范式提供有效的分布式计算。类似的,Spark 拥有多种语言的函数式编程 API,提供了除 map 和 reduce 之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。

2. Spark 核心组件

Spark 库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似 SQL 查询的支持,机器学习和图算法,对实时流数据的支持。具体核心组件如下:

Spark Core:包含 Spark 的基本功能;尤其是定义 RDD 的 API、操作以及这两者上的动作。其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的。

Spark SQL:提供通过 Apache Hive 的 SQL 变体 Hive 查询语言(HiveQL)与 Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。对熟悉 Hive 和 HiveQL 的人,Spar k可以拿来就用。

Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据。

MLlib:一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库 Mahout,将会转到 Spark,并在未来实现。

GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。

由于这些组件满足了很多大数据需求,也满足了很多数据科学任务的算法和计算上的需要,Spark 快速流行起来。不仅如此,Spark 也提供了使用 Scala、Java 和Python 编写的 API;满足了不同团体的需求,允许更多数据科学家简便地采用 Spark 作为他们的大数据解决方案。

3. Spark 体系架构

Spark体系架构包括如下三个主要组件:

  • 数据存储
  • API
  • 管理框架

数据存储:Spark 用 HDFS 文件系统存储数据。它可用于存储任何兼容于 Hadoop 的数据源,包括HDFS,Hbase,Cassandra等。

API:利用 API,应用开发者可以用标准的 API 接口创建基于 Spark 的应用。Spark 提供 Scala,Java 和 Python 三种程序设计语言的 API。

下面是三种语言 Spark API 的网站链接。

  • Scala API

    spark.apache.org/docs/latest…

  • Java

    spark.apache.org/docs/latest…

  • Python

    spark.apache.org/docs/latest…

Spark基本概念:

  • Application: 用户自己写的 Spark 应用程序,批处理作业的集合。Application 的 main 方法为应用程序的入口,用户通过 Spark 的 API,定义了 RDD 和对 RDD 的操作。
  • SparkContext: Spark 重要的 API,用户逻辑与 Spark 集群主要的交互接口,它会和 Cluster Master 交互,包括向它申请计算资源等。
  • Driver 和 Executor:Spark 在执行每个 Application 的过程中会启动 Driver 和 Executor 两种 JVM 进程。Driver 进程为主控进程,负责执行用户 Application 中的 main 方法,提交 Job,并将 Job 转化为 Task,在各个 Executor 进程间协调 Task 的调度。运行在Worker上 的 Executor 进程负责执行 Task,并将结果返回给 Driver,同时为需要缓存的 RDD 提供存储功能。

资源管理:

一组计算机的集合,每个计算机节点作为独立的计算资源,又可以虚拟出多个具备计算能力的虚拟机,这些虚拟机是集群中的计算单元。Spark 的核心模块专注于调度和管理虚拟机之上分布式计算任务的执行,集群中的计算资源则交给 Cluster Manager 这个角色来管理,Cluster Manager 可以为自带的Standalone、或第三方的 Yarn和 Mesos。

Cluster Manager 一般采用 Master-Slave 结构。以 Yarn 为例,部署 ResourceManager 服务的节点为 Master,负责集群中所有计算资源的统一管理和分配;部署 NodeManager 服务的节点为Slave,负责在当前节点创建一个或多个具备独立计算能力的 JVM 实例,在 Spark 中,这些节点也叫做 Worker。

另外还有一个 Client 节点的概念,是指用户提交Spark Application 时所在的节点。

弹性分布式数据集(RDD):

弹性分布式数据集(RDD)是 Spark 框架中的核心概念。可以将 RDD 视作数据库中的一张表。其中可以保存任何类型的数据。Spark 将数据存储在不同分区上的 RDD 之中。

RDD 可以帮助重新安排计算并优化数据处理过程。

此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。

RDD 是不可变的。你可以用变换(Transformation)修改 RDD,但是这个变换所返回的是一个全新的RDD,而原有的 RDD 仍然保持不变。

RDD 支持两种类型的操作:

  • 变换(Transformation)变换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。 变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。
  • 行动(Action)行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。 行动操作包括:reduce,collect,count,first,take,countByKey 以及 foreach。

4. 安装部署

本文档是对 Spark 的一个快速入门。首先,我们通过 Spark 的交互式 shell 介绍一下 API(主要是 Scala),

更完整参考 programming guide:

spark.apache.org/docs/latest…

(1). 安装前设置

注意:本小节部分,实验楼环境已经配置好,无需配置,可以跳过此步骤。

在安装 Hadoop 之前,需要进入 Linux/Ubuntu 环境下,连接 Linux/Ubuntu 使用 SSH (安全 Shell)。按照仅给出简要步骤设立 Linux/Ubuntu 环境,对于本小节您可以百度。

  1. 修改主机名
  2. 修改 IP
  3. 修改主机名和IP的映射关系
  4. 关闭防火墙
  5. 重启 Linux/Ubuntu
  6. 安装JDK

(2). 安装 Spark

官网下载地址:

spark.apache.org/downloads.h…

本文档选择的是 spark-1.6.1-bin-hadoop2.6 版本,下载地址:

d3kbcqa49mib13.cloudfront.net/spark-1.6.1…


>> cd /home/shiyanlou

>>sudo wget https://d3kbcqa49mib13.cloudfront.net/spark-1.6.1-bin-hadoop2.6.tgz

>> sudo tar zxvf spark-1.6.1-bin-hadoop2.6.tgz
复制代码

(3). 通过 Spark Shell 进行交互分析

Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本文档选择使用 Scala 来进行介绍。

Scala:是一门现代的多范式编程语言,以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。

1). 启动 spark shell


>>cd spark-1.6.1-bin-hadoop2.6

>>./bin/spark-shell.sh
复制代码

Spark 主要的抽象概念是个分布式集合,也叫作弹性分布式数据集(Resilient Distributed Dataset – RDD)。它可被分发到集群各个节点上,进行并行操作。RDD 可以由 Hadoop InputFormats 读取 HDFS 文件创建得来,或者从其他 RDD 转换得到。下面我们就先利用 Spark 源代码目录下的 README 文件来新建一个 RDD:

我们从 /home/shiyanlou/README.md 文件新建一个 RDD,代码如下:

>>`scala> val textFile = sc.textFile("/home/shiyanlou/README.md")`

> textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:1
复制代码

下面我们就来演示 count() 和 first() 操作:


>>  `scala> textFile.count()`
>
> res0: Long = 95
>>`scala> textFile.first()`
>
>  res1: String = # Apache Spark
>
复制代码

接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,代码如下:


>>  `scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))` // 统计包含 Spark 的行数

> linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af39
> 
>> `scala> linesWithSpark.count()`   // 统计行数

> res2: Long = 17
复制代码

可以看到一共有 17 行内容包含 Spark,这与通过 Linux 命令 cat /home/shiyanlou/README.md | grep "Spark" -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:

>>`scala> textFile.filter(line => line.contains("Spark")).count()`  // 统计包含 Spark 的行数

> res3: Long = 17
复制代码

RDD 的 actions 和 transformations 可用在更复杂的计算中,例如通过如下代码可以找到包含单词多的那一行内容共有几个单词:

>>`scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)`

> res4: Int = 14
复制代码

代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解:

>>`scala> import java.lang.Math`

> import java.lang.Math

>>`scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))`
>
> res5: Int = 16
复制代码

Hadoop 上的 MapReduce 是大家耳熟能详的一种通用数据流模式。在 Spark 中同样可以实现(下面这个例子也就是 WordCount):

>> `scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  `  // 实现单词统计

> wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:2

>> `scala> wordCounts.collect()`   // 输出单词统计结果

> res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)
复制代码

2). RDD 缓存

Spark 也支持在分布式的环境下基于内存的缓存,这样当数据需要重复使用的时候就很有帮助。比如当需要查找一个很小的 hot 数据集,或者运行一个类似 PageRank 的算法。

举个简单的例子,对 linesWithSpark RDD 数据集进行缓存,然后再调用 count() 会触发算子操作进行真正的计算,之后再次调用 count() 就不会再重复的计算,直接使用上一次计算的结果的 RDD 了:

>>  `scala> linesWithSpark.cache() `

> res8: spark.RDD[String] = spark.FilteredRDD@17e51082

>> `scala> linesWithSpark.count()`

> res9: Long = 17

>> `scala> linesWithSpark.count()`

> res10: Long = 17
复制代码

看起来缓存一个100行左右的文件很愚蠢,但是如果再非常大的数据集下就非常有用了,尤其是在成百上千的节点中传输 RDD 计算的结果。

3). Spark SQL 和 DataFrames

Spark SQL 是 Spark 内嵌的模块,用于结构化数据。在 Spark 程序中可以使用 SQL 查询语句或 DataFrame API。DataFrames 和 SQL 提供了通用的方式来连接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,并且可以在多种数据源之间执行 join 操作。

下面仍在 Spark shell 中演示一下 Spark SQL 的基本操作,该部分内容主要参考了 Spark SQL、DataFrames 和 Datasets 指南

spark.apache.org/docs/latest…

Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。在 Spark shell 启动时,输出日志的后有这么几条信息:


17/05/25 15:16:34 INFO repl.SparkILoop: Created spark context..

Spark context available as sc.

17/05/25 15:16:34 INFO repl.SparkILoop: Created sql context..

SQL context available as sqlContext.
复制代码

这些信息表明 SparkContent 和 SQLContext 都已经初始化好了,可通过对应的 sc、sqlContext 变量直接进行访问。

使用 SQLContext 可以从现有的 RDD 或数据源创建 DataFrames。作为示例,我们通过 Spark 提供的 JSON 格式的数据源文件 spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json 来进行演示,该数据源内容如下:


{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}
复制代码

执行如下命令导入数据源,并输出内容:


>>`scala> val df = sqlContext.read.json("home/shiyanlou/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json")`

> df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

>> `scala> df.show()`  // 输出数据源内容

>     +----+-------+   
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
复制代码

接着,我们来演示 DataFrames 处理结构化数据的一些基本操作:

>> `scala> df.select("name").show() `  // 只显示 "name" 列



>     +-------+   
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+

>> `scala> df.select(df("name"), df("age") + 1).show()   ` // 将 "age" 加 1

>     +-------+---------+
    |   name|(age + 1)|
    +-------+---------+
    |Michael|     null|
    |   Andy|       31|
    | Justin|       20|
    +-------+---------+

>> `scala> df.filter(df("age") > 21).show()  `  //条件语句

>     +---+----+
    |age|name|
    +---+----+
    | 30|Andy|
    +---+----+



>> `scala> df.groupBy("age").count().show()   ` // groupBy 操作

>      +----+-----+
     | age|count|
     +----+-----+
     |null|    1|
     |  19|    1|
     |  30|    1|
     +----+-----+
复制代码

当然,我们也可以使用 SQL 语句来进行操作:


>>`scala> .registerTempTable("people")  `    // 将 DataFrame 注册为临时表 people
>> `scala> val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")` // 执行 SQL 查询


>> `scala> result.show()  ` //输出结果

>     +----+-------+   
    | name |  age|
    +----+-------+
    |Justin|   19|
    +----+-------+
复制代码

更多的功能可以查看完整的 DataFrames API:

spark.apache.org/docs/latest…

此外 DataFrames 也包含了丰富的 DataFrames Function:

spark.apache.org/docs/latest…

5. 总结

Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark 拥有多种语言的函数式编程 API,提供了除 map 和 reduce 之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。这篇文档的目的是帮助你快速入门,完成单机上的 Spark 安装与使用,本文档是 Spark 快速入门,主要介绍了 Spark shell 、RDD、Spark SQL 等,希望对您有所帮助。

更多学习资料请参考:

Spark 集群部署:

spark.apache.org/docs/latest…

Spark 编程指南:

spark.apache.org/docs/latest…

Spark SQL、DataFrames 和 Datasets 指南:

spark.apache.org/docs/latest…

相关文章