Spark Source Code Analysis: The YARN Deployment Flow (SparkSubmit)
1. YARN Deployment Flow (SparkSubmit)
1.1 The spark-submit Script
Let's look at the contents of the spark-submit script:
Source location: spark/bin/spark-submit
# $@: all arguments passed to the script
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Source location: spark/bin/spark-class. The build_command function invokes the launcher library (org.apache.spark.launcher.Main) to build the command to execute; it emits each argument NUL-separated, followed by the launcher's exit code, and spark-class reads them back into an array and execs the result:
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
Source location: spark/launcher/src/main/java/org/apache/spark/launcher/Main.java
This is the command-line interface for launching Spark applications, used internally by Spark's own scripts.
/**
* Command line interface for the Spark launcher. Used internally by Spark scripts.
*/
class Main {
// Usage: Main [class] [class args]
// This CLI works in two different modes:
// 1. "spark-submit": if the class is org.apache.spark.deploy.SparkSubmit, the SparkLauncher machinery is used to launch a Spark application.
// 2. "spark-class": if another class is provided, an internal Spark class is run.
// This class works in tandem with the "bin/spark-class" script on Unix-like systems
public static void main(String[] argsArray) throws Exception {
checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
List<String> args = new ArrayList<>(Arrays.asList(argsArray));
// The first argument is the class to run
String className = args.remove(0);
// SparkSubmit
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
try {
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
cmd = buildCommand(builder, env, printLaunchCommand);
} catch (IllegalArgumentException e) {
// ...
MainClassOptionParser parser = new MainClassOptionParser();
try {
parser.parse(args);
} catch (Exception ignored) {
// Ignore parsing exceptions.
}
// ...
help.add(parser.USAGE_ERROR);
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
// a different command builder is used on this error path
cmd = buildCommand(builder, env, printLaunchCommand);
}
} else {
AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
// a different command builder for internal Spark classes
cmd = buildCommand(builder, env, printLaunchCommand);
}
// ...
}
// ...
}
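Incidentally, the SparkLauncher class named in the comment above is also the public, programmatic counterpart of the spark-submit script. A minimal hedged sketch (the jar path and main class are hypothetical):
import org.apache.spark.launcher.SparkLauncher

object LauncherExample {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")  // hypothetical application jar
      .setMainClass("com.example.MyApp")   // hypothetical main class
      .setMaster("yarn")
      .setDeployMode("cluster")
      .startApplication()                  // spawns spark-submit under the hood
    // handle.getState() reports SUBMITTED/RUNNING/FINISHED... as the app progresses
  }
}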
Now look at the main method of SparkSubmit:
object SparkSubmit extends CommandLineUtils with Logging {
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
self =>
// Encapsulate the configuration parameters
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
// Create the SparkSubmitArguments object
new SparkSubmitArguments(args) {
// ...
}
}
// ...
override def doSubmit(args: Array[String]): Unit = {
try {
// This calls into SparkSubmit.doSubmit below
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
}
}
}
// Invoke doSubmit
submit.doSubmit(args)
}
1.1.1 Encapsulating the Arguments: new SparkSubmitArguments
As seen above, a SparkSubmitArguments object is created:
Source location: spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
/**
* Parses and encapsulates arguments from the spark-submit script.
* The env argument is used for testing.
*/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser with Logging {
//...
// Set parameters from command line arguments
// Parse the parameters passed on the command line
parse(args.asJava)
//...
}
The concrete implementation of parse lives in:
Source location: spark/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
/**
* Parse a list of spark-submit command line options.
* <p>
* See SparkSubmitArguments.scala for a more formal description of available options.
*
* @throws IllegalArgumentException If an error is found during parsing.
*/
protected final void parse(List<String> args) {
Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
int idx = 0;
// The outer for loop walks over every argument
for (idx = 0; idx < args.size(); idx++) {
String arg = args.get(idx);
String value = null;
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// Look for options with a value.
String name = findCliOption(arg, opts);
if (name != null) {
if (value == null) {
if (idx == args.size() - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
idx++;
value = args.get(idx);
}
// Hand the option and its value to handle() for processing
if (!handle(name, value)) {
break;
}
continue;
}
// Look for a switch.
name = findCliOption(arg, switches);
if (name != null) {
// Switches carry no value; pass null to handle()
if (!handle(name, null)) {
break;
}
continue;
}
if (!handleUnknown(arg)) {
break;
}
}
if (idx < args.size()) {
idx++;
}
handleExtraArgs(args.subList(idx, args.size()));
}
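A quick, self-contained illustration of the eqSeparatedOpt regex above: it splits the "--opt=value" spelling so that both "--opt value" and "--opt=value" reach handle in the same shape.
import java.util.regex.Pattern

object EqOptDemo {
  def main(args: Array[String]): Unit = {
    val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
    val m = eqSeparatedOpt.matcher("--deploy-mode=cluster")
    if (m.matches()) {
      println(m.group(1)) // prints: --deploy-mode
      println(m.group(2)) // prints: cluster
    }
  }
}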
The handle method mainly does pattern matching and assignment:
Source location: spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
This is an implementation of the template method pattern (a miniature sketch follows the snippet):
/** Fill in values by parsing user options. */
override protected def handle(opt: String, value: String): Boolean = {
// Pattern matching and assignment
opt match {
case NAME =>
name = value
case MASTER =>
master = value
case CLASS =>
mainClass = value
case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
// ...
case _ =>
error(s"Unexpected argument '$opt'.")
}
action != SparkSubmitAction.PRINT_VERSION
}
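Here is the template method shape in miniature, a hedged sketch rather than Spark's actual classes: the base class fixes the parsing skeleton, and a subclass supplies per-option behavior by overriding handle.
// Illustrative only: names and the simplified loop are not Spark's.
abstract class MiniOptionParser {
  protected def handle(opt: String, value: String): Boolean

  // The "template": the loop is fixed here, behavior comes from handle()
  final def parse(args: Array[String]): Unit = {
    var i = 0
    var go = true
    while (go && i + 1 < args.length) {
      go = handle(args(i), args(i + 1))
      i += 2
    }
  }
}

class MiniSubmitArgs extends MiniOptionParser {
  var master: String = _
  var mainClass: String = _
  override protected def handle(opt: String, value: String): Boolean = opt match {
    case "--master" => master = value; true
    case "--class"  => mainClass = value; true
    case _          => false // stop on unknown options
  }
}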
1.1.2 Executing the doSubmit Method
super.doSubmit(args)
/**
* Main gateway of launching a Spark application.
*
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
private[spark] class SparkSubmit extends Logging {
import DependencyUtils._
import SparkSubmit._
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
// ...
}
The submit method runs doRunMain, which in turn calls runMain:
/**
* Submit the application using the provided parameters, ensuring to first wrap
* in a doAs when --proxy-user is specified.
*/
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// doRunMain defines a local function invoked below; both branches end up calling runMain
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(args, uninitLog)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
} else {
throw e
}
}
} else {
runMain(args, uninitLog)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
logInfo("Running Spark using the REST application submission protocol.")
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
logWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args, false)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
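The proxy-user branch relies on Hadoop's UserGroupInformation. A minimal sketch of the same pattern (the user name "alice" is hypothetical, and impersonation must be allowed by Hadoop's proxy-user configuration):
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserDemo {
  def main(args: Array[String]): Unit = {
    val proxyUser = UserGroupInformation.createProxyUser(
      "alice", UserGroupInformation.getCurrentUser)
    proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
      // Everything inside run() executes as "alice"
      override def run(): Unit = {
        println(UserGroupInformation.getCurrentUser.getShortUserName)
      }
    })
  }
}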
As its comment explains, runMain does two things:
- First, we prepare the launch environment by setting up the appropriate classpath, system properties, and application arguments for running the child main class based on the cluster manager and the deploy mode.
- Second, we use this launch environment to invoke the main method of the child main class.
/**
* Run the main method of the child class using the submit arguments.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*/
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// 1. Prepare the submit environment
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// ...
try {
// 2. Load the class via reflection
// In client mode, childMainClass is the --class we passed on the command line
// In cluster mode, it is org.apache.spark.deploy.yarn.YarnClusterApplication
mainClass = Utils.classForName(childMainClass)
} catch {
// ...
}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
// yarn-cluster mode actually takes this branch
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
// Otherwise look up and invoke the class's main method
new JavaMainApplication(mainClass)
}
// ...
try {
// 3. The key step: call SparkApplication.start
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
throw findCause(t)
}
}
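The isAssignableFrom check above is plain JVM reflection: "can an instance of mainClass be treated as a SparkApplication?". A self-contained toy version of the same dispatch:
trait Greeter { def greet(): Unit }
class Hello extends Greeter { override def greet(): Unit = println("hello") }

object AssignableDemo {
  def main(args: Array[String]): Unit = {
    val cls = Class.forName("Hello") // assumes Hello is on the classpath, default package
    if (classOf[Greeter].isAssignableFrom(cls)) {
      // Instantiate via the no-arg constructor, just like runMain does
      cls.getConstructor().newInstance().asInstanceOf[Greeter].greet()
    }
  }
}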
Preparing the submit environment: prepareSubmitEnvironment
/**
* Prepare the environment for submitting an application.
*
* @param args the parsed SparkSubmitArguments used for environment preparation.
* @param conf the Hadoop Configuration, this argument will only be set in unit test.
* @return a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
*
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// The return values; childMainClass is the one to focus on
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
// This one matters most
var childMainClass = ""
// ...
The childMainClass in the returned tuple is the main class to execute, and it differs by deploy mode.
Client mode:
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
Cluster mode:
org.apache.spark.deploy.yarn.YarnClusterApplication
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
// private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
// ...
}
JavaMainApplication's implementation loads the class via reflection and invokes its main method.
runMain#Utils.classForName(childMainClass)
Find and invoke the main method:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
// Override start: look up and invoke klass's main method
override def start(args: Array[String], conf: SparkConf): Unit = {
// Find the main method
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
// It must be static
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
sys.props(k) = v
}
// Invoke the main method
mainMethod.invoke(null, args)
}
}
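One subtlety in the snippet above: in Scala, mainMethod.invoke(null, args) passes the whole array as the single String[] parameter rather than expanding it into Java varargs, which is exactly what a main method expects. A self-contained toy version:
object Target {
  def main(args: Array[String]): Unit = println(args.mkString(", "))
}

object StaticMainDemo {
  def main(args: Array[String]): Unit = {
    val m = Class.forName("Target").getMethod("main", classOf[Array[String]])
    // null receiver because main is static; the array is one argument, not varargs
    m.invoke(null, Array("a", "b"))
    // prints: a, b
  }
}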
YarnClusterApplication has no main method; instead it overrides SparkApplication's start method, creates a new Client, and calls its run method.
1.2 org.apache.spark.deploy.yarn.YarnClusterApplication
Source location: spark/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
// Extends SparkApplication and overrides its start method
private[spark] class YarnClusterApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
// 1. Wrap the configuration arguments: new ClientArguments(args)
// 2. Create the client: new Client
// 3. Run it: .run()
new Client(new ClientArguments(args), conf, null).run()
}
}
1.2.1 Encapsulating the Configuration: new ClientArguments(args)
This class simply parses and holds the client's configuration arguments.
private[spark] class ClientArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  // Parse the options passed along by SparkSubmit (--jar, --class, --arg, ...)
  parseArgs(args.toList)
  // ...
}
1.2.2 Creating the Client: new Client
The object holds a yarnClient field created via YarnClient.createYarnClient:
private[spark] class Client(
val args: ClientArguments,
val sparkConf: SparkConf,
val rpcEnv: RpcEnv)
extends Logging {
import Client._
import YarnSparkHadoopUtil._
// Create the YarnClient
private val yarnClient = YarnClient.createYarnClient
// ...
}
A simple static factory:
org.apache.hadoop.yarn.client.api.YarnClient
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {
/**
* Create a new instance of YarnClient.
*/
@Public
public static YarnClient createYarnClient() {
// The concrete YarnClient implementation
YarnClient client = new YarnClientImpl();
return client;
}
// ...
}
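YarnClient follows Hadoop's service lifecycle (init, then start). A minimal hedged sketch of the calls Spark's Client makes, assuming a reachable ResourceManager configured in yarn-site.xml:
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object YarnClientSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration()) // AbstractService.init
    yarnClient.start()                       // AbstractService.start
    // Ask the RM for a new application; this is where the appId comes from
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId
    println(s"Allocated application id: $appId")
    yarnClient.stop()
  }
}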
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
The key field is rmClient, the protocol handle to the ResourceManager.
@Private
@Unstable
public class YarnClientImpl extends YarnClient {
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
// Client for talking to the ResourceManager
protected ApplicationClientProtocol rmClient;
// ...
}
1.2.3 Calling the client.run Method
This submits the Spark application to the ResourceManager:
/**
* Submit an application to the ResourceManager.
* If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
* reporting the application's status until the application has exited for any reason.
* Otherwise, the client process will exit after submission.
* If the application finishes with a failed, killed, or undefined status,
* throw an appropriate SparkException.
*/
def run(): Unit = {
// Submit the application and keep its application ID
this.appId = submitApplication()
// ...
}
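When spark.yarn.submit.waitAppCompletion is true, run() keeps polling the ResourceManager for the application's state (Spark calls this monitorApplication). A hedged sketch of such a polling loop against the public YARN API (the helper name is ours, not Spark's):
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

object AppMonitor {
  // Poll the RM once per second until the application reaches a terminal state
  def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
    var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    while (state != YarnApplicationState.FINISHED &&
           state != YarnApplicationState.FAILED &&
           state != YarnApplicationState.KILLED) {
      Thread.sleep(1000)
      state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    }
    state
  }
}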
Submitting the Spark application with submitApplication(), which launches the ApplicationMaster for the current Spark application.
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
*
* The stable Yarn API provides a convenience method (YarnClient#createApplication) for
* creating applications and setting up the application submission context. This was not
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
ResourceRequestHelper.validateResources(sparkConf)
var appId: ApplicationId = null
try {
// 1. Initialize and start the YARN client
launcherBackend.connect()
// Initialize the YARN client
yarnClient.init(hadoopConf)
// Start the YARN client
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
// 2. Ask the RM for a new YARN application and get back the appId
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
// ...
// Set up the appropriate contexts to launch our AM
// 3. Set up the launch and submission contexts for the ApplicationMaster
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
// 4. Submit the application and monitor its state
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
// Return the appId
appId
} catch {
case e: Throwable =>
if (stagingDirPath != null) {
cleanupStagingDir()
}
throw e
}
}
Creating the container launch context:
containerContext = createContainerLaunchContext(newAppResponse)
/**
* Set up a ContainerLaunchContext to launch our ApplicationMaster container.
* This sets up the launch environment, java options, and the command for launching the AM.
*/
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
// JVM options and the like
// ...
// The key part: the class that will run as the ApplicationMaster
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
// ...
// Assemble the ApplicationMaster's launch arguments
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
Seq("--properties-file",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
Seq("--dist-cache-conf",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))
// The command line that launches the AM
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
// ...
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
setupSecurityToken(amContainer)
// Return the amContainer
amContainer
}
ContainerLaunchContext is an abstract class carrying everything the NodeManager needs to launch a container: the ContainerId, the allocated Resource, the user, security tokens, LocalResources, environment variables, the launch command, and so on.
/**
* {@code ContainerLaunchContext} represents all of the information
* needed by the {@code NodeManager} to launch a container.
* <p>
* It includes details such as:
* <ul>
* <li>{@link ContainerId} of the container.</li>
* <li>{@link Resource} allocated to the container.</li>
* <li>User to whom the container is allocated.</li>
* <li>Security tokens (if security is enabled).</li>
* <li>
* {@link LocalResource} necessary for running the container such
* as binaries, jar, shared-objects, side-files etc.
* </li>
* <li>Optional, application-specific binary service data.</li>
* <li>Environment variables for the launched process.</li>
* <li>Command to launch the container.</li>
* </ul>
*
* @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
*/
@Public
@Stable
public abstract class ContainerLaunchContext { // ... }
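Outside of Spark, such a context is typically assembled with Records.newRecord; here is a hedged sketch (the command and environment entries are illustrative only, and com.example.MyMaster is hypothetical):
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

object LaunchContextSketch {
  def main(args: Array[String]): Unit = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    // The command(s) the NodeManager will run inside the container
    ctx.setCommands(Seq(
      "{{JAVA_HOME}}/bin/java -Xmx1g com.example.MyMaster" +
        " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"
    ).asJava)
    // Environment variables for the launched process
    ctx.setEnvironment(Map("EXAMPLE_ENV" -> "true").asJava)
    println(ctx)
  }
}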