Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Inject container states into task (#1176)
Browse files Browse the repository at this point in the history
* Inject container states into Task
  • Loading branch information
Amita Ekbote authored Nov 18, 2021
1 parent 60db019 commit a02c03d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@ public interface FeatureActivationConfiguration {
*/
@DefaultValue("true")
boolean isRelocationBinpackingEnabled();

/**
* Enable shared informer on Titus gateway
*/
@DefaultValue("false")
boolean isInjectingContainerStatesEnabled();
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,5 @@
package com.netflix.titus.gateway.kubernetes;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.netflix.titus.api.jobmanager.model.job.Job;
Expand Down Expand Up @@ -46,6 +31,21 @@
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.netflix.titus.gateway.kubernetes.F8KubeObjectFormatter.formatPodEssentials;

@Singleton
Expand Down Expand Up @@ -93,13 +93,14 @@ public List<TaskStatus.ContainerState> getContainerState(String taskId) {
ContainerStatus containerStatus = iterator.next();
ContainerState status = containerStatus.getState();
TaskStatus.ContainerState.ContainerHealth containerHealth = TaskStatus.ContainerState.ContainerHealth.Unset;
if(status.toString().equals("running")) {
if(status.getRunning() != null) {
containerHealth = TaskStatus.ContainerState.ContainerHealth.Healthy;
} else if (status.toString().equals("waiting")) {
} else if (status.getTerminated() != null) {
containerHealth = TaskStatus.ContainerState.ContainerHealth.Unhealthy;
}
containerStates.add(TaskStatus.ContainerState.newBuilder().setContainerName(containerStatus.getName())
.setContainerHealth(containerHealth).build());
containerStates.add(TaskStatus.ContainerState.newBuilder()
.setContainerName(containerStatus.getName())
.setContainerHealth(containerHealth).build());
}
return containerStates;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class TaskRelocationDataInjector {
RelocationServiceClient relocationServiceClient,
RelocationDataReplicator relocationDataReplicator,
KubeApiConnector kubeApiConnector) {
this(configuration, jobManagerConfiguration, featureActivationConfiguration, relocationServiceClient, relocationDataReplicator, kubeApiConnector, Schedulers.computation());
this(configuration, jobManagerConfiguration, featureActivationConfiguration, relocationServiceClient,
relocationDataReplicator, kubeApiConnector, Schedulers.computation());
}

@VisibleForTesting
Expand All @@ -95,12 +96,18 @@ class TaskRelocationDataInjector {
}

Observable<Task> injectIntoTask(String taskId, Observable<Task> taskObservable) {
Observable<Task> taskObservableWithContainerState = taskObservable;

if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
taskObservableWithContainerState = taskObservable.map(this::newTaskWithContainerState);
}

if (!featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()) {
return taskObservable;
return taskObservableWithContainerState;
}

if (shouldUseRelocationCache()) {
return taskObservable.map(task -> newTaskWithRelocationPlan(task, relocationDataReplicator.getCurrent().getPlans().get(taskId)));
return taskObservableWithContainerState.map(task -> newTaskWithRelocationPlan(task, relocationDataReplicator.getCurrent().getPlans().get(taskId)));
}

Observable<Optional<TaskRelocationPlan>> relocationPlanResolver = ReactorExt.toObservable(relocationServiceClient.findTaskRelocationPlan(taskId))
Expand All @@ -109,18 +116,29 @@ Observable<Task> injectIntoTask(String taskId, Observable<Task> taskObservable)
.onErrorReturn(e -> Optional.empty());

return Observable.zip(
taskObservable,
taskObservableWithContainerState,
relocationPlanResolver,
(task, planOpt) -> planOpt.map(plan -> newTaskWithRelocationPlan(task, plan)).orElse(task)
);
}

Observable<TaskQueryResult> injectIntoTaskQueryResult(Observable<TaskQueryResult> tasksObservable) {
Observable<TaskQueryResult> tasksObservableWithContainerState = tasksObservable;

if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
tasksObservableWithContainerState.flatMap(queryResult -> {
List<Task> newTaskList = queryResult.getItemsList().stream()
.map(task -> newTaskWithContainerState(task))
.collect(Collectors.toList());
return Observable.just(queryResult.toBuilder().clearItems().addAllItems(newTaskList).build());
});
}

if (!featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()) {
return tasksObservable;
return tasksObservableWithContainerState;
}

return tasksObservable.flatMap(queryResult -> {
return tasksObservableWithContainerState.flatMap(queryResult -> {
Set<String> taskIds = queryResult.getItemsList().stream().map(Task::getId).collect(Collectors.toSet());

if (shouldUseRelocationCache()) {
Expand Down Expand Up @@ -162,9 +180,9 @@ private long getTaskRelocationTimeout() {
return (long) (configuration.getRequestTimeout() * jobManagerConfiguration.getRelocationTimeoutCoefficient());
}

static Task newTaskWithContainerState(Task task) {
private Task newTaskWithContainerState(Task task) {
return task.toBuilder().setStatus(TaskStatus.newBuilder()
.addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build();
.addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build();
}

static Task newTaskWithRelocationPlan(Task task, TaskRelocationPlan relocationPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,4 @@
public interface TitusGatewayConfiguration {
@DefaultValue("true")
boolean isProxyErrorLoggingEnabled();

/**
* Enable shared informer on Titus gateway
*/
@DefaultValue("true")
boolean isKubeSharedInformerEnabled();
}

0 comments on commit a02c03d

Please sign in to comment.