Spark 的核心概念 RDD

2022-05-31 00:00:00 数据 分区 内存 依赖 计算

1.RDD 概述

1.1 什么是 RDD ?

RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中基本的抽象,它代表一个不可变、可分区、里面元素可以并行计算的集合。

RDD 具有数据流模型特点:自动容错、位置感知性调度和可伸缩。

RDD 允许用户在执行多个查询时,显示地将工作集缓存在内存中,后续的查询能够重用工作集,这将会极大的提升查询的效率。

我们可以认为 RDD 就是一个代理,我们操作这个代理就像操作本地集合一样,不需去关心任务调度、容错等问题。

1.2 RDD 的属性

在 RDD 源码中这样来描述 RDD

*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

复制代码
  1. 一组分片(Partition),即数据集的基本组成单位。 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
  2. 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目;
  3. RDD 之间互相存在依赖关系。 RDD 的每次转换都会生成一个新的 RDD ,所以 RDD 之前就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失部分的分区数据,而不是对 RDD 的所有分区进行重新计算。
  4. 一个Partitioner ,即 RDD 的分片函数 。当前Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner ,另外一个是基于范围的 RangePartitioner。只有对于key-value的RDD ,才会有 Partitioner,非 key-value 的RDD 的 Partitioner 的值是None。Partitioner 函数不但决定了RDD 本身的分片数量,也决定了 Parent RDD Shuffle 输出时的分片数量。
  5. 一个列表,存储存取每个Partition 的优先位置(preferred location)。 对于一个HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块位置。安装“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

2 创建 RDD

2.1 由一个存在的 Scala 集合进行创建

#通过并行化scala集合创建RDD,一般在测试的时候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
复制代码

2.2 由外部的存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、Hbase

var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
复制代码

2.3 调用一个已经存在了的RDD 的 Transformation,会生成一个新的 RDD。

3 RDD 的编程 API

3.1 Transformation

这种 RDD 中的所有转换都是延迟加载的,也就是说,他们并不会直接就计算结果。相反的,他们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个返回结果的 Driver 的动作时,这些操作才会真正的运行。这种设计会让Spark 更加有效率的运行。

常用的 Transformation 操作:

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操作
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars]) 调用外部程序
coalesce(numPartitions) 重新分区 个参数是要分多少区,第二个参数是否shuffle 默认false ;少分区变多分区 true ; 多分区变少分区 false
repartition(numPartitions) 重新分区 必须shuffle 参数是要分多少区 少变多
repartitionAndSortWithinPartitions(partitioner) 重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作

3.2 Action

触发代码的运行操作,我们一个Spark 应用,至少需要一个 Action 操作。

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
foreachPartition(func) 在每个分区上,运行函数 func

3.3 Spark WordCount 代码示例

执行流程图:

pom.xml 依赖

<!-- 导入scala的依赖 -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- 指定hadoop-client API的版本 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
复制代码

scala 版本代码实现:

package com.zhouq.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * scala 版本实现 wc
  *
  */
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //这行代码是因为我在windows 上直接跑,需要去读取 hadoop 上的文件,设置我的用户名。如果是linux 环境可以不设置。视情况而定
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建spark 配置,设置应用程序名字
//    val conf = new SparkConf().setAppName("scalaWordCount")
    val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")

//    conf.set("spark.testing.memory","102457600")
    //创建spark 执行的入口
    val sc = new SparkContext(conf)

    //指定以后从哪里读取数据创建RDD (弹性分布式数据集)
    //取到一行数据
    val lines: RDD[String] = sc.textFile(args(0))

    //切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    //按单词和一组合
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //按key 进行聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 排序, false 表示倒序
    val sorted = reduced.sortBy(_._2, false)

    //将结果保存到hdfs中
    sorted.saveAsTextFile(args(1))

    //释放资源
    sc.stop()
  }
}
复制代码

Java7 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

/**
* Java 版WordCount
*/
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //创建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定读取数据的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分压平
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception{
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //将单词进行组合 (a,1),(b,1),(c,1),(a,1)
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String tp) throws Exception {
                return new Tuple2<>(tp, 1);
            }
        });

        //先交换再排序,因为 只有groupByKey 方法
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
//                return new Tuple2<>(tp._2, tp._1);
                return tp.swap();
            }
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交换顺序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        //输出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
复制代码

Java8 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

/**
* Java Lambda 表达式版本的  WordCount
*/
public class JavaLambdaWordCount {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //创建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定读取数据的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分压平
//        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        //将单词进行组合 (a,1),(b,1),(c,1),(a,1)
//        words.mapToPair(tp -> new Tuple2<>(tp,1));
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));

        //先交换再排序,因为 只有groupByKey 方法
//        swaped.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
//                return new Tuple2<>(tp._2, tp._1);
            return tp.swap();
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交换顺序
//        sorted.mapToPair(tp -> tp.swap());
        JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());

        //输出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
复制代码

4 RDD 的依赖关系

