Spark Streaming Execution Flow Analysis
To explore the complete execution flow of Spark Streaming, let's first look at the Spark Streaming example provided in the examples module of the Spark source project: org.apache.spark.examples.streaming.DirectKafkaWordCount
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.streaming
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: DirectKafkaWordCount <brokers> <topics>
* <brokers> is a list of one or more Kafka brokers
* <groupId> is a consumer group name to consume from topics
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
* $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
* consumer-group topic1,topic2
*/
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
| <brokers> is a list of one or more Kafka brokers
| <groupId> is a consumer group name to consume from topics
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(brokers, groupId, topics) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
Analyzing this example, we can see that implementing a Spark Streaming application generally involves the following five steps (a minimal skeleton mapping code to these steps follows the list):
- Initialize the streaming context, i.e. create a StreamingContext as the entry point of the streaming program; this also creates the SparkContext, Spark's execution context;
- Create the input streams (Input DStreams);
- Apply transformations to the DStreams, building the DStream DAG;
- Apply output operations to emit results;
- Start the StreamingContext and wait for termination.
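For reference, here is a minimal sketch (not from the Spark examples; it assumes a TCP text source on localhost:9999) with one comment per step:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreamingApp {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the streaming context (this also creates the SparkContext)
    val conf = new SparkConf().setAppName("MinimalStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(2))

    // 2. Create an input DStream
    val lines = ssc.socketTextStream("localhost", 9999)

    // 3. Transformations: build the DStream DAG
    val counts = lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)

    // 4. Output operation: registers an output stream so that jobs will be generated
    counts.print()

    // 5. Start the computation and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}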
I. Initializing the StreamingContext
The most important steps in the initialization flow are the following (a simplified model of how the last two cooperate follows this list):
- 1) SparkContext initialization, which prepares the whole job-execution environment (network communication, serialization/deserialization, storage management, and so on); this deserves a source-code walkthrough of its own and is not covered here;
- 2) Construction of the JobScheduler, which uses a JobGenerator to produce jobs and then schedules those jobs for execution;
- 3) Construction of the JobGenerator, which generates jobs from the DStreams (or from checkpoint data) and is also responsible for clearing DStream metadata.
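The model below is deliberately simplified and self-contained; it does not use Spark's actual classes (RecurringTimer, JobGenerator, JobScheduler), it only shows the shape of the interaction: a timer fires once per batch interval, jobs are derived for that batch time, and a fixed-size thread pool runs them.

import java.util.concurrent.{Executors, TimeUnit}

object SchedulingModel extends App {
  val batchIntervalMs = 2000L

  // Stand-in for JobScheduler: hands generated jobs to a fixed-size executor pool
  // (whose size plays the role of spark.streaming.concurrentJobs, default 1)
  val jobExecutor = Executors.newFixedThreadPool(1)
  def submitJobSet(batchTime: Long, jobs: Seq[Runnable]): Unit =
    jobs.foreach(job => jobExecutor.submit(job))

  // Stand-in for JobGenerator: on every tick, derive the jobs for this batch time
  def generateJobs(batchTime: Long): Seq[Runnable] =
    Seq(new Runnable {
      override def run(): Unit = println(s"running job for batch $batchTime")
    })

  // Stand-in for the RecurringTimer that drives JobGenerator once per batch interval
  val timer = Executors.newSingleThreadScheduledExecutor()
  timer.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      val batchTime = System.currentTimeMillis()
      submitJobSet(batchTime, generateJobs(batchTime))
    }
  }, 0, batchIntervalMs, TimeUnit.MILLISECONDS)

  Thread.sleep(10000) // let a few batches run, then shut everything down
  timer.shutdown()
  jobExecutor.shutdown()
}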
II. Creating Input DStreams
StreamingContext provides a number of methods for creating input streams:
1. receiverStream(receiver: Receiver[T]) => ReceiverInputDStream[T]
This method requires the user to supply a Receiver (data receiver), so a few words about Receiver first. A Receiver runs on a Spark worker node and receives (pulls) data from an external source. Its main methods are onStart() and onStop(). onStart() performs whatever setup is needed before reception begins (initializing thread pools, buffers, and so on) and then starts receiving data; it must be non-blocking, so the actual receiving work has to run in a separate thread. onStop() performs the cleanup needed after reception stops, releasing the thread pools, buffers and other resources that onStart() initialized. In addition, every Receiver is supervised by a ReceiverSupervisor, which is responsible for starting (start), stopping (stop) and restarting (restart) the Receiver, and for storing the data it receives (store).
ReceiverInputDStream turns the data received by the Receiver into BlockRDDs. A minimal custom Receiver is sketched below.
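The sketch is modeled on the custom-receiver example in the Spark documentation; the class name LineReceiver and the line-oriented reading are ours, not part of Spark:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal line-oriented socket Receiver: onStart() only spawns the receiving thread
// (it must not block), and onStop() has nothing to release here because the receiving
// thread checks isStopped() and closes the socket itself.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* resources are released by the receiving thread */ }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                        // hand the record to the ReceiverSupervisor
        line = reader.readLine()
      }
      restart("Trying to connect again")   // let the supervisor restart this receiver
    } catch {
      case e: java.io.IOException => restart(s"Error connecting to $host:$port", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

// Usage: val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))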
2. socketStream(hostname: String, port: Int) => ReceiverInputDStream[T]
This actually creates a SocketInputDStream, and the Receiver used internally is a SocketReceiver.
/**
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
private[streaming]
class SocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
SocketReceiver's onStart() simply creates a Socket client that connects to the socket server and then starts a separate thread that pulls data from it:
def onStart() {
logInfo(s"Connecting to $host:$port")
try {
socket = new Socket(host, port)
} catch {
case e: ConnectException =>
restart(s"Error connecting to $host:$port", e)
return
}
logInfo(s"Connected to $host:$port")
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
try {
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next())
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
onStop()
}
}
}
3. rawSocketStream => ReceiverInputDStream
This method creates a RawInputDStream, whose Receiver is a RawNetworkReceiver; it fetches data with an NIO client internally and is therefore more efficient than socketStream.
4. fileStream => InputDStream
This creates a FileInputDStream, which does not go through a Receiver at all: it monitors a given directory for new files and reads them to generate RDDs. Its compute() method is shown below, followed by a usage sketch.
/**
* Finds the files that were modified since the last time this method was called and makes
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
* granularity of seconds. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
// Find new files
val newFiles = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
batchTimeToSelectedFiles.synchronized {
batchTimeToSelectedFiles += ((validTime, newFiles))
}
recentlySelectedFiles ++= newFiles
val rdds = Some(filesToRDD(newFiles))
// Copy newFiles to immutable.List to prevent from being modified by the user
val metadata = Map(
"files" -> newFiles.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
val inputInfo = StreamInputInfo(id, 0, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
rdds
}
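A minimal usage sketch, assuming the ssc context from earlier (the HDFS path is a placeholder); textFileStream is a thin wrapper over fileStream for line-oriented text files:

// Each new file that appears under the directory is read, and its lines become part of that batch
val fileLines = ssc.textFileStream("hdfs://namenode:8020/streaming/input")
fileLines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()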
5. queueStream(queue: Queue[RDD[T]], oneAtATime: Boolean = true) => InputDStream
This creates a QueueInputDStream, which builds the stream directly from an externally supplied queue of RDDs; see the sketch below.
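A usage sketch close to the QueueStream example shipped with Spark (queue contents and sleep times are arbitrary); this source is mostly useful for tests, because RDDs can be pushed into the queue after the context has started:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()
val queueDStream = ssc.queueStream(rddQueue)   // oneAtATime defaults to true: one RDD per batch
queueDStream.map(_ * 2).print()
ssc.start()

for (_ <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
  }
  Thread.sleep(1000)
}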
Summary: it is worth going over the important methods of InputDStream, including those inherited from DStream:
1) compute(validTime: Time): Option[RDD[T]]
Generates the RDD for a given batch time.
2) dependencies: List[DStream[_]]
The DStream dependencies. Being an input, an InputDStream has no upstream dependencies, so it overrides this method to return an empty list:
override def dependencies: List[DStream[_]] = List()
3) generateJob(time: Time): Option[Job]
Generates a job: it calls getOrCompute to produce the RDD and then wraps it in a Job object.
4) getOrCompute(time: Time): Option[RDD[T]]
Calls compute to generate the RDD and applies persistence and checkpointing to it.
5) register
/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
This registers the stream with the DStreamGraph. It is used by output operators to register the output DStream they produce (a ForEachDStream, for example); concretely, the stream is added to the DStreamGraph's outputStreams collection.
Correspondingly, an InputDStream registers itself at construction time by calling DStreamGraph's addInputStream method, which adds it to the graph's inputStreams collection.
6) start()
The method that starts receiving data; it is a new abstract method introduced by InputDStream and implemented by each subclass:
/** Method called to start receiving data. Subclasses must implement this method. */
def start(): Unit
7) stop()
The method that stops receiving data:
/** Method called to stop receiving data. Subclasses must implement this method. */
def stop(): Unit
III. DStream Transformations
1. StreamingContext provides two transformation methods
1) union, which merges multiple DStreams into a UnionDStream;
2) transform, which transforms the RDDs of the given DStreams into a new DStream:
/**
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams.
*/
def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
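A usage sketch of the single-stream variant, DStream.transform, which exposes the RDD of each batch so that arbitrary RDD operations can be applied; requests (an assumed DStream[(String, String)]) and blacklistRDD (an assumed RDD[(String, Boolean)]) are not defined in this article:

val cleaned = requests.transform { rdd =>
  // per-batch join against a static blacklist RDD, something DStream does not offer directly
  rdd.leftOuterJoin(blacklistRDD)
    .filter { case (_, (_, flagged)) => !flagged.contains(true) }  // drop blacklisted keys
    .map { case (key, (value, _)) => (key, value) }
}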
2. Transformation methods provided by DStream
These methods (map, flatMap, filter and so on) each take a user-defined function that operates on the elements of the DStream (that is, on the elements inside the partitions of the DStream's RDDs) and produce a DStream of a new type; the original DStream becomes the parent (upstream) of the new one, which is how the DStream lineage is built. map, for example:
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
The transform method, by contrast, operates on the RDDs inside the DStream, i.e. the RDD itself is the unit of processing, which allows more elaborate control logic; the transformation methods above only see a single element or the collection of elements within an RDD partition.
The window functions operate on several RDDs at once: Spark Streaming places multiple RDDs of the DStream into one window and processes them together, as in the sliding-window sketch below.
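This sketch reuses the words DStream from the first example (batch interval 2 seconds): the window covers the last 30 seconds and slides every 10 seconds, so each emitted batch aggregates the RDDs of 15 source batches; both durations must be multiples of the batch interval:

val windowedCounts = words.map((_, 1L))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))  // (reduce function, window length, slide interval)
windowedCounts.print()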
3. Transformation methods provided by PairDStreamFunctions
PairDStreamFunctions provides the operations for key-value (K, V) DStreams.
These are mainly aggregations and joins; let's look at the source of a few classic ones.
1) combineByKey
/**
* Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
* org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
*/
def combineByKey[C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
val cleanedCreateCombiner = sparkContext.clean(createCombiner)
val cleanedMergeValue = sparkContext.clean(mergeValue)
val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
new ShuffledDStream[K, V, C](
self,
cleanedCreateCombiner,
cleanedMergeValue,
cleanedMergeCombiner,
partitioner,
mapSideCombine)
}
This method is the underlying implementation of groupByKey and reduceByKey, so once it is understood, those two are much easier to read (a per-key-average sketch follows this parameter list). Its parameters are:
- createCombiner: V => C
creates the combiner; to compute an average, for example, you would start from an object with two fields, sum and count;
- mergeValue: (C, V) => C
folds one incoming element into the combiner, e.g. sum += value; count += 1;
- mergeCombiner: (C, C) => C
merges two combiners, e.g. sum = sum1 + sum2; count = count1 + count2 (and finally avg = sum / count);
- partitioner: Partitioner
the partitioner;
- mapSideCombine: Boolean = true
whether to combine on the map side, defaulting to true; this is an aggregation optimization, but not every aggregation supports it.
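The per-key average mentioned above, written out with combineByKey; scores is an assumed DStream[(String, Double)]:

import org.apache.spark.HashPartitioner

val avgPerKey = scores.combineByKey[(Double, Long)](
  createCombiner = v => (v, 1L),                          // first value for a key: (sum, count)
  mergeValue = (acc, v) => (acc._1 + v, acc._2 + 1L),     // fold one more value into the combiner
  mergeCombiner = (a, b) => (a._1 + b._1, a._2 + b._2),   // merge partial combiners
  partitioner = new HashPartitioner(4),
  mapSideCombine = true
).mapValues { case (sum, count) => sum / count }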
2) updateStateByKey
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* In every batch the updateFunc will be called for each state even if there are no new values.
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. Note, that this function may generate a different
* tuple with a different key than the input key. Therefore keys may be removed
* or added in this way. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
* DStream
* @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {
val cleanedFunc = ssc.sc.clean(updateFunc)
val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {
cleanedFunc(it)
}
new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
}
This is Spark Streaming's stateful operator. Note that it has no mechanism for expiring state: if the set of keys keeps growing, memory usage grows with it.
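A usage sketch (wordDstream here is an assumed DStream[(String, Int)] of (word, 1) pairs; checkpointing must be enabled for stateful operators, and the checkpoint directory is a placeholder):

ssc.checkpoint("/tmp/streaming-checkpoint")

val runningCounts = wordDstream.updateStateByKey[Int] {
  // all new values for a key in this batch, plus the previous state, give the new state
  (newValues: Seq[Int], state: Option[Int]) => Some(newValues.sum + state.getOrElse(0))
}
runningCounts.print()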
3) mapWithState
mapWithState is now the recommended operator for stateful computation in Spark Streaming. For how to use it, see the example code provided by Spark:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
/**
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
* second starting with initial value of word count.
* Usage: StatefulNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
* data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example
* org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
// Initial state RDD for mapWithState operation
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
// Update the cumulative count using mapWithState
// This will give a DStream made of state (which is the cumulative count of the words)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateDstream = wordDstream.mapWithState(
StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
This method takes a StateSpec object, which also provides a timeout method for bounding how long idle state is kept; see the sketch below.
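A sketch reusing initialRDD and wordDstream from the example above: a 30-minute timeout drops the state of keys that see no new data for that long. The mapping function is invoked one final time for an expiring key with state.isTimingOut() returning true, and it must not call update() in that case, so a timeout-aware variant of the mapping function is used:

// Timeout-aware mapping function: when the key is timing out, only emit the final value
val timeoutAwareFunc = (word: String, one: Option[Int], state: State[Int]) => {
  if (state.isTimingOut()) {
    (word, state.get())                  // final emission for an expiring key
  } else {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }
}

val stateSpec = StateSpec.function(timeoutAwareFunc)
  .initialState(initialRDD)
  .numPartitions(8)
  .timeout(Minutes(30))                  // drop state for keys idle longer than 30 minutes

val stateDstream = wordDstream.mapWithState(stateSpec)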
IV. Output Operations on DStreams
1. foreachRDD
Applies a user-defined output operation to each RDD of the DStream:
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
* @param foreachFunc foreachRDD function
* @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
* in the `foreachFunc` to be displayed in the UI. If `false`, then
* only the scopes and callsites of `foreachRDD` will override those
* of the RDDs on the display.
*/
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
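A typical usage pattern, as recommended in the Spark Streaming programming guide: create the connection to the external sink on the executor side, once per partition, rather than on the driver. createConnection and send stand for whatever client your sink provides (hypothetical names here); wordCounts is reused from the first example:

wordCounts.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createConnection()            // hypothetical sink client, built on the executor
    partitionOfRecords.foreach(record => connection.send(record.toString))
    connection.close()
  }
}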
2. print
Prints the data in the DStream; under the hood it is also built on foreachRDD:
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
3. saveAsTextFiles
Also built on foreachRDD: it calls saveAsTextFile on each RDD to write it out as text files.
/**
* Save each RDD in this DStream as at text file, using string representation
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
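Usage is a single call (the HDFS prefix is a placeholder, and wordCounts is reused from the first example); each batch produces one output directory named prefix-TIME_IN_MS.suffix:

wordCounts.saveAsTextFiles("hdfs://namenode:8020/streaming/wordcounts", "txt")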
V. Starting the StreamingContext and Awaiting Termination
The four steps above only set up the execution environment and build the DStreamGraph; no data is actually processed yet, even though the output operations have already been called.
The Spark Streaming job really starts running only when StreamingContext's start method is invoked, so this method deserves a close look at its source:
/**
* Start the execution of the streams.
*
* @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
scheduler.start()
}
state = StreamingContextState.ACTIVE
scheduler.listenerBus.post(
StreamingListenerStreamingStarted(System.currentTimeMillis()))
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
logDebug("Adding shutdown hook") // force eager creation of logger
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
The overall flow is:
StreamingContext --> JobScheduler --> JobGenerator --> DStreamGraph --> SparkContext.runJob
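From the application's point of view this machinery is driven by three calls; the sketch below (not part of the example above) also shows the graceful-stop options, which let in-flight batches finish before shutdown:

ssc.start()                 // starts the JobScheduler (and with it JobGenerator and ReceiverTracker) in the background
ssc.awaitTermination()      // blocks the calling thread until stop() is called or an error occurs

// From a separate control thread or shutdown hook:
// ssc.stop(stopSparkContext = true, stopGracefully = true)

// Or let the shutdown hook registered by start() stop gracefully on SIGTERM:
// sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")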