Spark 的核心概念 RDD
1.RDD 概述
1.1 什么是 RDD ?
RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中基本的抽象,它代表一个不可变、可分区、里面元素可以并行计算的集合。
RDD 具有数据流模型特点:自动容错、位置感知性调度和可伸缩。
RDD 允许用户在执行多个查询时,显示地将工作集缓存在内存中,后续的查询能够重用工作集,这将会极大的提升查询的效率。
我们可以认为 RDD 就是一个代理,我们操作这个代理就像操作本地集合一样,不需去关心任务调度、容错等问题。
1.2 RDD 的属性
在 RDD 源码中这样来描述 RDD
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
复制代码
- 一组分片(Partition),即数据集的基本组成单位。 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
- 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
- RDD 之间互相存在依赖关系。 RDD 的每次转换都会生成一个新的 RDD ,所以 RDD 之前就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失部分的分区数据,而不是对 RDD 的所有分区进行重新计算。
- 一个Partitioner ,即 RDD 的分片函数 。当前Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner ,另外一个是基于范围的 RangePartitioner。只有对于key-value的RDD ,才会有 Partitioner,非 key-value 的RDD 的 Partitioner 的值是None。Partitioner 函数不但决定了RDD 本身的分片数量,也决定了 Parent RDD Shuffle 输出时的分片数量。
- 一个列表,存储存取每个Partition 的优先位置(preferred location)。 对于一个HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块位置。安装“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
2 创建 RDD
2.1 由一个存在的 Scala 集合进行创建
#通过并行化scala集合创建RDD,一般在测试的时候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
复制代码
2.2 由外部的存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、Hbase
var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
复制代码
2.3 调用一个已经存在了的RDD 的 Transformation,会生成一个新的 RDD。
3 RDD 的编程 API
3.1 Transformation
这种 RDD 中的所有转换都是延迟加载的,也就是说,他们并不会直接就计算结果。相反的,他们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个返回结果的 Driver 的动作时,这些操作才会真正的运行。这种设计会让Spark 更加有效率的运行。
常用的 Transformation 操作:
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操作 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 调用外部程序 |
coalesce(numPartitions) | 重新分区 个参数是要分多少区,第二个参数是否shuffle 默认false ;少分区变多分区 true ; 多分区变少分区 false |
repartition(numPartitions) | 重新分区 必须shuffle 参数是要分多少区 少变多 |
repartitionAndSortWithinPartitions(partitioner) | 重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作 |
3.2 Action
触发代码的运行操作,我们一个Spark 应用,至少需要一个 Action 操作。
动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
foreachPartition(func) | 在每个分区上,运行函数 func |
3.3 Spark WordCount 代码示例
执行流程图:
pom.xml 依赖
<!-- 导入scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
复制代码
scala 版本代码实现:
package com.zhouq.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* scala 版本实现 wc
*
*/
object ScalaWordCount {
def main(args: Array[String]): Unit = {
//这行代码是因为我在windows 上直接跑,需要去读取 hadoop 上的文件,设置我的用户名。如果是linux 环境可以不设置。视情况而定
System.setProperty("HADOOP_USER_NAME", "root")
//创建spark 配置,设置应用程序名字
// val conf = new SparkConf().setAppName("scalaWordCount")
val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")
// conf.set("spark.testing.memory","102457600")
//创建spark 执行的入口
val sc = new SparkContext(conf)
//指定以后从哪里读取数据创建RDD (弹性分布式数据集)
//取到一行数据
val lines: RDD[String] = sc.textFile(args(0))
//切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))
//按单词和一组合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//按key 进行聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
// 排序, false 表示倒序
val sorted = reduced.sortBy(_._2, false)
//将结果保存到hdfs中
sorted.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
复制代码
Java7 版本:
package com.zhouq.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* Java 版WordCount
*/
public class JavaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//创建SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
//指定读取数据的位置
JavaRDD<String> lines = jsc.textFile(args[0]);
//切分压平
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception{
return Arrays.asList(line.split(" ")).iterator();
}
});
//将单词进行组合 (a,1),(b,1),(c,1),(a,1)
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String tp) throws Exception {
return new Tuple2<>(tp, 1);
}
});
//先交换再排序,因为 只有groupByKey 方法
JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
// return new Tuple2<>(tp._2, tp._1);
return tp.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再次交换顺序
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
return tp.swap();
}
});
//输出到hdfs
result.saveAsTextFile(args[1]);
jsc.stop();
}
}
复制代码
Java8 版本:
package com.zhouq.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* Java Lambda 表达式版本的 WordCount
*/
public class JavaLambdaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//创建SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
//指定读取数据的位置
JavaRDD<String> lines = jsc.textFile(args[0]);
//切分压平
// lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
//将单词进行组合 (a,1),(b,1),(c,1),(a,1)
// words.mapToPair(tp -> new Tuple2<>(tp,1));
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));
//先交换再排序,因为 只有groupByKey 方法
// swaped.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
// return new Tuple2<>(tp._2, tp._1);
return tp.swap();
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再次交换顺序
// sorted.mapToPair(tp -> tp.swap());
JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());
//输出到hdfs
result.saveAsTextFile(args[1]);
jsc.stop();
}
}
复制代码
4 RDD 的依赖关系
RDD 和它依赖的 父 RDD(可能有多个) 的关系有两种不同的类型,即 窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖:窄依赖指的是每一个父 RDD 的 Partition 多被子 RDD 的一个分区使用。可以比喻为独生子女。 宽依赖:宽依赖是多个字 RDD 的Partition 会依赖同一个父 RDD 的 Partition
5 RDD 的持久化
5.1 RDD 的 cache(持久化)
Spark中重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。
您可以使用persist()或cache()方法标记要保留的RDD 。次在动作中计算它,它将保留在节点的内存中。Spark的缓存是容错的 - 如果丢失了RDD的任何分区,它将使用初创建它的转换自动重新计算。
5.2 什么时候我们需要持久化?
- 要求的计算速度快
- 集群的资源要足够大
- 重要: cache 的数据会多次触发Action
- 建议先进行数据过滤,然后将缩小范围后的数据再cache 到内存中.
5.3 如何使用
使用 rdd.persist()或者rdd.cache()
val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法来缓存数据到内存
val cache = lines.cache()
//注意查看下面两次count 的时间
cached.count
cached.count
复制代码
5.4 数据缓存的存储级别 StorageLevel
我们在 StorageLevel.scala 源码中可以看到:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码
解释一下各个参数的意思:
个参数表示: 放到磁盘 第二个参数表示: 放到内存 第三个参数表示: 磁盘中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第四个参数表示: 内存中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第五个参数表示: 存放几份数据(目的是为了怕executor 出现故障导致分区数据丢失,当重新分配任务时,去另外的机器读取备份数据进行重新计算)
OFF_HEAP : 堆外内存,以序列化的格式存储RDD到Tachyon(一个分布式内存存储系统)中
5.5 如何选择存储级别
Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:
- 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率高的选项,会使RDD上的操作尽可能的快。
- 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
- 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
- 如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
- 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
- 它运行多个执行者共享Tachyon中相同的内存池
- 它显著地减少垃圾回收的花费
- 如果单个的执行者崩溃,缓存的数据不会丢失
5.6 删除 cache
Spark自动的监控每个节点缓存的使用情况,利用近少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用 RDD.unpersist()方法
5.7 RDD 的 checkpoint机制
我们除了把数据缓存到内存中,还可以把数据缓存到HDFS 中,保证中间数据不丢失.
什么时候我们需要做chechpoint?
- 做复杂的迭代计算,要求保证数据安全,不丢失
- 对速度要求不高(跟 cache 到内存进行对比)
- 将中间结果保存到 hdfs 中
怎么做 checkpoint ?
首先设置 checkpoint 目录,然后再执行计算逻辑,再执行 checkpoint() 操作。
下面代码使用cache 和 checkpoint 两种方式实现计算每门课受欢迎老师的 topN
package com.zhouq.spark
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 求每门课程受欢迎老师TopN --2
* -- 使用cache
* -- 使用checkpoint 一般设置hdfs 目录
*/
object GroupFavTeacher2_cache_checkpoint {
def main(args: Array[String]): Unit = {
//前 N
val topN = args(1).toInt
//学科集合
val subjects = Array("bigdata", "javaee", "php")
val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
//创建spark 执行入口
val sc = new SparkContext(conf)
//checkpoint 得先设置 sc 的checkpoint 的dir
// sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")
//指定读取数据
val lines: RDD[String] = sc.textFile(args(0))
val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
val index = line.lastIndexOf("/")
var teacher = line.substring(index + 1)
var httpHost = line.substring(0, index)
var subject = new URL(httpHost).getHost.split("[.]")(0)
((subject, teacher), 1)
})
//将学科,老师联合当做key
val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)
//种使用cache RDD 把数据缓存在内存中.标记为cache 的RDD 以后被反复使用,才使用cache
val cached: RDD[((String, String), Int)] = reduced.cache()
//第二种 使用checkpoint,得先设置 sc 的 checkpointDir
// val cached: RDD[((String, String), Int)] = reduced.checkpoint()
/**
* 先对学科进行过滤,然后再进行排序,调用RDD 的sortBy进行排序,避免scala 的排序当数据量大时,内存不足的情况.
* take 是Action 操作,每次take 都会进行一次任务提交,具体查看日志打印情况
*/
for (sub <- subjects) {
//过滤出当前的学科
val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
//使用RDD 的 sortBy ,内存+磁盘排序,避免scala 中的排序因内存不足导致异常情况.
//take 是Action 的,所以每次循环都会触发一次提交任务,祥见日志打印情况
val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
println(favTeacher.toBuffer)
}
/**
* 前面cache的数据已经计算完了,后面还有很多其他的指标要计算
* 后面计算的指标也要触发很多次Action,好将数据缓存到内存
* 原来的数据占用着内存,把原来的数据释放掉,才能缓存新的数据
*/
//把原来缓存的数据释放掉
cached.unpersist(true)
sc.stop()
}
}
复制代码
6 DAG 的生成
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
微信公众号文章链接:Spark RDD
相关文章