Spark Streaming Execution Flow Analysis

2022-05-31


To explore the complete execution flow of Spark Streaming, let's first look at the Spark Streaming example provided in the examples module of the Spark source project: org.apache.spark.examples.streaming.DirectKafkaWordCount

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <groupId> is a consumer group name to consume from topics
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    consumer-group topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <groupId> is a consumer group name to consume from topics
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, groupId, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println


Analyzing this example, we can see that implementing a Spark Streaming application generally involves the following five steps (a minimal code sketch follows the list):

  1. Initialize the streaming context, i.e. create a StreamingContext as the entry point of the streaming program; this step also creates the SparkContext, Spark's execution context;
  2. Create the input streams (Input DStreams);
  3. Apply various transformations to the DStreams, forming the DStream DAG;
  4. Apply output operations to emit the results;
  5. Start the StreamingContext and wait for termination.
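
As a minimal sketch of these five steps (the socket host and port are placeholders, and the word-count logic mirrors the example above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FiveStepSketch {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the streaming context; this also creates the SparkContext
    val sparkConf = new SparkConf().setAppName("FiveStepSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // 2. Create an input DStream (here a TCP text source)
    val lines = ssc.socketTextStream("localhost", 9999)

    // 3. Transformations build up the DStream DAG
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)

    // 4. An output operation registers the DStream so its RDDs get materialized
    wordCounts.print()

    // 5. Start the StreamingContext and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}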

I. Initializing the StreamingContext

The complete initialization flow involves a number of components; the most important ones are:

  • 1) SparkContext initialization, which mainly prepares the environment jobs run in (network communication, serialization/deserialization, storage management, and so on; this deserves a dedicated source-code read and is not covered here);
  • 2) Construction of the JobScheduler, which uses a JobGenerator to generate jobs and then schedules those jobs for execution;
  • 3) Construction of the JobGenerator, which generates jobs from the DStreams or from checkpointing, and is also responsible for cleaning up DStream metadata.

II. Creating Input DStreams

StreamingContext provides a variety of methods for creating input streams:

1. receiverStream(receiver: Receiver[T]) => ReceiverInputDStream[T]

This method requires the user to pass in a Receiver (a data receiver), so let's introduce Receiver briefly. A Receiver runs on a Spark worker node and receives (fetches) data from an external source. Its main methods are onStart() and onStop(). onStart() performs the setup required before receiving starts (initializing thread pools, buffers, and so on) and then starts receiving data; this method must be non-blocking, so the actual receiving work has to run in a separate thread. onStop() performs the cleanup required after receiving stops (releasing the thread pools, buffers, and other resources that were initialized in onStart()). In addition, every Receiver is supervised by a ReceiverSupervisor, which is responsible for starting (start), stopping (stop), and restarting (restart) the Receiver, as well as storing the data the Receiver receives (store).

A ReceiverInputDStream mainly generates BlockRDDs from the data received by its Receiver.
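
As a sketch of this contract (the newline-delimited TCP source, host, and port are illustrative assumptions), a custom Receiver might look like the following and is plugged in via receiverStream:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A toy receiver that reads '\n'-delimited lines from a TCP endpoint.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Must not block: hand the receiving work off to a separate thread.
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to release here; the receiving thread checks isStopped() and exits by itself.
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                    // hand the record over to the ReceiverSupervisor
        line = reader.readLine()
      }
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

// Usage: val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))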

2. socketStream(hostname: String, port: Int) => ReceiverInputDStream[T]

What this actually creates is a SocketInputDStream, and the Receiver it uses internally is a SocketReceiver.

  /**
   * Creates an input stream from TCP source hostname:port. Data is received using
   * a TCP socket and the receive bytes it interpreted as object using the given
   * converter.
   * @param hostname      Hostname to connect to for receiving data
   * @param port          Port to connect to for receiving data
   * @param converter     Function to convert the byte stream to objects
   * @param storageLevel  Storage level to use for storing the received objects
   * @tparam T            Type of the objects received (after converting bytes to objects)
   */
  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }
private[streaming]
class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

SocketReceiver's onStart method creates a socket client that connects to the socket server and then starts a separate thread to fetch data from it:

  def onStart() {

    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }
  
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
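
In practice most applications use the convenience wrapper socketTextStream, which calls socketStream with SocketReceiver.bytesToLines as the converter. A minimal usage sketch, assuming the ssc created earlier:

import org.apache.spark.storage.StorageLevel

// Equivalent to socketStream(host, port, SocketReceiver.bytesToLines, storageLevel)
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()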

3. rawSocketStream => ReceiverInputDStream

This method creates a RawInputDStream, whose Receiver is RawNetworkReceiver. It uses an NIO client internally to fetch data, which makes it more efficient than socketStream.

4. fileStream => InputDStream

This creates a FileInputDStream, which does not receive data through a Receiver. Instead, it watches a given directory for new files and reads those new files to generate RDDs.

  /**
   * Finds the files that were modified since the last time this method was called and makes
   * a union RDD out of them. Note that this maintains the list of files that were processed
   * in the latest modification time in the previous call to this method. This is because the
   * modification time returned by the FileStatus API seems to return times only at the
   * granularity of seconds. And new files may have the same modification time as the
   * latest modification time in the previous call to this method yet was not reported in
   * the previous call.
   */
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    // Find new files
    val newFiles = findNewFiles(validTime.milliseconds)
    logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    batchTimeToSelectedFiles.synchronized {
      batchTimeToSelectedFiles += ((validTime, newFiles))
    }
    recentlySelectedFiles ++= newFiles
    val rdds = Some(filesToRDD(newFiles))
    // Copy newFiles to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "files" -> newFiles.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    val inputInfo = StreamInputInfo(id, 0, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    rdds
  }
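
A minimal usage sketch (the directory path is a placeholder); textFileStream is the common convenience wrapper around fileStream for newline-delimited text files:

// Watches the directory and turns the files newly added during each batch interval into an RDD
val fileLines = ssc.textFileStream("hdfs://namenode:8020/data/incoming")
fileLines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()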

5. queueStream(queue: Queue[RDD[T]], oneAtATime: Boolean = true) => InputDStream

This creates a QueueInputDStream, which builds the stream directly from an externally supplied queue of RDDs.
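
A minimal sketch of queueStream, which is mostly useful for testing; the driver fills the queue, and with oneAtATime left at true each dequeued RDD becomes one batch:

import scala.collection.mutable

import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()
val queuedStream = ssc.queueStream(rddQueue)        // oneAtATime defaults to true
queuedStream.map(_ % 10).countByValue().print()

// RDDs can keep being pushed into the queue, even after ssc.start()
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 4)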

Summary: it is worth going over the important methods of InputDStream here, including the ones inherited from DStream:

1)compute(validTime: Time): Option[RDD[T]]

Generates the RDD for a given batch time.

2)dependencies: List[DStream[_]]

The DStream dependencies. An InputDStream, being an input, has no upstream dependencies, so it overrides this method to return an empty list directly:

override def dependencies: List[DStream[_]] = List()

3)generateJob(time: Time): Option[Job]

Generates a job. This method mainly calls getOrCompute to produce the RDD and then creates a Job object.

4)getOrCompute(time: Time): Option[RDD[T]]

This method calls compute to generate the RDD and then applies persistence and checkpointing to it.

5)register

  /**
   * Register this streaming as an output stream. This would ensure that RDDs of this
   * DStream will be generated.
   */
  private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

Registers the DStream with the DStreamGraph. This is used to register the output DStreams produced by output operators, such as ForEachDStream; in effect, it just adds them to the DStreamGraph's outputStreams collection.

Correspondingly, when an InputDStream is created it calls DStreamGraph's addInputStream method to register itself in the DStreamGraph's inputStreams collection.

6)start()

The method that starts receiving data; it is an abstract method added by InputDStream and implemented by each subclass.

  /** Method called to start receiving data. Subclasses must implement this method. */
  def start(): Unit

7) stop()

The method that stops receiving data.

  /** Method called to stop receiving data. Subclasses must implement this method. */
  def stop(): Unit

III. DStream Transformations

1. StreamingContext provides two transformation methods

1) union, which merges multiple DStreams into a UnionDStream

2) transform, which transforms the RDDs of the DStreams to produce a new DStream

  /**
   * Create a new DStream in which each RDD is generated by applying a function on RDDs of
   * the DStreams.
   */
  def transform[T: ClassTag](
      dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]
    ): DStream[T] = withScope {
    new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
  }

2. Transformation methods provided by DStream

These transformation methods (map, flatMap, filter, and so on) each take a user-defined function that operates on the elements of the DStream (that is, on the elements inside the partitions of each RDD in the DStream) and produce a DStream of a new type, with the current DStream becoming the parent (upstream) of the new one; this is also how the DStream lineage is built up.

  /** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

The transform method, by contrast, operates on the RDDs inside the DStream, i.e. it takes RDDs as its unit of processing, which allows for more elaborate control logic; the transformation methods above operate on partitions of elements or on individual elements.
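
A sketch of transform, which exposes each batch's RDD so that arbitrary RDD operations can be applied, for example a join against a static RDD (the blacklist contents are made up, and wordCounts is assumed to be the (word, count) pair DStream from the word-count example):

// Static (word, flag) pairs used to drop unwanted keys from every batch
val blacklist = ssc.sparkContext.parallelize(Seq(("spam", true)))

val filtered = wordCounts.transform { rdd =>
  rdd.leftOuterJoin(blacklist)
    .filter { case (_, (_, flagged)) => flagged.isEmpty }   // keep only words not in the blacklist
    .mapValues { case (count, _) => count }
}
filtered.print()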

Window functions operate on multiple RDDs at once: Spark Streaming puts several of the DStream's RDDs into a single window and processes them together.
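
For example (a sketch reusing the words DStream from the word-count example), counting words over the last 30 seconds of data, recomputed every 10 seconds; both durations must be multiples of the batch interval:

// Plain windowed reduce; the variant that also takes an inverse reduce function
// is more efficient for large windows but requires checkpointing to be enabled.
val windowedCounts = words.map((_, 1L)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
windowedCounts.print()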

3. Transformation methods provided by PairDStreamFunctions

PairDStreamFunctions mainly provides operations for key-value DStreams.

These are mostly aggregations and joins; let's pick a few classic ones and look at their source:

1)combineByKey

  /**
   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
   * combineByKey for RDDs. Please refer to combineByKey in
   * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
   */
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
    val cleanedCreateCombiner = sparkContext.clean(createCombiner)
    val cleanedMergeValue = sparkContext.clean(mergeValue)
    val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
    new ShuffledDStream[K, V, C](
      self,
      cleanedCreateCombiner,
      cleanedMergeValue,
      cleanedMergeCombiner,
      partitioner,
      mapSideCombine)
  }

This method is the underlying implementation of groupByKey and reduceByKey, so once you understand it, those two are much easier to follow. Its parameters are as follows (a code sketch of the per-key average use case follows the list):

  • createCombiner: V => C

Creates the combiner. For example, to compute a per-key average, first create a container with two fields: sum and count.

  • mergeValue: (C, V) => C

Merges each incoming value into the combiner: sum += value; count += 1.

  • mergeCombiner: (C, C) => C

Merges combiners with each other: sum = sum1 + sum2; count = count1 + count2; finally, avg = sum / count.

  • partitioner: Partitioner

The partitioner to use.

  • mapSideCombine: Boolean = true

Whether to combine on the map side. Defaults to true; this is an optimization for aggregations, but not every aggregation supports it.
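
A sketch of the per-key average described above, with the combiner carrying a (sum, count) pair; pairs is an illustrative assumption built from the words DStream of the earlier example:

import org.apache.spark.HashPartitioner

val pairs = words.map(w => (w, w.length))             // illustrative (word, value) pairs

val sumCount = pairs.combineByKey[(Long, Long)](
  v => (v.toLong, 1L),                                // createCombiner: first value seen for a key
  (acc, v) => (acc._1 + v, acc._2 + 1L),              // mergeValue: fold one more value into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2),               // mergeCombiner: merge partial (sum, count) pairs
  new HashPartitioner(ssc.sparkContext.defaultParallelism))

val avgPerKey = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
avgPerKey.print()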

2)updateStateByKey

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * In every batch the updateFunc will be called for each state even if there are no new values.
   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
   * @param updateFunc State update function. Note, that this function may generate a different
   *                   tuple with a different key than the input key. Therefore keys may be removed
   *                   or added in this way. It is up to the developer to decide whether to
   *                   remember the partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {
    val cleanedFunc = ssc.sc.clean(updateFunc)
    val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {
      cleanedFunc(it)
    }
    new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
  }

This is Spark Streaming's stateful operator. However, it does not support cleaning up state, so if the set of keys keeps growing, memory usage keeps growing with it.
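
A usage sketch of the simpler, commonly used overload, which keeps a running total per key; stateful operators require checkpointing to be enabled (the checkpoint path is a placeholder), and wordCounts is assumed to be the (word, count) DStream from the word-count example:

ssc.checkpoint("hdfs://namenode:8020/streaming/checkpoints")   // required for stateful operators

// newValues: the counts for this key in the current batch; state: the total accumulated so far
val updateTotals = (newValues: Seq[Long], state: Option[Long]) =>
  Some(newValues.sum + state.getOrElse(0L))

val runningTotals = wordCounts.updateStateByKey[Long](updateTotals)
runningTotals.print()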

3)mapWithState

mapWithState is now the recommended operator for stateful computation in Spark Streaming. For how to use it, see the example code provided by Spark:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

/**
 * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
 * second starting with initial value of word count.
 * Usage: StatefulNetworkWordCount <hostname> <port>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 *   data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example
 *      org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
 */
object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println


This method takes a StateSpec object as its parameter, which provides a timeout method for setting how long state may remain idle before it expires.
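
For example, building on the StatefulNetworkWordCount code above, the StateSpec can be given a timeout (and, optionally, a partition count); keys that receive no data for longer than the timeout are dropped. Note that when a timeout is configured, the mapping function should check state.isTimingOut() before calling state.update:

val spec = StateSpec.function(mappingFunc)
  .initialState(initialRDD)
  .numPartitions(10)        // optional: number of partitions for the state RDDs
  .timeout(Minutes(30))     // keys idle for more than 30 minutes are removed

val stateDstream = wordDstream.mapWithState(spec)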

IV. Output Operations on DStreams

1. foreachRDD

Applies a user-defined output operation to each RDD of the DStream:

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * @param foreachFunc foreachRDD function
   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
   *                           only the scopes and callsites of `foreachRDD` will override those
   *                           of the RDDs on the display.
   */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }
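
The typical usage pattern creates the connection to the external system inside foreachPartition, so it is created on the executors rather than on the driver; ConnectionPool below is a hypothetical stand-in for whatever client library the sink uses:

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Runs on an executor: create or borrow the connection here, not on the driver,
    // because connection objects generally cannot be serialized and shipped.
    val connection = ConnectionPool.getConnection()          // hypothetical pooling helper
    partition.foreach(record => connection.send(record.toString))
    ConnectionPool.returnConnection(connection)              // return so it can be reused across batches
  }
}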

2. print

Prints the data in the DStream; under the hood it also calls foreachRDD:

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

3. saveAsTextFiles

Calls foreachRDD and then runs saveAsTextFile on each RDD to write it out to files:

  /**
   * Save each RDD in this DStream as at text file, using string representation
   * of elements. The file name at each batch interval is generated based on
   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
  }

V. Starting the StreamingContext and Awaiting Termination

The first four steps above only initialize the execution environment and build the DStreamGraph; no actual stream processing starts yet, even though output operations have been called.

A Spark Streaming job really starts executing when StreamingContext's start method is called, so this method's source code deserves a closer look:

  /**
   * Start the execution of the streams.
   *
   * @throws IllegalStateException if the StreamingContext is already stopped.
   */
  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        logDebug("Adding shutdown hook") // force eager creation of logger
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

The overall flow is:

StreamingContext --> JobScheduler --> JobGenerator --> DStreamGraph --> SparkContext.runJob
