Spark源码解析Yarn部署流程(SparkSubmit)

2022-05-31 00:00:00 代码 参数 调用 方法 复制

Spark源码解析Yarn部署流程(SparkSubmit))

更多文章,可打开微信搜索 知了小巷 公众号并关注,公众号后台回复 资料 两个字,有大数据学习视频资料免费领取。

一、Yarn部署流程(SparkSubmit)

1.1 spark-submit 脚本

查看脚本 spark-submit 内容:
源码位置:spark/bin/spark-submit

# $@: 传入脚本的所有参数
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
复制代码

spark-class

build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
复制代码

源码位置:spark/launcher/src/main/java/org/apache/spark/launcher/Main.java
Spark应用启动的命令行接口,用于Spark内置脚本。

/**
 * Command line interface for the Spark launcher. Used internally by Spark scripts.
 */
class Main {
	// Usage: Main [class] [class args]
	// This CLI works in two different modes: 两种模式
	// 1. 如果是"spark-submit":即class是org.apache.spark.deploy.SparkSubmit,那么SparkLauncher class is used to launch a Spark application.
	// 2. 如果是"spark-class":if another class is provided, an internal Spark class is run.
	// This class works in tandem with the "bin/spark-class" script on Unix-like systems
	public static void main(String[] argsArray) throws Exception {
    	checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
    	// SparkSubmit
    	if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
			try {
				AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
				cmd = buildCommand(builder, env, printLaunchCommand);
			} catch (IllegalArgumentException e) {
				// ...
				MainClassOptionParser parser = new MainClassOptionParser();
				try {
				  parser.parse(args);
				} catch (Exception ignored) {
				  // Ignore parsing exceptions.
				}

				// ...
				help.add(parser.USAGE_ERROR);
				AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
				// cmd builder不同
				cmd = buildCommand(builder, env, printLaunchCommand);
			}
		} else {
			AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
			// cmd builder不同
			cmd = buildCommand(builder, env, printLaunchCommand);
		}
		// ...
	}
	// ...		

}
复制代码

查看 SparkSubmit 的 main方法:
object SparkSubmit extends CommandLineUtils with Logging

override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>
      // 封装配置参数
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        // 创建SparkSubmitArguments对象
        new SparkSubmitArguments(args) {
          // ...
        }
      }

      // ...

      override def doSubmit(args: Array[String]): Unit = {
        try {
          // 会调到这里
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }
    // 调用doSubmit
    submit.doSubmit(args)
}
复制代码
1.1.1 封装参数 new SparkSubmitArguments

上面可以看到创建了 SparkSubmitArguments对象
spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

/**
 * Parses and encapsulates arguments from the spark-submit script.
 * The env argument is used for testing.
 */
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  //...
  // Set parameters from command line arguments
  // 解析命令行传入的参数
  parse(args.asJava)
  //...
}  
复制代码

parse的具体实现在:
spark/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

/**
 * Parse a list of spark-submit command line options.
 * <p>
 * See SparkSubmitArguments.scala for a more formal description of available options.
 *
 * @throws IllegalArgumentException If an error is found during parsing.
 */
protected final void parse(List<String> args) {
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  // 外层是一个for循环,遍历所有arg参数
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      // 对命令行输入的内容进行解析
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
      // 对命令行输入的内容进行解析
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
复制代码

handle方法主要是做模式匹配、然后赋值:
/Users/shaozhipeng/Development/project/java/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
这里是一个模板方法的实现

/** Fill in values by parsing user options. */
override protected def handle(opt: String, value: String): Boolean = {
  // 模式匹配和赋值
  opt match {
    case NAME =>
      name = value

    case MASTER =>
      master = value

    case CLASS =>
      mainClass = value

    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value

    // ...

    case _ =>
      error(s"Unexpected argument '$opt'.")
  }
  action != SparkSubmitAction.PRINT_VERSION
}
复制代码
1.1.2 执行 doSubmit 方法

super.doSubmit(args)

/**
 * Main gateway of launching a Spark application.
 *
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
 */
private[spark] class SparkSubmit extends Logging {
  import DependencyUtils._
  import SparkSubmit._

  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
  // ...
}
复制代码

执行 submit 方法
执行 doRunMain 方法
调用 runMain 方法

/**
 * Submit the application using the provided parameters, ensuring to first wrap
 * in a doAs when --proxy-user is specified.
 */
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // doRunMain 声明了一个方法,后面会调用。而且里面无论如何都会调用runMain
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(args, uninitLog)
    }
  }
  
  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