RDD 和它依赖的 父 RDD(可能有多个) 的关系有两种不同的类型,即 窄依赖(narrow dependency)和宽依赖(wide dependency)。

窄依赖:窄依赖指的是每一个父 RDD 的 Partition 多被子 RDD 的一个分区使用。可以比喻为独生子女。 宽依赖:宽依赖是多个字 RDD 的Partition 会依赖同一个父 RDD 的 Partition

5 RDD 的持久化

5.1 RDD 的 cache(持久化)

Spark中重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用persist()或cache()方法标记要保留的RDD 。次在动作中计算它,它将保留在节点的内存中。Spark的缓存是容错的 - 如果丢失了RDD的任何分区,它将使用初创建它的转换自动重新计算。

5.2 什么时候我们需要持久化?

  1. 要求的计算速度快
  2. 集群的资源要足够大
  3. 重要: cache 的数据会多次触发Action
  4. 建议先进行数据过滤,然后将缩小范围后的数据再cache 到内存中.

5.3 如何使用

使用 rdd.persist()或者rdd.cache()

val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法来缓存数据到内存
val cache = lines.cache()
//注意查看下面两次count 的时间
cached.count
cached.count

复制代码

5.4 数据缓存的存储级别 StorageLevel

我们在 StorageLevel.scala 源码中可以看到:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码

解释一下各个参数的意思:

个参数表示: 放到磁盘 第二个参数表示: 放到内存 第三个参数表示: 磁盘中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第四个参数表示: 内存中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第五个参数表示: 存放几份数据(目的是为了怕executor 出现故障导致分区数据丢失,当重新分配任务时,去另外的机器读取备份数据进行重新计算)

OFF_HEAP : 堆外内存,以序列化的格式存储RDD到Tachyon(一个分布式内存存储系统)中

5.5 如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  1. 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率高的选项,会使RDD上的操作尽可能的快。
  2. 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
  3. 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
  4. 如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
  5. 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
    1. 它运行多个执行者共享Tachyon中相同的内存池
    2. 它显著地减少垃圾回收的花费
    3. 如果单个的执行者崩溃,缓存的数据不会丢失

5.6 删除 cache

Spark自动的监控每个节点缓存的使用情况,利用近少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用 RDD.unpersist()方法

5.7 RDD 的 checkpoint机制

我们除了把数据缓存到内存中,还可以把数据缓存到HDFS 中,保证中间数据不丢失.

什么时候我们需要做chechpoint?

  1. 做复杂的迭代计算,要求保证数据安全,不丢失
  2. 对速度要求不高(跟 cache 到内存进行对比)
  3. 将中间结果保存到 hdfs 中

怎么做 checkpoint ?

首先设置 checkpoint 目录,然后再执行计算逻辑,再执行 checkpoint() 操作。

下面代码使用cache 和 checkpoint 两种方式实现计算每门课受欢迎老师的 topN

package com.zhouq.spark

import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 求每门课程受欢迎老师TopN  --2
  *   -- 使用cache
  *   -- 使用checkpoint 一般设置hdfs 目录
  */
object GroupFavTeacher2_cache_checkpoint {
  def main(args: Array[String]): Unit = {
    //前 N
    val topN = args(1).toInt
    //学科集合
    val subjects = Array("bigdata", "javaee", "php")
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
    //创建spark 执行入口
    val sc = new SparkContext(conf)
    //checkpoint 得先设置 sc 的checkpoint 的dir
//    sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")

    //指定读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      var teacher = line.substring(index + 1)
      var httpHost = line.substring(0, index)
      var subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })
    //将学科,老师联合当做key
    val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)

    //种使用cache RDD 把数据缓存在内存中.标记为cache 的RDD 以后被反复使用,才使用cache
    val cached: RDD[((String, String), Int)] = reduced.cache()

    //第二种 使用checkpoint,得先设置 sc 的 checkpointDir
//   val cached: RDD[((String, String), Int)] = reduced.checkpoint()

    /**
      * 先对学科进行过滤,然后再进行排序,调用RDD 的sortBy进行排序,避免scala 的排序当数据量大时,内存不足的情况.
      * take 是Action 操作,每次take 都会进行一次任务提交,具体查看日志打印情况
      */
    for (sub <- subjects) {
      //过滤出当前的学科
      val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
      //使用RDD 的 sortBy ,内存+磁盘排序,避免scala 中的排序因内存不足导致异常情况.
      //take 是Action 的,所以每次循环都会触发一次提交任务,祥见日志打印情况
      val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
      println(favTeacher.toBuffer)
    }

    /**
      * 前面cache的数据已经计算完了,后面还有很多其他的指标要计算
      * 后面计算的指标也要触发很多次Action,好将数据缓存到内存
      * 原来的数据占用着内存,把原来的数据释放掉,才能缓存新的数据
      */

    //把原来缓存的数据释放掉
    cached.unpersist(true)

    sc.stop()
  }
}
复制代码

6 DAG 的生成

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

微信公众号文章链接:Spark RDD

相关文章