From 46979cdce868400af9e37661700fb67ebe02eca6 Mon Sep 17 00:00:00 2001 From: tbak Date: Mon, 22 Nov 2021 10:15:45 -0800 Subject: [PATCH] Inject container status and task relocation data in the job event stream (#1185) --- .../v3/internal/GatewayJobServiceGateway.java | 13 ++++--- .../service/v3/internal/TaskDataInjector.java | 31 +++++++++++++-- .../TaskRelocationDataInjectorTest.java | 39 +++++++++++++++++-- 3 files changed, 71 insertions(+), 12 deletions(-) diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java index 9b7d4abe6e..0b84c0a3ae 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java @@ -66,7 +66,6 @@ import com.netflix.titus.grpc.protogen.TaskQueryResult; import com.netflix.titus.grpc.protogen.TaskStatus; import com.netflix.titus.runtime.connector.GrpcRequestConfiguration; -import com.netflix.titus.runtime.connector.kubernetes.KubeApiFacade; import com.netflix.titus.runtime.endpoint.JobQueryCriteria; import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters; import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobQueryModelConverters; @@ -294,17 +293,21 @@ public Observable findTasks(TaskQuery taskQuery, CallMetadata c @Override public Observable observeJob(String jobId, CallMetadata callMetadata) { if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "observeJob", callMetadata)) { - return localCacheQueryProcessor.syncCache("observeJob", JobChangeNotification.class).concatWith(localCacheQueryProcessor.observeJob(jobId)); + return localCacheQueryProcessor.syncCache("observeJob", JobChangeNotification.class) + .concatWith(localCacheQueryProcessor.observeJob(jobId)) + .map(taskRelocationDataInjector::injectIntoTaskUpdateEvent); } - return super.observeJob(jobId, callMetadata); + return super.observeJob(jobId, callMetadata).map(taskRelocationDataInjector::injectIntoTaskUpdateEvent); } @Override public Observable observeJobs(ObserveJobsQuery query, CallMetadata callMetadata) { if (localCacheQueryProcessor.canUseCache(query.getFilteringCriteriaMap(), "observeJobs", callMetadata)) { - return localCacheQueryProcessor.syncCache("observeJobs", JobChangeNotification.class).concatWith(localCacheQueryProcessor.observeJobs(query)); + return localCacheQueryProcessor.syncCache("observeJobs", JobChangeNotification.class) + .concatWith(localCacheQueryProcessor.observeJobs(query)) + .map(taskRelocationDataInjector::injectIntoTaskUpdateEvent); } - return super.observeJobs(query, callMetadata); + return super.observeJobs(query, callMetadata).map(taskRelocationDataInjector::injectIntoTaskUpdateEvent); } private Observable newActiveTaskQueryAction(TaskQuery taskQuery, CallMetadata callMetadata) { diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java index 39eda4c5d7..95b6f6bcb4 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/TaskDataInjector.java @@ -32,6 +32,7 @@ import com.netflix.titus.common.util.ExceptionExt; import com.netflix.titus.common.util.rx.ReactorExt; import com.netflix.titus.gateway.kubernetes.KubeApiConnector; +import com.netflix.titus.grpc.protogen.JobChangeNotification; import com.netflix.titus.grpc.protogen.MigrationDetails; import com.netflix.titus.grpc.protogen.Task; import com.netflix.titus.grpc.protogen.TaskQueryResult; @@ -95,10 +96,32 @@ class TaskRelocationDataInjector { this.scheduler = scheduler; } + JobChangeNotification injectIntoTaskUpdateEvent(JobChangeNotification event) { + if (event.getNotificationCase() != JobChangeNotification.NotificationCase.TASKUPDATE) { + return event; + } + + Task updatedTask = event.getTaskUpdate().getTask(); + if (featureActivationConfiguration.isInjectingContainerStatesEnabled()) { + updatedTask = newTaskWithContainerState(updatedTask); + } + if (featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()) { + updatedTask = newTaskWithRelocationPlan(updatedTask, relocationDataReplicator.getCurrent().getPlans().get(updatedTask.getId())); + } + + // Nothing changed so return the input event. + if (updatedTask == event.getTaskUpdate().getTask()) { + return event; + } + return event.toBuilder().setTaskUpdate( + event.getTaskUpdate().toBuilder().setTask(updatedTask).build() + ).build(); + } + Observable injectIntoTask(String taskId, Observable taskObservable) { Observable taskObservableWithContainerState = taskObservable; - if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) { + if (featureActivationConfiguration.isInjectingContainerStatesEnabled()) { taskObservableWithContainerState = taskObservable.map(this::newTaskWithContainerState); } @@ -125,7 +148,7 @@ Observable injectIntoTask(String taskId, Observable taskObservable) Observable injectIntoTaskQueryResult(Observable tasksObservable) { Observable tasksObservableWithContainerState = tasksObservable; - if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) { + if (featureActivationConfiguration.isInjectingContainerStatesEnabled()) { tasksObservableWithContainerState.flatMap(queryResult -> { List newTaskList = queryResult.getItemsList().stream() .map(task -> newTaskWithContainerState(task)) @@ -182,11 +205,11 @@ private long getTaskRelocationTimeout() { private Task newTaskWithContainerState(Task task) { return task.toBuilder().setStatus(TaskStatus.newBuilder() - .addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build(); + .addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build(); } static Task newTaskWithRelocationPlan(Task task, TaskRelocationPlan relocationPlan) { - if(relocationPlan == null) { + if (relocationPlan == null) { return task; } diff --git a/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjectorTest.java b/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjectorTest.java index 74b017920b..966962e324 100644 --- a/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjectorTest.java +++ b/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/TaskRelocationDataInjectorTest.java @@ -28,15 +28,17 @@ import com.netflix.titus.common.runtime.TitusRuntime; import com.netflix.titus.common.runtime.TitusRuntimes; import com.netflix.titus.gateway.kubernetes.KubeApiConnector; -import com.netflix.titus.runtime.connector.relocation.RelocationDataReplicator; -import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters; -import com.netflix.titus.runtime.jobmanager.JobManagerConfiguration; +import com.netflix.titus.grpc.protogen.JobChangeNotification; import com.netflix.titus.grpc.protogen.MigrationDetails; import com.netflix.titus.grpc.protogen.Task; import com.netflix.titus.grpc.protogen.TaskQueryResult; import com.netflix.titus.runtime.connector.GrpcClientConfiguration; +import com.netflix.titus.runtime.connector.relocation.RelocationDataReplicator; import com.netflix.titus.runtime.connector.relocation.RelocationServiceClient; +import com.netflix.titus.runtime.connector.relocation.TaskRelocationSnapshot; import com.netflix.titus.runtime.endpoint.common.EmptyLogStorageInfo; +import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters; +import com.netflix.titus.runtime.jobmanager.JobManagerConfiguration; import com.netflix.titus.testkit.model.job.JobGenerator; import com.netflix.titus.testkit.rx.ExtTestSubscriber; import org.junit.Before; @@ -88,6 +90,37 @@ public void setUp() { when(featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()).thenReturn(true); } + @Test + public void testTaskUpdateEventWithRelocationDeadline() { + long deadlineTimestamp = titusRuntime.getClock().wallTime() + 1_000; + + when(relocationDataReplicator.getCurrent()).thenReturn( + newRelocationSnapshot(newRelocationPlan(TASK1, deadlineTimestamp)) + ); + + JobChangeNotification event = JobChangeNotification.newBuilder() + .setTaskUpdate(JobChangeNotification.TaskUpdate.newBuilder().setTask(TASK1).build()) + .build(); + JobChangeNotification updatedEvent = taskRelocationDataInjector.injectIntoTaskUpdateEvent(event); + Task merged = updatedEvent.getTaskUpdate().getTask(); + assertThat(merged.getMigrationDetails().getNeedsMigration()).isTrue(); + assertThat(merged.getMigrationDetails().getDeadline()).isEqualTo(deadlineTimestamp); + } + + @Test + public void testTaskUpdateEventWithoutRelocationDeadline() { + when(relocationDataReplicator.getCurrent()).thenReturn(TaskRelocationSnapshot.empty()); + JobChangeNotification event = JobChangeNotification.newBuilder() + .setTaskUpdate(JobChangeNotification.TaskUpdate.newBuilder().setTask(TASK1).build()) + .build(); + JobChangeNotification updatedEvent = taskRelocationDataInjector.injectIntoTaskUpdateEvent(event); + assertThat(updatedEvent).isEqualTo(event); + } + + private TaskRelocationSnapshot newRelocationSnapshot(TaskRelocationPlan plan) { + return TaskRelocationSnapshot.newBuilder().addPlan(plan).build(); + } + @Test public void testFindTaskWithRelocationDeadline() { long deadlineTimestamp = titusRuntime.getClock().wallTime() + 1_000;