复制代码

runMain方法有两个步骤,如注释:

  1. First, we prepare the launch environment by setting up the appropriate classpath, system properties, and application arguments for running the child main class based on the cluster manager and the deploy mode.
  2. Second, we use this launch environment to invoke the main method of the child main class.
/**
 * Run the main method of the child class using the submit arguments.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 */
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // 1. 准备提交环境
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // ...
  try {
    // 2. 通过反射获取类
    // childMainClass就是我们命令行中输入的那个--class类【客户端模式】
    // org.apache.spark.deploy.yarn.YarnClusterApplication【集群模式】
    mainClass = Utils.classForName(childMainClass)
  } catch {
    // ...
  }
  
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // yarn-cluster实际上是会走这里
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    // 查找 调用 main 方法
    new JavaMainApplication(mainClass)
  }
  // ...
  try {
  	  // 3. 此处重点,调用SparkApplication的start方法
      app.start(childArgs.toArray, sparkConf)
  } catch {
 	  case t: Throwable =>
	    throw findCause(t)
  }
}
复制代码

准备提交环境 prepareSubmitEnvironment

/**
 * Prepare the environment for submitting an application.
 *
 * @param args the parsed SparkSubmitArguments used for environment preparation.
 * @param conf the Hadoop Configuration, this argument will only be set in unit test.
 * @return a 4-tuple:
 *        (1) the arguments for the child process,
 *        (2) a list of classpath entries for the child,
 *        (3) a map of system properties, and
 *        (4) the main class for the child
 *
 * Exposed for testing.
 */
private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    // 返回值,重点看下childMainClass
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    // 比较重要
    var childMainClass = ""
    // ...
    
复制代码

返回值里面的childMainClass,不同模式下要执行的main class:
客户端模式:

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
    childClasspath += localPrimaryResource
  }
  if (localJars != null) { childClasspath ++= localJars.split(",") }
}
复制代码

集群模式:
org.apache.spark.deploy.yarn.YarnClusterApplication

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  //   private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  // ...
}
复制代码

JavaMainApplication的实现是通过反射加载类并执行main方法。
runMain#Utils.classForName(childMainClass)
查找并调用main方法:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
  // 重写start方法,里面查找并调用klass的main方法
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // 找到main方法
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    // 判断是否是static
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }
    // 调用main方法
    mainMethod.invoke(null, args)
  }

}
复制代码

YarnClusterApplication是没有main方法的,重写了SparkApplication的start方法,new了一个Client执行run方法。

1.2 org.apache.spark.deploy.yarn.YarnClusterApplication

spark/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
【图】

// 继承SparkApplication重写了start方法
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    // 1.封装配置参数 new ClientArguments(args)
    // 2.创建客户端 new Client
    // 3.运行 .run()
    new Client(new ClientArguments(args), conf, null).run()
  }

}
复制代码
1.2.1 封装配置参数 new ClientArguments(args)

就是一些封装配置参数。

/**
 * Command-line parser for the driver client.
 */
private[deploy] class ClientArguments(args: Array[String]) {
  import ClientArguments._

  var cmd: String = "" // 'launch' or 'kill'
  // ...
}
复制代码
1.2.2 创建客户端 new Client

对象中有一个 yarnClient = YarnClient.createYarnClient

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {

  import Client._
  import YarnSparkHadoopUtil._

  // 创建YarnClient
  private val yarnClient = YarnClient.createYarnClient
  // ...
}  
复制代码

