Spark Operator 是如何提交 Spark 作业

2023-07-04 17:58:06 提交 作业 Spark Operator

Overview

本文将 Spark 作业称为 Spark Application 或者简称为 Spark App 或者 App。目前我们组内的计算平台的 Spark 作业,是通过 Spark Operator 提交给 Kubernetes 集群的,这与 Spark 原生的直接通过 spark-submit 提交 Spark App 的方式不同,所以理解 Spark Operator 中提交 Spark App 的逻辑,对于用户来说是非常有必要的。本文将就其具体的提交逻辑,介绍一下。

Spark Operator 中的 spark-submit 命令

熟悉 Spark 的同学未必对 Kubernetes 和 Operator 熟悉,所以看 Spark Operator 的逻辑的时候有可能会遇到一些问题,我的建议是先从提交 spark-submit 命令相关的逻辑开始看就会很容易理解。Spark Operator 的提交作业的逻辑主要在 pkg/controller/sparkapplication/submission.go

func runSparkSubmit(submission *submission) (bool, error) {
    sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
    if !present {
        glog.Error("SPARK_HOME is not specified")
    }
    // 这个就是 Spark 用户熟悉的 spark-submit 命令
    var command = filepath.Join(sparkHome, "/bin/spark-submit")

    cmd := execCommand(command, submission.args...)
    glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
    output, err := cmd.Output()
    glog.V(3).Infof("spark-submit output: %s", string(output))
    if err != nil {
        var errorMsg string
        if exitErr, ok := err.(*exec.ExitError); ok {
            errorMsg = string(exitErr.Stderr)
        }
        // The driver pod of the application already exists.
        if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
            glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
            return false, nil
        }
        if errorMsg != "" {
            return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
        }
        return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
    }

    return true, nil
}

相关文章