From a02c03deb63f702872df329d668831493c30f6a9 Mon Sep 17 00:00:00 2001 From: Amita Ekbote Date: Thu, 18 Nov 2021 11:15:32 -0800 Subject: [PATCH] Inject container states into task (#1176) * Inject container states into Task --- .../api/FeatureActivationConfiguration.java | 6 +++ .../gateway/kubernetes/KubeApiConnector.java | 39 ++++++++++--------- ...ataInjector.java => TaskDataInjector.java} | 34 ++++++++++++---- .../startup/TitusGatewayConfiguration.java | 6 --- 4 files changed, 52 insertions(+), 33 deletions(-) rename titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/{TaskRelocationDataInjector.java => TaskDataInjector.java} (84%) diff --git a/titus-api/src/main/java/com/netflix/titus/api/FeatureActivationConfiguration.java b/titus-api/src/main/java/com/netflix/titus/api/FeatureActivationConfiguration.java index 30ac62e9ca..feede16dde 100644 --- a/titus-api/src/main/java/com/netflix/titus/api/FeatureActivationConfiguration.java +++ b/titus-api/src/main/java/com/netflix/titus/api/FeatureActivationConfiguration.java @@ -54,4 +54,10 @@ public interface FeatureActivationConfiguration { */ @DefaultValue("true") boolean isRelocationBinpackingEnabled(); + + /** + * Enable injecting per-container states into tasks returned by Titus gateway + */ + @DefaultValue("false") + boolean isInjectingContainerStatesEnabled(); } diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/kubernetes/KubeApiConnector.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/kubernetes/KubeApiConnector.java index ab6816d06f..e8deb23f8a 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/kubernetes/KubeApiConnector.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/kubernetes/KubeApiConnector.java @@ -1,20 +1,5 @@ package com.netflix.titus.gateway.kubernetes; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.ListIterator; -import
java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.PreDestroy; -import javax.inject.Inject; -import javax.inject.Singleton; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.netflix.titus.api.jobmanager.model.job.Job; @@ -46,6 +31,21 @@ import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import static com.netflix.titus.gateway.kubernetes.F8KubeObjectFormatter.formatPodEssentials; @Singleton @@ -93,13 +93,14 @@ public List getContainerState(String taskId) { ContainerStatus containerStatus = iterator.next(); ContainerState status = containerStatus.getState(); TaskStatus.ContainerState.ContainerHealth containerHealth = TaskStatus.ContainerState.ContainerHealth.Unset; - if(status.toString().equals("running")) { + if(status.getRunning() != null) { containerHealth = TaskStatus.ContainerState.ContainerHealth.Healthy; - } else if (status.toString().equals("waiting")) { + } else if (status.getTerminated() != null) { containerHealth = TaskStatus.ContainerState.ContainerHealth.Unhealthy; } - containerStates.add(TaskStatus.ContainerState.newBuilder().setContainerName(containerStatus.getName()) - .setContainerHealth(containerHealth).build()); + 
containerStates.add(TaskStatus.ContainerState.newBuilder() + .setContainerName(containerStatus.getName()) + .setContainerHealth(containerHealth).build()); } return containerStates; } diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjector.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java similarity index 84% rename from titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjector.java rename to titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java index 163966d4a8..39eda4c5d7 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjector.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java @@ -73,7 +73,8 @@ class TaskRelocationDataInjector { RelocationServiceClient relocationServiceClient, RelocationDataReplicator relocationDataReplicator, KubeApiConnector kubeApiConnector) { - this(configuration, jobManagerConfiguration, featureActivationConfiguration, relocationServiceClient, relocationDataReplicator, kubeApiConnector, Schedulers.computation()); + this(configuration, jobManagerConfiguration, featureActivationConfiguration, relocationServiceClient, + relocationDataReplicator, kubeApiConnector, Schedulers.computation()); } @VisibleForTesting @@ -95,12 +96,18 @@ class TaskRelocationDataInjector { } Observable injectIntoTask(String taskId, Observable taskObservable) { + Observable taskObservableWithContainerState = taskObservable; + + if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) { + taskObservableWithContainerState = taskObservable.map(this::newTaskWithContainerState); + } + if (!featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()) { - return taskObservable; + return 
taskObservableWithContainerState; } if (shouldUseRelocationCache()) { - return taskObservable.map(task -> newTaskWithRelocationPlan(task, relocationDataReplicator.getCurrent().getPlans().get(taskId))); + return taskObservableWithContainerState.map(task -> newTaskWithRelocationPlan(task, relocationDataReplicator.getCurrent().getPlans().get(taskId))); } Observable> relocationPlanResolver = ReactorExt.toObservable(relocationServiceClient.findTaskRelocationPlan(taskId)) @@ -109,18 +116,29 @@ Observable injectIntoTask(String taskId, Observable taskObservable) .onErrorReturn(e -> Optional.empty()); return Observable.zip( - taskObservable, + taskObservableWithContainerState, relocationPlanResolver, (task, planOpt) -> planOpt.map(plan -> newTaskWithRelocationPlan(task, plan)).orElse(task) ); } Observable injectIntoTaskQueryResult(Observable tasksObservable) { + Observable tasksObservableWithContainerState = tasksObservable; + + if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) { + tasksObservableWithContainerState = tasksObservable.flatMap(queryResult -> { /* operators return a new Observable, so the result must be kept or the injection is lost */ + List newTaskList = queryResult.getItemsList().stream() + .map(task -> newTaskWithContainerState(task)) + .collect(Collectors.toList()); + return Observable.just(queryResult.toBuilder().clearItems().addAllItems(newTaskList).build()); + }); + } + if (!featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()) { - return tasksObservable; + return tasksObservableWithContainerState; } - return tasksObservable.flatMap(queryResult -> { + return tasksObservableWithContainerState.flatMap(queryResult -> { Set taskIds = queryResult.getItemsList().stream().map(Task::getId).collect(Collectors.toSet()); if (shouldUseRelocationCache()) { @@ -162,9 +180,9 @@ private long getTaskRelocationTimeout() { return (long) (configuration.getRequestTimeout() * jobManagerConfiguration.getRelocationTimeoutCoefficient()); } - static Task newTaskWithContainerState(Task task) { + private Task newTaskWithContainerState(Task task) 
{ - return task.toBuilder().setStatus(TaskStatus.newBuilder() - .addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build(); + return task.toBuilder().setStatus(task.getStatus().toBuilder() /* start from the existing status so its other fields are kept; only container states are added */ + .addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build(); } static Task newTaskWithRelocationPlan(Task task, TaskRelocationPlan relocationPlan) { diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayConfiguration.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayConfiguration.java index 7d8709e4fa..a0322befea 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayConfiguration.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/startup/TitusGatewayConfiguration.java @@ -23,10 +23,4 @@ public interface TitusGatewayConfiguration { @DefaultValue("true") boolean isProxyErrorLoggingEnabled(); - - /** - * Enable shared informer on Titus gateway - */ - @DefaultValue("true") - boolean isKubeSharedInformerEnabled(); }