Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Inject container status and task relocation data in the job event str…
Browse files Browse the repository at this point in the history
…eam (#1185)
  • Loading branch information
tbak authored Nov 22, 2021
1 parent 1452c3b commit 46979cd
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import com.netflix.titus.grpc.protogen.TaskQueryResult;
import com.netflix.titus.grpc.protogen.TaskStatus;
import com.netflix.titus.runtime.connector.GrpcRequestConfiguration;
import com.netflix.titus.runtime.connector.kubernetes.KubeApiFacade;
import com.netflix.titus.runtime.endpoint.JobQueryCriteria;
import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters;
import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobQueryModelConverters;
Expand Down Expand Up @@ -294,17 +293,21 @@ public Observable<TaskQueryResult> findTasks(TaskQuery taskQuery, CallMetadata c
@Override
public Observable<JobChangeNotification> observeJob(String jobId, CallMetadata callMetadata) {
if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "observeJob", callMetadata)) {
return localCacheQueryProcessor.syncCache("observeJob", JobChangeNotification.class).concatWith(localCacheQueryProcessor.observeJob(jobId));
return localCacheQueryProcessor.syncCache("observeJob", JobChangeNotification.class)
.concatWith(localCacheQueryProcessor.observeJob(jobId))
.map(taskRelocationDataInjector::injectIntoTaskUpdateEvent);
}
return super.observeJob(jobId, callMetadata);
return super.observeJob(jobId, callMetadata).map(taskRelocationDataInjector::injectIntoTaskUpdateEvent);
}

@Override
public Observable<JobChangeNotification> observeJobs(ObserveJobsQuery query, CallMetadata callMetadata) {
if (localCacheQueryProcessor.canUseCache(query.getFilteringCriteriaMap(), "observeJobs", callMetadata)) {
return localCacheQueryProcessor.syncCache("observeJobs", JobChangeNotification.class).concatWith(localCacheQueryProcessor.observeJobs(query));
return localCacheQueryProcessor.syncCache("observeJobs", JobChangeNotification.class)
.concatWith(localCacheQueryProcessor.observeJobs(query))
.map(taskRelocationDataInjector::injectIntoTaskUpdateEvent);
}
return super.observeJobs(query, callMetadata);
return super.observeJobs(query, callMetadata).map(taskRelocationDataInjector::injectIntoTaskUpdateEvent);
}

