Skip to content

Commit

Permalink
feat: "spark-submit jar on k8s" related label and resource expansion (#…
Browse files Browse the repository at this point in the history
…4784)

* feat: add k8sCluster label recognition

* feat: add kubernetes resource related basic classes

* feat: add kubernetes resource related classes

* fix: supplement third dependency

* feat: k8s resource support different namespace

* fix: fix some basic problems

* feat: extend the length of column label_key of linkis_cg_manager_label to 50

* fix: format code according to scala style

* feat: KubernetesResourceRequester supports multiple providers

* fix: fix wrong variable use

* feat: adjust constants

* feat: adjust constants

* fix: format code according to scala style
  • Loading branch information
lenoxzhao authored Aug 5, 2023
1 parent bbcb518 commit 958c494
Show file tree
Hide file tree
Showing 35 changed files with 1,076 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
public class FitterUtils {

public static boolean isOption(final String arg) {
return arg.matches("-[a-zA-Z-]+");
return arg.matches("-[0-9a-zA-Z-]+");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class UniversalCmdTemplate extends AbstractCmdTemplate implements Cloneab
option(
CliKeys.JOB_LABEL,
CliKeys.JOB_LABEL_CLUSTER,
new String[] {"-yarnCluster"},
new String[] {"-yarnCluster", "-k8sCluster"},
"specify linkis yarn cluster for this job",
true,
"");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,31 @@
<version>${gson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes-client.version}</version>
<exclusions>
<exclusion>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-common</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ public enum RMErrorCode implements LinkisErrorCode {

CLUSTER_QUEUE_INSTANCES_INSUFFICIENT(12012, "Insufficient cluster queue instance(集群队列实例不足)"),

NAMESPACE_MEMORY_INSUFFICIENT(12100, "Insufficient cluster namespace memory(命名空间内存不足)"),

NAMESPACE_CPU_INSUFFICIENT(12101, "Insufficient cluster namespace cpu(命名空间cpu不足)"),

NAMESPACE_MISMATCHED(12102, "Mismatched namespace(命名空间不匹配,建议配置对应命名空间的资源)"),

KUBERNETES_NAMESPACE_MEMORY_INSUFFICIENT(
12110, "Insufficient cluster namespace memory(K8S集群命名空间内存不足)"),

KUBERNETES_NAMESPACE_CPU_INSUFFICIENT(
12111, "Insufficient cluster namespace cpu(K8S集群命名空间cpu不足)"),

KUBERNETES_UNKNOWN_RESOURCE_TYPE(12112, "Unsupported resource type(不支持的资源类型)"),

ECM_RESOURCE_INSUFFICIENT(11000, "ECM resources are insufficient(ECM 资源不足)"),

ECM_MEMORY_INSUFFICIENT(11001, "ECM memory resources are insufficient(ECM 内存资源不足)"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.external.kubernetes;

import org.apache.linkis.manager.common.entity.resource.ResourceType;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;

public class KubernetesResourceIdentifier implements ExternalResourceIdentifier {

String namespace;

public KubernetesResourceIdentifier(String namespace) {
this.namespace = namespace;
}

public String getNamespace() {
return namespace;
}

@Override
public ResourceType getResourceType() {
return ResourceType.Kubernetes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.external.kubernetes;

import org.apache.linkis.manager.common.entity.resource.*;
import org.apache.linkis.manager.rm.external.domain.ExternalAppInfo;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceProvider;
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceQuota;
import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesResourceRequester implements ExternalResourceRequester {
private static final Logger logger = LoggerFactory.getLogger(KubernetesResourceRequester.class);
private final Map<String, DefaultKubernetesClient> clientMap = new ConcurrentHashMap<>();

@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
if (client == null) {
constructKubernetesClient(provider);
client = clientMap.get(k8sMasterUrl);
}
String namespace = ((KubernetesResourceIdentifier) identifier).getNamespace();
Pair<KubernetesResource, KubernetesResource> kubernetesResources =
getResources(client, namespace);

CommonNodeResource nodeResource = new CommonNodeResource();
nodeResource.setMaxResource(kubernetesResources.getKey());
nodeResource.setUsedResource(kubernetesResources.getValue());

return nodeResource;
}

public Pair<KubernetesResource, KubernetesResource> getResources(
DefaultKubernetesClient client, String namespace) {
long usedMemory = 0;
long allocatableMemory = 0;
long usedCPU = 0;
long allocatableCPU = 0;

List<ResourceQuota> resourceQuotaList =
client.resourceQuotas().inNamespace(namespace).list().getItems();

// Get resource from resourcequota if deployed, otherwise from node status metrics.
if (CollectionUtils.isNotEmpty(resourceQuotaList)) {
Map<String, Quantity> usedQuotaResource = resourceQuotaList.get(0).getStatus().getUsed();
usedCPU = getKubernetesCPUInMilli(usedQuotaResource);
usedMemory = getKubernetesMemoryInBytes(usedQuotaResource);
long hardMemory = Long.MAX_VALUE;
long hardCPU = Long.MAX_VALUE;
for (ResourceQuota resourceQuota : resourceQuotaList) {
Map<String, Quantity> hardResource = resourceQuota.getStatus().getHard();
long c = getKubernetesCPUInMilli(hardResource);
long m = getKubernetesMemoryInBytes(hardResource);
if (m < hardMemory) {
hardMemory = m;
}
if (c < hardCPU) {
hardCPU = c;
}
}
allocatableCPU = hardCPU;
allocatableMemory = hardMemory;
} else {
for (NodeMetrics nodeMetrics : client.top().nodes().metrics().getItems()) {
usedMemory += getKubernetesMemoryInBytes(nodeMetrics.getUsage());
usedCPU += getKubernetesCPUInMilli(nodeMetrics.getUsage());
}
for (Node node : client.nodes().list().getItems()) {
allocatableMemory += getKubernetesMemoryInBytes(node.getStatus().getAllocatable());
allocatableCPU += getKubernetesCPUInMilli(node.getStatus().getAllocatable());
}
}

logger.info(
"usedMemory: {}, usedCPU: {}, allocatableMemory: {}, allocatableCPU: {}",
usedMemory,
usedCPU,
allocatableMemory,
allocatableCPU);

return Pair.of(
new KubernetesResource(allocatableMemory, allocatableCPU, namespace),
new KubernetesResource(usedMemory, usedCPU, namespace));
}

/**
* Get the CPU in milli example: 0.5 means 500 milli 500m means 500 milli 1000000n means 1 milli
* (The cpu would be formated with "n" when query resource from node metrics by fabric8 api)
*
* @param resourceMap
* @return cpu in milli
*/
private long getKubernetesCPUInMilli(Map<String, Quantity> resourceMap) {
String cpuKey = resourceMap.containsKey("cpu") ? "cpu" : "requests.cpu";
return (long) (Quantity.getAmountInBytes(resourceMap.get(cpuKey)).doubleValue() * 1000);
}

/**
* Get the memory in bytes example: 500Ki means 500 * 1024 bytes 500Mi means 500 * 1024 * 1024
* bytes
*
* @param resourceMap
* @return memory in bytes
*/
private long getKubernetesMemoryInBytes(Map<String, Quantity> resourceMap) {
String memoryKey = resourceMap.containsKey("memory") ? "memory" : "requests.memory";
return Quantity.getAmountInBytes(resourceMap.get(memoryKey)).longValue();
}

@Override
public List<ExternalAppInfo> requestAppInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
// TODO
return null;
}

@Override
public ResourceType getResourceType() {
return ResourceType.Kubernetes;
}

@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider) {
if (null != provider) {
DefaultKubernetesClient client =
clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
if (client != null) {
client.close();
}
constructKubernetesClient(provider);
}
return true;
}

private void constructKubernetesClient(ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sClientCertData = (String) provider.getConfigMap().get("k8sClientCertData");
String k8sClientKeyData = (String) provider.getConfigMap().get("k8sClientKeyData");
String k8sCaCertData = (String) provider.getConfigMap().get("k8sCaCertData");
DefaultKubernetesClient client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
clientMap.put(k8sMasterUrl, client);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.external.parser;

import org.apache.linkis.manager.common.entity.resource.ResourceType;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
import org.apache.linkis.manager.rm.external.kubernetes.KubernetesResourceIdentifier;

import java.util.Map;

public class KubernetesResourceIdentifierParser implements ExternalResourceIdentifierParser {
public static String NAMESPACE = "namespace";

@Override
public ExternalResourceIdentifier parse(Map<String, Object> identifierMap) {
return new KubernetesResourceIdentifier((String) identifierMap.get(NAMESPACE));
}

@Override
public ResourceType getResourceType() {
return ResourceType.Kubernetes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.linkis.manager.rm.external.domain.ExternalAppInfo;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceProvider;
import org.apache.linkis.manager.rm.external.kubernetes.KubernetesResourceRequester;
import org.apache.linkis.manager.rm.external.parser.ExternalResourceIdentifierParser;
import org.apache.linkis.manager.rm.external.parser.KubernetesResourceIdentifierParser;
import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser;
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
Expand Down Expand Up @@ -87,9 +89,15 @@ public List<ExternalResourceProvider> load(String resourceType) {

@Override
public void afterPropertiesSet() throws Exception {
resourceRequesters = new ExternalResourceRequester[] {new YarnResourceRequester()};

identifierParsers = new ExternalResourceIdentifierParser[] {new YarnResourceIdentifierParser()};
resourceRequesters =
new ExternalResourceRequester[] {
new YarnResourceRequester(), new KubernetesResourceRequester()
};

identifierParsers =
new ExternalResourceIdentifierParser[] {
new YarnResourceIdentifierParser(), new KubernetesResourceIdentifierParser()
};
}

@Override
Expand Down
Loading

0 comments on commit 958c494

Please sign in to comment.