一个简单的静态工厂
org.apache.hadoop.yarn.client.api#YarnClient

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {

  /**
   * Create a new instance of YarnClient.
   */
  @Public
  public static YarnClient createYarnClient() {
  	// YarnClient客户端实现
    YarnClient client = new YarnClientImpl();
    return client;
  }
  // ...
}
复制代码

org.apache.hadoop.yarn.client.api.impl#YarnClientImpl
可参考【】
重点对象是rmClient

@Private
@Unstable
public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  // ResourceManager客户端
  protected ApplicationClientProtocol rmClient;
  // ...
}
复制代码
1.2.3 调用 client.run 方法

注册Spark应用到ResourceManager

/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
*/
def run(): Unit = {
	// 应用ID - 提交应用
	this.appId = submitApplication()
	// ...
}
复制代码

提交Spark应用
submitApplication()
用来运行当前Spark应用对应的ApplicationMaster。

/**
 * Submit an application running our ApplicationMaster to the ResourceManager.
 *
 * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
 * creating applications and setting up the application submission context. This was not
 * available in the alpha API.
 */
def submitApplication(): ApplicationId = {
	ResourceRequestHelper.validateResources(sparkConf)

	var appId: ApplicationId = null
	try {
	  // 1. YARN客户端初始化和启动
	  launcherBackend.connect()
	  // 初始化YARN客户端
	  yarnClient.init(hadoopConf)
	  // 启动YARN客户端
	  yarnClient.start()

	  logInfo("Requesting a new application from cluster with %d NodeManagers"
	    .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

	  // Get a new application from our RM
	  // 2. 向RM请求创建新的YARN应用,拿到appId
	  val newApp = yarnClient.createApplication()
	  val newAppResponse = newApp.getNewApplicationResponse()
	  appId = newAppResponse.getApplicationId()

	  // ...

	  // Set up the appropriate contexts to launch our AM
	  // 3. 为ApplicationMaster创建上下文
	  val containerContext = createContainerLaunchContext(newAppResponse)
	  val appContext = createApplicationSubmissionContext(newApp, containerContext)

	  // Finally, submit and monitor the application
	  // 4. 提交并监控应用的运行状态
	  logInfo(s"Submitting application $appId to ResourceManager")
	  yarnClient.submitApplication(appContext)
	  launcherBackend.setAppId(appId.toString)
	  reportLauncherState(SparkAppHandle.State.SUBMITTED)
	  // 返回appId
	  appId
	} catch {
	  case e: Throwable =>
	    if (stagingDirPath != null) {
	      cleanupStagingDir()
	    }
	    throw e
	}
}
复制代码

创建容器上下文
containerContext = createContainerLaunchContext(newAppResponse)

/**
 * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
 * This sets up the launch environment, java options, and the command for launching the AM.
 */
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
	 // JVM参数等
	 // ...
    // 这里是重点 - ApplicationMaster的 class
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

    // ...

    // ApplicationMaster启动参数拼接
	val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
      Seq("--dist-cache-conf",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

	// 启动AM的命令行脚本 Command for the ApplicationMaster
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)
    
    // ...  
    // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    // 返回amContainer
    amContainer
}
复制代码

ContainerLaunchContext是一个抽象类,包含ContainerId、Resource、User、Security tokens、LocalResource、Environment variables、启动容器的命令等。

/**
 * {@code ContainerLaunchContext} represents all of the information
 * needed by the {@code NodeManager} to launch a container.
 * <p>
 * It includes details such as:
 * <ul>
 *   <li>{ContainerId} of the container.</li>
 *   <li>{Resource} allocated to the container.</li>
 *   <li>User to whom the container is allocated.</li>
 *   <li>Security tokens (if security is enabled).</li>
 *   <li>
 *     {LocalResource} necessary for running the container such
 *     as binaries, jar, shared-objects, side-files etc.
 *   </li>
 *   <li>Optional, application-specific binary service data.</li>
 *   <li>Environment variables for the launched process.</li>
 *   <li>Command to launch the container.</li>
 * </ul>
 * 
 * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
 */
@Public
@Stable
public abstract class ContainerLaunchContext { //...}

相关文章