Spark 快速入门教程
- 软件文档
Spark 快速入门教程
1. Spark 是什么
Apache Spark 是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。Spark 正如其名,大的特点就是快(Lightning-fast),可比 Hadoop MapReduce 的处理速度快 100 倍。如果你熟悉 Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop 使用 HDFS 来解决分布式数据问题,MapReduce 计算范式提供有效的分布式计算。类似的,Spark 拥有多种语言的函数式编程 API,提供了除 map 和 reduce 之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。
2. Spark 核心组件
Spark 库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似 SQL 查询的支持,机器学习和图算法,对实时流数据的支持。具体核心组件如下:
Spark Core:包含 Spark 的基本功能;尤其是定义 RDD 的 API、操作以及这两者上的动作。其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的。
Spark SQL:提供通过 Apache Hive 的 SQL 变体 Hive 查询语言(HiveQL)与 Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。对熟悉 Hive 和 HiveQL 的人,Spar k可以拿来就用。
Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据。
MLlib:一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库 Mahout,将会转到 Spark,并在未来实现。
GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。
由于这些组件满足了很多大数据需求,也满足了很多数据科学任务的算法和计算上的需要,Spark 快速流行起来。不仅如此,Spark 也提供了使用 Scala、Java 和Python 编写的 API;满足了不同团体的需求,允许更多数据科学家简便地采用 Spark 作为他们的大数据解决方案。
3. Spark 体系架构
Spark体系架构包括如下三个主要组件:
- 数据存储
- API
- 管理框架
数据存储:Spark 用 HDFS 文件系统存储数据。它可用于存储任何兼容于 Hadoop 的数据源,包括HDFS,Hbase,Cassandra等。
API:利用 API,应用开发者可以用标准的 API 接口创建基于 Spark 的应用。Spark 提供 Scala,Java 和 Python 三种程序设计语言的 API。
下面是三种语言 Spark API 的网站链接。
Scala API
spark.apache.org/docs/latest…
Java
spark.apache.org/docs/latest…
Python
spark.apache.org/docs/latest…
Spark基本概念:
Application:
用户自己写的 Spark 应用程序,批处理作业的集合。Application 的 main 方法为应用程序的入口,用户通过 Spark 的 API,定义了 RDD 和对 RDD 的操作。SparkContext:
Spark 重要的 API,用户逻辑与 Spark 集群主要的交互接口,它会和 Cluster Master 交互,包括向它申请计算资源等。Driver 和 Executor:
Spark 在执行每个 Application 的过程中会启动 Driver 和 Executor 两种 JVM 进程。Driver 进程为主控进程,负责执行用户 Application 中的 main 方法,提交 Job,并将 Job 转化为 Task,在各个 Executor 进程间协调 Task 的调度。运行在Worker上 的 Executor 进程负责执行 Task,并将结果返回给 Driver,同时为需要缓存的 RDD 提供存储功能。
资源管理:
一组计算机的集合,每个计算机节点作为独立的计算资源,又可以虚拟出多个具备计算能力的虚拟机,这些虚拟机是集群中的计算单元。Spark 的核心模块专注于调度和管理虚拟机之上分布式计算任务的执行,集群中的计算资源则交给 Cluster Manager 这个角色来管理,Cluster Manager 可以为自带的Standalone、或第三方的 Yarn和 Mesos。
Cluster Manager 一般采用 Master-Slave 结构。以 Yarn 为例,部署 ResourceManager 服务的节点为 Master,负责集群中所有计算资源的统一管理和分配;部署 NodeManager 服务的节点为Slave,负责在当前节点创建一个或多个具备独立计算能力的 JVM 实例,在 Spark 中,这些节点也叫做 Worker。
另外还有一个 Client 节点的概念,是指用户提交Spark Application 时所在的节点。
弹性分布式数据集(RDD):
弹性分布式数据集(RDD)是 Spark 框架中的核心概念。可以将 RDD 视作数据库中的一张表。其中可以保存任何类型的数据。Spark 将数据存储在不同分区上的 RDD 之中。
RDD 可以帮助重新安排计算并优化数据处理过程。
此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。
RDD 是不可变的。你可以用变换(Transformation)修改 RDD,但是这个变换所返回的是一个全新的RDD,而原有的 RDD 仍然保持不变。
RDD 支持两种类型的操作:
变换(Transformation)
变换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。 变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。行动(Action)
行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。 行动操作包括:reduce,collect,count,first,take,countByKey 以及 foreach。
4. 安装部署
本文档是对 Spark 的一个快速入门。首先,我们通过 Spark 的交互式 shell 介绍一下 API(主要是 Scala),
更完整参考 programming guide:
spark.apache.org/docs/latest…
(1). 安装前设置
注意:本小节部分,实验楼环境已经配置好,无需配置,可以跳过此步骤。
在安装 Hadoop 之前,需要进入 Linux/Ubuntu 环境下,连接 Linux/Ubuntu 使用 SSH (安全 Shell)。按照仅给出简要步骤设立 Linux/Ubuntu 环境,对于本小节您可以百度。
- 修改主机名
- 修改 IP
- 修改主机名和IP的映射关系
- 关闭防火墙
- 重启 Linux/Ubuntu
- 安装JDK
(2). 安装 Spark
官网下载地址:
spark.apache.org/downloads.h…
本文档选择的是 spark-1.6.1-bin-hadoop2.6
版本,下载地址:
d3kbcqa49mib13.cloudfront.net/spark-1.6.1…
>> cd /home/shiyanlou
>>sudo wget https://d3kbcqa49mib13.cloudfront.net/spark-1.6.1-bin-hadoop2.6.tgz
>> sudo tar zxvf spark-1.6.1-bin-hadoop2.6.tgz
复制代码
(3). 通过 Spark Shell 进行交互分析
Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本文档选择使用 Scala 来进行介绍。
Scala:
是一门现代的多范式编程语言,以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。Scala 是 Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。
1). 启动 spark shell
>>cd spark-1.6.1-bin-hadoop2.6
>>./bin/spark-shell.sh
复制代码
Spark 主要的抽象概念是个分布式集合,也叫作弹性分布式数据集(Resilient Distributed Dataset – RDD)。它可被分发到集群各个节点上,进行并行操作。RDD 可以由 Hadoop InputFormats 读取 HDFS 文件创建得来,或者从其他 RDD 转换得到。下面我们就先利用 Spark 源代码目录下的 README 文件来新建一个 RDD:
我们从 /home/shiyanlou/README.md
文件新建一个 RDD,代码如下:
>>`scala> val textFile = sc.textFile("/home/shiyanlou/README.md")`
> textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:1
复制代码
下面我们就来演示 count() 和 first() 操作:
>> `scala> textFile.count()`
>
> res0: Long = 95
>>`scala> textFile.first()`
>
> res1: String = # Apache Spark
>
复制代码
接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,代码如下:
>> `scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))` // 统计包含 Spark 的行数
> linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af39
>
>> `scala> linesWithSpark.count()` // 统计行数
> res2: Long = 17
复制代码
可以看到一共有 17 行内容包含 Spark,这与通过 Linux 命令 cat /home/shiyanlou/README.md | grep "Spark" -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:
>>`scala> textFile.filter(line => line.contains("Spark")).count()` // 统计包含 Spark 的行数
> res3: Long = 17
复制代码
RDD 的 actions 和 transformations 可用在更复杂的计算中,例如通过如下代码可以找到包含单词多的那一行内容共有几个单词:
>>`scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)`
> res4: Int = 14
复制代码
代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解:
>>`scala> import java.lang.Math`
> import java.lang.Math
>>`scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))`
>
> res5: Int = 16
复制代码
Hadoop 上的 MapReduce 是大家耳熟能详的一种通用数据流模式。在 Spark 中同样可以实现(下面这个例子也就是 WordCount):
>> `scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) ` // 实现单词统计
> wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:2
>> `scala> wordCounts.collect()` // 输出单词统计结果
> res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)
复制代码
2). RDD 缓存
Spark 也支持在分布式的环境下基于内存的缓存,这样当数据需要重复使用的时候就很有帮助。比如当需要查找一个很小的 hot 数据集,或者运行一个类似 PageRank 的算法。
举个简单的例子,对 linesWithSpark RDD 数据集进行缓存,然后再调用 count() 会触发算子操作进行真正的计算,之后再次调用 count() 就不会再重复的计算,直接使用上一次计算的结果的 RDD 了:
>> `scala> linesWithSpark.cache() `
> res8: spark.RDD[String] = spark.FilteredRDD@17e51082
>> `scala> linesWithSpark.count()`
> res9: Long = 17
>> `scala> linesWithSpark.count()`
> res10: Long = 17
复制代码
看起来缓存一个100行左右的文件很愚蠢,但是如果再非常大的数据集下就非常有用了,尤其是在成百上千的节点中传输 RDD 计算的结果。
3). Spark SQL 和 DataFrames
Spark SQL 是 Spark 内嵌的模块,用于结构化数据。在 Spark 程序中可以使用 SQL 查询语句或 DataFrame API。DataFrames 和 SQL 提供了通用的方式来连接多种数据源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,并且可以在多种数据源之间执行 join 操作。
下面仍在 Spark shell 中演示一下 Spark SQL 的基本操作,该部分内容主要参考了 Spark SQL、DataFrames 和 Datasets 指南
spark.apache.org/docs/latest…
Spark SQL 的功能是通过 SQLContext 类来使用的,而创建 SQLContext 是通过 SparkContext 创建的。在 Spark shell 启动时,输出日志的后有这么几条信息:
17/05/25 15:16:34 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
17/05/25 15:16:34 INFO repl.SparkILoop: Created sql context..
SQL context available as sqlContext.
复制代码
这些信息表明 SparkContent 和 SQLContext 都已经初始化好了,可通过对应的 sc、sqlContext 变量直接进行访问。
使用 SQLContext 可以从现有的 RDD 或数据源创建 DataFrames。作为示例,我们通过 Spark 提供的 JSON 格式的数据源文件 spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json 来进行演示,该数据源内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
复制代码
执行如下命令导入数据源,并输出内容:
>>`scala> val df = sqlContext.read.json("home/shiyanlou/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json")`
> df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
>> `scala> df.show()` // 输出数据源内容
> +----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
复制代码
接着,我们来演示 DataFrames 处理结构化数据的一些基本操作:
>> `scala> df.select("name").show() ` // 只显示 "name" 列
> +-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
>> `scala> df.select(df("name"), df("age") + 1).show() ` // 将 "age" 加 1
> +-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
>> `scala> df.filter(df("age") > 21).show() ` //条件语句
> +---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
>> `scala> df.groupBy("age").count().show() ` // groupBy 操作
> +----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
复制代码
当然,我们也可以使用 SQL 语句来进行操作:
>>`scala> .registerTempTable("people") ` // 将 DataFrame 注册为临时表 people
>> `scala> val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")` // 执行 SQL 查询
>> `scala> result.show() ` //输出结果
> +----+-------+
| name | age|
+----+-------+
|Justin| 19|
+----+-------+
复制代码
更多的功能可以查看完整的 DataFrames API:
spark.apache.org/docs/latest…
此外 DataFrames 也包含了丰富的 DataFrames Function:
spark.apache.org/docs/latest…
5. 总结
Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark 拥有多种语言的函数式编程 API,提供了除 map 和 reduce 之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。这篇文档的目的是帮助你快速入门,完成单机上的 Spark 安装与使用,本文档是 Spark 快速入门,主要介绍了 Spark shell 、RDD、Spark SQL 等,希望对您有所帮助。
更多学习资料请参考:
Spark 集群部署:
spark.apache.org/docs/latest…
Spark 编程指南:
spark.apache.org/docs/latest…
Spark SQL、DataFrames 和 Datasets 指南:
spark.apache.org/docs/latest…
相关文章