diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
index fa6236600e..3ea27b394f 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -38,6 +38,9 @@
 import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
 import io.fabric8.kubernetes.client.CustomResource;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
 import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
 import io.fabric8.kubernetes.client.dsl.Resource;
 import org.slf4j.Logger;
@@ -152,32 +155,44 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
   }
 
   public boolean initJobId() {
-    SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState();
-
-    if (Objects.nonNull(sparkApplicationStatus)) {
-      this.applicationId = sparkApplicationStatus.getSparkApplicationId();
-      this.jobState =
-          kubernetesOperatorStateConvertSparkState(
-              sparkApplicationStatus.getApplicationState().getState());
+    try {
+      getKubernetesOperatorState();
+    } catch (Exception e) {
+      try {
+        // Prevent watch interruption due to a transient network failure. Restart the watcher.
+        Thread.sleep(5000);
+        getKubernetesOperatorState();
+      } catch (InterruptedException interruptedException) {
+        logger.error("Use k8s watch obtain the status failed");
+      }
     }
-
     // When the job is not finished, the appId is monitored; otherwise, the status is
     // monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待)
     return null != getApplicationId() || (jobState != null && jobState.isFinal());
   }
 
-  private SparkApplicationStatus getKubernetesOperatorState() {
-    List<SparkApplication> sparkApplicationList =
-        getSparkApplicationClient(client).list().getItems();
-    if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
-      for (SparkApplication sparkApplication : sparkApplicationList) {
-        if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
-            && sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
-          return sparkApplication.getStatus();
-        }
-      }
-    }
-    return null;
+  private void getKubernetesOperatorState() {
+    getSparkApplicationClient(client)
+        .inNamespace(this.sparkConfig.getK8sNamespace())
+        .withName(this.sparkConfig.getAppName())
+        .watch(
+            new Watcher<SparkApplication>() {
+              @Override
+              public void eventReceived(Action action, SparkApplication sparkApplication) {
+                // todo get status
+                applicationId = sparkApplication.getStatus().getSparkApplicationId();
+                jobState =
+                    kubernetesOperatorStateConvertSparkState(
+                        sparkApplication.getStatus().getApplicationState().getState());
+              }
+
+              @Override
+              public void onClose(WatcherException e) {
+                // Invoked when the watcher closes due to an exception. Restart the watcher.
+                logger.error("Use k8s watch obtain the status failed", e);
+                getKubernetesOperatorState();
+              }
+            });
   }
 
   public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
@@ -216,8 +231,7 @@ public void close() {
     client.close();
   }
 
-  public static NonNamespaceOperation<
-          SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+  public static MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
       getSparkApplicationClient(KubernetesClient client) {
     return client.customResources(SparkApplication.class, SparkApplicationList.class);
   }