private Observable<TaskQueryResult> newActiveTaskQueryAction(TaskQuery taskQuery, CallMetadata callMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.titus.common.util.ExceptionExt;
import com.netflix.titus.common.util.rx.ReactorExt;
import com.netflix.titus.gateway.kubernetes.KubeApiConnector;
import com.netflix.titus.grpc.protogen.JobChangeNotification;
import com.netflix.titus.grpc.protogen.MigrationDetails;
import com.netflix.titus.grpc.protogen.Task;
import com.netflix.titus.grpc.protogen.TaskQueryResult;
Expand Down Expand Up @@ -95,10 +96,32 @@ class TaskRelocationDataInjector {
this.scheduler = scheduler;
}

JobChangeNotification injectIntoTaskUpdateEvent(JobChangeNotification event) {
if (event.getNotificationCase() != JobChangeNotification.NotificationCase.TASKUPDATE) {
return event;
}

Task updatedTask = event.getTaskUpdate().getTask();
if (featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
updatedTask = newTaskWithContainerState(updatedTask);
}
if (featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()) {
updatedTask = newTaskWithRelocationPlan(updatedTask, relocationDataReplicator.getCurrent().getPlans().get(updatedTask.getId()));
}

// Nothing changed so return the input event.
if (updatedTask == event.getTaskUpdate().getTask()) {
return event;
}
return event.toBuilder().setTaskUpdate(
event.getTaskUpdate().toBuilder().setTask(updatedTask).build()
).build();
}

Observable<Task> injectIntoTask(String taskId, Observable<Task> taskObservable) {
Observable<Task> taskObservableWithContainerState = taskObservable;

if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
if (featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
taskObservableWithContainerState = taskObservable.map(this::newTaskWithContainerState);
}

Expand All @@ -125,7 +148,7 @@ Observable<Task> injectIntoTask(String taskId, Observable<Task> taskObservable)
Observable<TaskQueryResult> injectIntoTaskQueryResult(Observable<TaskQueryResult> tasksObservable) {
Observable<TaskQueryResult> tasksObservableWithContainerState = tasksObservable;

if(featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
if (featureActivationConfiguration.isInjectingContainerStatesEnabled()) {
tasksObservableWithContainerState.flatMap(queryResult -> {
List<Task> newTaskList = queryResult.getItemsList().stream()
.map(task -> newTaskWithContainerState(task))
Expand Down Expand Up @@ -182,11 +205,11 @@ private long getTaskRelocationTimeout() {

private Task newTaskWithContainerState(Task task) {
return task.toBuilder().setStatus(TaskStatus.newBuilder()
.addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build();
.addAllContainerState(kubeApiConnector.getContainerState(task.getId()))).build();
}

static Task newTaskWithRelocationPlan(Task task, TaskRelocationPlan relocationPlan) {
if(relocationPlan == null) {
if (relocationPlan == null) {
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.runtime.TitusRuntimes;
import com.netflix.titus.gateway.kubernetes.KubeApiConnector;
import com.netflix.titus.runtime.connector.relocation.RelocationDataReplicator;
import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters;
import com.netflix.titus.runtime.jobmanager.JobManagerConfiguration;
import com.netflix.titus.grpc.protogen.JobChangeNotification;
import com.netflix.titus.grpc.protogen.MigrationDetails;
import com.netflix.titus.grpc.protogen.Task;
import com.netflix.titus.grpc.protogen.TaskQueryResult;
import com.netflix.titus.runtime.connector.GrpcClientConfiguration;
import com.netflix.titus.runtime.connector.relocation.RelocationDataReplicator;
import com.netflix.titus.runtime.connector.relocation.RelocationServiceClient;
import com.netflix.titus.runtime.connector.relocation.TaskRelocationSnapshot;
import com.netflix.titus.runtime.endpoint.common.EmptyLogStorageInfo;
import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters;
import com.netflix.titus.runtime.jobmanager.JobManagerConfiguration;
import com.netflix.titus.testkit.model.job.JobGenerator;
import com.netflix.titus.testkit.rx.ExtTestSubscriber;
import org.junit.Before;
Expand Down Expand Up @@ -88,6 +90,37 @@ public void setUp() {
when(featureActivationConfiguration.isMergingTaskMigrationPlanInGatewayEnabled()).thenReturn(true);
}

@Test
public void testTaskUpdateEventWithRelocationDeadline() {
long deadlineTimestamp = titusRuntime.getClock().wallTime() + 1_000;

when(relocationDataReplicator.getCurrent()).thenReturn(
newRelocationSnapshot(newRelocationPlan(TASK1, deadlineTimestamp))
);

JobChangeNotification event = JobChangeNotification.newBuilder()
.setTaskUpdate(JobChangeNotification.TaskUpdate.newBuilder().setTask(TASK1).build())
.build();
JobChangeNotification updatedEvent = taskRelocationDataInjector.injectIntoTaskUpdateEvent(event);
Task merged = updatedEvent.getTaskUpdate().getTask();
assertThat(merged.getMigrationDetails().getNeedsMigration()).isTrue();
assertThat(merged.getMigrationDetails().getDeadline()).isEqualTo(deadlineTimestamp);
}

@Test
public void testTaskUpdateEventWithoutRelocationDeadline() {
when(relocationDataReplicator.getCurrent()).thenReturn(TaskRelocationSnapshot.empty());
JobChangeNotification event = JobChangeNotification.newBuilder()
.setTaskUpdate(JobChangeNotification.TaskUpdate.newBuilder().setTask(TASK1).build())
.build();
JobChangeNotification updatedEvent = taskRelocationDataInjector.injectIntoTaskUpdateEvent(event);
assertThat(updatedEvent).isEqualTo(event);
}

private TaskRelocationSnapshot newRelocationSnapshot(TaskRelocationPlan plan) {
return TaskRelocationSnapshot.newBuilder().addPlan(plan).build();
}

@Test
public void testFindTaskWithRelocationDeadline() {
long deadlineTimestamp = titusRuntime.getClock().wallTime() + 1_000;
Expand Down

0 comments on commit 46979cd

Please sign in to comment.