feat: "spark-submit jar on k8s" related label and resource expansion #4784
Please fix the third-party dependency check failure.
3456d77 to d349093
Thanks. Already fixed; please check.
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());

I was wondering whether DefaultKubernetesClient needs to be initialized every time. Does creating a client for each requestResourceInfo call affect the K8s cluster?
throws RMWarnException {
// if (!super.canRequest(labelContainer, resource)) {
// return false;
// }

Is the configured resource no longer validated?
public class ClusterLabel extends GenericLabel implements EMNodeLabel {
import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;

public class ClusterLabel extends GenericLabel implements EngineNodeLabel, UserModifiable {

public ClusterLabel() {
setLabelKey(LabelKeyConstant.YARN_CLUSTER_KEY);

Can the LabelKey be changed?
public class KubernetesResourceRequester implements ExternalResourceRequester {
private static final Logger logger = LoggerFactory.getLogger(KubernetesResourceRequester.class);
private ExternalResourceProvider provider = null;
private String k8sMasterUrl;

How will different namespaces and different queues be supported?
for (Node item : client.nodes().list().getItems()) {
allocatableMemory +=
Long.parseLong(item.getStatus().getAllocatable().get("memory").getAmount());
allocatableCPU += Long.parseLong(item.getStatus().getAllocatable().get("cpu").getAmount());

Is the reference to usedCPU correct?
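A related point worth checking in this loop: Kubernetes quantities are commonly written with unit suffixes such as `Gi` or `m`, so summing the bare numbers can mix units across nodes. Whether `getAmount()` returns the suffix or only the number depends on the client version, so treat the following as a hedged sketch under that assumption. `QuantitySketch` and its methods are hypothetical names, not part of Linkis or fabric8.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: normalizes Kubernetes-style quantity strings to base units.
final class QuantitySketch {
  // A decimal number followed by an optional alphabetic unit suffix.
  private static final Pattern QUANTITY =
      Pattern.compile("^([0-9]+(?:\\.[0-9]+)?)([A-Za-z]*)$");

  // Multipliers for the most common suffixes (decimal and binary).
  private static final Map<String, BigDecimal> FACTORS =
      Map.of(
          "", BigDecimal.ONE,
          "m", new BigDecimal("0.001"),
          "k", BigDecimal.valueOf(1_000L),
          "M", BigDecimal.valueOf(1_000_000L),
          "G", BigDecimal.valueOf(1_000_000_000L),
          "T", BigDecimal.valueOf(1_000_000_000_000L),
          "Ki", BigDecimal.valueOf(1L << 10),
          "Mi", BigDecimal.valueOf(1L << 20),
          "Gi", BigDecimal.valueOf(1L << 30),
          "Ti", BigDecimal.valueOf(1L << 40));

  static BigDecimal toBaseUnits(String quantity) {
    Matcher matcher = QUANTITY.matcher(quantity.trim());
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Unparseable quantity: " + quantity);
    }
    BigDecimal factor = FACTORS.get(matcher.group(2));
    if (factor == null) {
      throw new IllegalArgumentException("Unknown unit suffix in: " + quantity);
    }
    return new BigDecimal(matcher.group(1)).multiply(factor);
  }

  // Memory expressed in bytes.
  static long toBytes(String quantity) {
    return toBaseUnits(quantity).longValue();
  }

  // CPU expressed in millicores, so "500m" and "2" can be summed safely.
  static long toMillicores(String quantity) {
    return toBaseUnits(quantity).multiply(BigDecimal.valueOf(1000L)).longValue();
  }

  public static void main(String[] args) {
    System.out.println(toBytes("16Gi"));
    System.out.println(toMillicores("500m") + toMillicores("2"));
  }
}
```

Converting both the allocatable and the used values to the same base unit before subtracting keeps the two sides of the comparison consistent.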
public class ClusterLabel extends GenericLabel implements EMNodeLabel {
import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;

public class ClusterLabel extends GenericLabel implements EngineNodeLabel, UserModifiable {

The label_key column of linkis_cg_manager_label needs to be extended to 50 characters.
|| !StringUtils.equals(
k8sMasterUrl, (String) provider.getConfigMap().get("k8sMasterUrl"))) {
reloadExternalResourceAddress(provider);
}

KubernetesResourceRequester is a singleton, so keeping client and provider as member variables is problematic.
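One possible way to avoid shared mutable fields in a singleton is to keep clients in a concurrent map keyed by the cluster they belong to, which also avoids building a new client on every request. This is a minimal sketch, assuming the master URL is a sufficient key. `ClientCacheSketch` and its factory parameter are hypothetical, and the generic type `C` stands in for the real client type.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical cache: one client per master URL, safe to call from many threads.
final class ClientCacheSketch<C> {
  private final ConcurrentMap<String, C> clients = new ConcurrentHashMap<>();
  private final Function<String, C> factory;

  ClientCacheSketch(Function<String, C> factory) {
    this.factory = factory;
  }

  // Builds the client on first use and reuses it afterwards.
  // computeIfAbsent applies the factory at most once per absent key.
  C clientFor(String masterUrl) {
    return clients.computeIfAbsent(masterUrl, factory);
  }

  int size() {
    return clients.size();
  }

  public static void main(String[] args) {
    ClientCacheSketch<Object> cache = new ClientCacheSketch<>(url -> new Object());
    Object first = cache.clientFor("https://cluster-a.example");
    Object second = cache.clientFor("https://cluster-a.example");
    System.out.println(first == second);
  }
}
```

A real implementation would probably also need to rebuild the entry when credentials change and close clients that are evicted; the sketch leaves that out.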
KubernetesResource kubernetesResource = (KubernetesResource) requestResource;
KubernetesResource kubernetesResourceAvailable = (KubernetesResource) availableResource;
KubernetesResource kubernetesResourceMax = (KubernetesResource) maxResource;
if (kubernetesResource.getCores() > kubernetesResource.getCores()) {

The correct comparison is kubernetesResource.getCores() > kubernetesResourceAvailable.getCores().
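Comparing a value with itself is always false, so the check as written can never reject a request. A minimal sketch of the intended comparison, reduced to plain numbers so it runs on its own; `ResourceCheckSketch` and `firstShortfall` are hypothetical names, not the PR's code.

```java
import java.util.Optional;

// Hypothetical reduction of the check: requested values are compared
// against the available values, never against themselves.
final class ResourceCheckSketch {
  // Returns the name of the first resource that is over-requested, if any.
  static Optional<String> firstShortfall(
      long requestedCores, long availableCores,
      long requestedMemory, long availableMemory) {
    if (requestedCores > availableCores) {
      return Optional.of("cores");
    }
    if (requestedMemory > availableMemory) {
      return Optional.of("memory");
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(firstShortfall(4, 2, 1024, 4096));
    System.out.println(firstShortfall(1, 2, 1024, 4096));
  }
}
```

The same pattern applies to the memory branch of the check: the right-hand side should come from the available resource.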
}

public KubernetesResource() {
this(Long.MAX_VALUE, Long.MAX_VALUE);

Add a default value for namespace.
engineResourceRequest: EngineResourceRequest
): Resource = {
val clusterLabel = LabelUtil.getLabelFromList[ClusterLabel](engineResourceRequest.labels)
if (clusterLabel != null && clusterLabel.getClusterType == "K8S") {

Change '==' to equals, and use toUpperCase so the string match is case-insensitive.
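A minimal Java sketch of a case-insensitive, null-safe cluster-type check, with the literal held in a named constant. The reviewed code is Scala, so this only illustrates the idea; `ClusterTypeSketch`, `K8S_CLUSTER_TYPE` and `isK8s` are hypothetical names.

```java
// Hypothetical helper: keeps the cluster-type literal in one place and
// compares without regard to case.
final class ClusterTypeSketch {
  static final String K8S_CLUSTER_TYPE = "K8S";

  // Calling equalsIgnoreCase on the constant means a null clusterType
  // yields false instead of throwing a NullPointerException.
  static boolean isK8s(String clusterType) {
    return K8S_CLUSTER_TYPE.equalsIgnoreCase(clusterType);
  }

  public static void main(String[] args) {
    System.out.println(isK8s("k8s"));
    System.out.println(isK8s("yarn"));
  }
}
```

Putting the constant on the left-hand side is a design choice: calling `toUpperCase()` on the label value first would fail if the cluster type were ever null.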
engineResourceRequest: EngineResourceRequest
): Resource = {
val clusterLabel = LabelUtil.getLabelFromList[ClusterLabel](engineResourceRequest.labels)
if (clusterLabel != null && clusterLabel.getStringValue.startsWith("K8S")) {

Use toUpperCase so the string match is case-insensitive.
executorMemoryWithUnit
) * executorNum + ByteTimeUtils.byteStringAsBytes(driverMemoryWithUnit)
val totalExecutorCores = executorCores * executorNum + driverCores
// logger.info(s"总需求内存:$totalExecutorMemory, 总需求核数$totalExecutorCores")

Use English for this log message.
kubernetesResource.getCores(),
kubernetesResourceAvailable.getCores(),
kubernetesResourceMax.getCores()));
} else if (kubernetesResource.getMemory() > kubernetesResource.getMemory()) {

Change this to kubernetesResource.getMemory() > kubernetesResourceAvailable.getMemory().
LGTM
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
<version>5.4.1</version>

Can the version be taken from a property value?
ALTER TABLE `linkis_cg_ec_resource_info_record` MODIFY COLUMN metrics TEXT DEFAULT NULL COMMENT 'ec metrics';
ALTER TABLE `linkis_cg_manager_label` MODIFY COLUMN label_key varchar(50);

These statements need to be moved to 1.5.0_schema.
@@ -418,7 +418,6 @@
<artifactId>kubernetes-model-core</artifactId>
<version>5.4.1</version>

Can the version be taken from a property value?
@@ -31,4 +31,22 @@ object SparkResourceConfiguration {
val LINKIS_SPARK_EXECUTOR_INSTANCES = CommonVars[Int]("spark.executor.instances", 3)
val LINKIS_QUEUE_NAME = CommonVars[String]("wds.linkis.rm.yarnqueue", "default")

val LINKIS_SPARK_KUBERNETES_NAMESPACE =
CommonVars[String]("spark.kubernetes.namespace", "default")
): Resource = {
val clusterLabel = LabelUtil.getLabelFromList[ClusterLabel](engineResourceRequest.labels)
if (
clusterLabel != null && StringUtils.equals(clusterLabel.getClusterType.toUpperCase(), "K8S")

The "K8S" literal should be defined as a constant.
LGTM.
LGTM.
What is the purpose of the change
Enable k8s cluster recognition and k8s resource management.
Brief change log
Checklist