Skip to content

Commit

Permalink
Spark once task supports engingeConnRuntimeMode label
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Sep 4, 2023
1 parent 4169e70 commit 2e808de
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.linkis.manager.engineplugin.common.creation.{
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.manager.label.utils.LabelUtil
Expand Down Expand Up @@ -86,12 +87,22 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
val sparkHome = SPARK_HOME.getValue(options)
val sparkConfDir = SPARK_CONF_DIR.getValue(options)
val sparkConfig: SparkConfig = getSparkConfig(options)
val sparkConfig: SparkConfig =
getSparkConfig(options, isYarnClusterMode(engineCreationContext.getLabels()))
val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null)
context
}

def getSparkConfig(options: util.Map[String, String]): SparkConfig = {
def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = {
val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels)
val isYarnClusterMode: Boolean = {
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false
}
isYarnClusterMode
}

def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = {
logger.info("options: " + JsonUtils.jackson.writeValueAsString(options))
val sparkConfig: SparkConfig = new SparkConfig()
sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
Expand All @@ -112,7 +123,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
}
sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))

if (master.startsWith("yarn")) {
if (isYarnClusterMode) {
sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER)
} else {
sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT)
}
}
sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo
Expand Down

0 comments on commit 2e808de

Please sign in to comment.