From 633eefae61239f99dd630c64737b3276ff3d3a46 Mon Sep 17 00:00:00 2001 From: tbak Date: Thu, 21 Oct 2021 14:00:50 -0700 Subject: [PATCH] Support all query types from cache in TitusGateway (#1150) --- .../v3/internal/GatewayJobServiceGateway.java | 37 ++++++-- .../v3/internal/LocalCacheQueryProcessor.java | 26 +++++- .../LocalCacheQueryProcessorTest.java | 85 +++++++++++++++++-- 3 files changed, 133 insertions(+), 15 deletions(-) diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java index 86d790f958..091a068a52 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/GatewayJobServiceGateway.java @@ -17,6 +17,7 @@ package com.netflix.titus.gateway.service.v3.internal; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -154,6 +155,14 @@ public GatewayJobServiceGateway(GrpcRequestConfiguration tunablesConfiguration, @Override public Observable findJob(String jobId, CallMetadata callMetadata) { + if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "findJob", callMetadata)) { + Job grpcJob = localCacheQueryProcessor.findJob(jobId).orElse(null); + if (grpcJob != null) { + return Observable.just(grpcJob); + } + return retrieveArchivedJob(jobId); + } + Observable observable = createRequestObservable(emitter -> { StreamObserver streamObserver = createSimpleClientResponseObserver(emitter); createWrappedStub(client, callMetadata, tunablesConfiguration.getRequestTimeoutMs()).findJob(JobId.newBuilder().setId(jobId).build(), streamObserver); @@ -193,6 +202,14 @@ public Observable findJobs(JobQuery jobQuery, CallMetadata callM @Override public Observable findTask(String taskId, CallMetadata callMetadata) { + if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "findTask", callMetadata)) { + Task grpcTask = localCacheQueryProcessor.findTask(taskId).orElse(null); + if (grpcTask != null) { + return Observable.just(grpcTask); + } + return retrieveArchivedTask(taskId); + } + Observable observable = createRequestObservable( emitter -> { StreamObserver streamObserver = createSimpleClientResponseObserver(emitter); @@ -232,14 +249,6 @@ public Observable findTasks(TaskQuery taskQuery, CallMetadata c Set taskStates = Sets.newHashSet(StringExt.splitByComma(taskQuery.getFilteringCriteriaMap().getOrDefault("taskStates", ""))); - // We use cache only if archived data is not requested to keep the implementation simple. - // TODO Support local cache access mixed with archived data. - if (localCacheQueryProcessor.canUseCache(taskQuery.getFilteringCriteriaMap(), "findTasks", callMetadata)) { - if (!taskStates.contains(TaskState.Finished.name())) { - return Observable.just(localCacheQueryProcessor.findTasks(taskQuery)); - } - } - Observable observable; if (v3JobIds.isEmpty()) { // Active task set only @@ -271,6 +280,14 @@ public Observable findTasks(TaskQuery taskQuery, CallMetadata c return taskRelocationDataInjector.injectIntoTaskQueryResult(observable.timeout(tunablesConfiguration.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)); } + @Override + public Observable observeJob(String jobId, CallMetadata callMetadata) { + if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "observeJob", callMetadata)) { + return localCacheQueryProcessor.observeJob(jobId); + } + return super.observeJob(jobId, callMetadata); + } + @Override public Observable observeJobs(ObserveJobsQuery query, CallMetadata callMetadata) { if (localCacheQueryProcessor.canUseCache(query.getFilteringCriteriaMap(), "observeJobs", callMetadata)) { @@ -280,6 +297,10 @@ public Observable observeJobs(ObserveJobsQuery query, Cal } private Observable newActiveTaskQueryAction(TaskQuery taskQuery, CallMetadata callMetadata) { + if (localCacheQueryProcessor.canUseCache(taskQuery.getFilteringCriteriaMap(), "findTasks", callMetadata)) { + return Observable.just(localCacheQueryProcessor.findTasks(taskQuery)); + } + return createRequestObservable(emitter -> { StreamObserver streamObserver = createSimpleClientResponseObserver(emitter); createWrappedStub(client, callMetadata, tunablesConfiguration.getRequestTimeoutMs()).findTasks(taskQuery, streamObserver); diff --git a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessor.java b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessor.java index 18903e0c7f..4aefa9325a 100644 --- a/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessor.java +++ b/titus-server-gateway/src/main/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessor.java @@ -46,6 +46,7 @@ import com.netflix.titus.grpc.protogen.JobDescriptor; import com.netflix.titus.grpc.protogen.JobQuery; import com.netflix.titus.grpc.protogen.JobQueryResult; +import com.netflix.titus.grpc.protogen.JobStatus; import com.netflix.titus.grpc.protogen.ObserveJobsQuery; import com.netflix.titus.grpc.protogen.Task; import com.netflix.titus.grpc.protogen.TaskQuery; @@ -144,6 +145,10 @@ public boolean canUseCache(Map queryParameters, return allow; } + public Optional findJob(String jobId) { + return jobDataReplicator.getCurrent().findJob(jobId).map(GrpcJobManagementModelConverters::toGrpcJob); + } + public JobQueryResult findJobs(JobQuery jobQuery) { JobQueryCriteria queryCriteria = GrpcJobQueryModelConverters.toJobQueryCriteria(jobQuery); Page page = toPage(jobQuery.getPage()); @@ -161,6 +166,12 @@ public JobQueryResult findJobs(JobQuery jobQuery) { .build(); } + public Optional findTask(String taskId) { + return jobDataReplicator.getCurrent() + .findTaskById(taskId) + .map(jobTaskPair -> GrpcJobManagementModelConverters.toGrpcTask(jobTaskPair.getRight(), logStorageInfo)); + } + public TaskQueryResult findTasks(TaskQuery taskQuery) { JobQueryCriteria queryCriteria = GrpcJobQueryModelConverters.toJobQueryCriteria(taskQuery); Page page = toPage(taskQuery.getPage()); @@ -178,6 +189,19 @@ public TaskQueryResult findTasks(TaskQuery taskQuery) { .build(); } + public Observable observeJob(String jobId) { + ObserveJobsQuery query = ObserveJobsQuery.newBuilder().putFilteringCriteria("jobIds", jobId).build(); + return observeJobs(query).takeUntil(this::isJobFinishedEvent); + } + + /** + * Job finished event is the last one that is emitted for every completed job. + */ + private boolean isJobFinishedEvent(JobChangeNotification event) { + return event.getNotificationCase() == JobChangeNotification.NotificationCase.JOBUPDATE && + event.getJobUpdate().getJob().getStatus().getState() == JobStatus.JobState.Finished; + } + public Observable observeJobs(ObserveJobsQuery query) { JobQueryCriteria criteria = toJobQueryCriteria(query); V3JobQueryCriteriaEvaluator jobsPredicate = new V3JobQueryCriteriaEvaluator(criteria, titusRuntime); @@ -211,7 +235,7 @@ public Observable observeJobs(ObserveJobsQuery query) { // to filter them out here. if (jobManagerEvent == JobManagerEvent.keepAliveEvent()) { // Check if staleness is not too high. - if(jobDataReplicator.getStalenessMs() > configuration.getObserveJobsStalenessDisconnectMs()) { + if (jobDataReplicator.getStalenessMs() > configuration.getObserveJobsStalenessDisconnectMs()) { return Mono.error(new StatusRuntimeException(Status.ABORTED.augmentDescription( "Data staleness in the event stream is too high. Most likely caused by connectivity issue to the downstream server." ))); diff --git a/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessorTest.java b/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessorTest.java index f2c318027f..19a4f2f9f3 100644 --- a/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessorTest.java +++ b/titus-server-gateway/src/test/java/com/netflix/titus/gateway/service/v3/internal/LocalCacheQueryProcessorTest.java @@ -16,6 +16,7 @@ package com.netflix.titus.gateway.service.v3.internal; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -103,6 +104,18 @@ public void testCanUseCacheByCallerId() { assertThat(processor.canUseCache(Collections.emptyMap(), "anything", JUNIT_CALL_METADATA)).isTrue(); } + @Test + public void testFindJob() { + Job job1 = addToJobDataReplicator(newJobAndTasks("job1", 2)).getLeft(); + Job job2 = addToJobDataReplicator(newJobAndTasks("job2", 4)).getLeft(); + + for (Job job : Arrays.asList(job1, job2)) { + com.netflix.titus.grpc.protogen.Job result = processor.findJob(job.getId()).orElse(null); + assertThat(result).isNotNull(); + assertThat(result.getId()).isEqualTo(job.getId()); + } + } + @Test public void testFindJobs() { Job job1 = addToJobDataReplicator(newJobAndTasks("job1", 2)).getLeft(); @@ -157,12 +170,24 @@ public void testFindTasks() { assertThat(taskIds).containsAll(expectedTaskIds); } + @Test + public void testFindTask() { + Task task1 = addToJobDataReplicator(newJobAndTasks("job1", 2)).getRight().get(0); + Task task2 = addToJobDataReplicator(newJobAndTasks("job2", 4)).getRight().get(0); + + for (Task task : Arrays.asList(task1, task2)) { + com.netflix.titus.grpc.protogen.Task result = processor.findTask(task.getId()).orElse(null); + assertThat(result).isNotNull(); + assertThat(result.getId()).isEqualTo(task.getId()); + } + } + @Test public void testObserveJobsEmitsEmptySnapshotIfNoJobsAreRunning() throws InterruptedException { ExtTestSubscriber subscriber = new ExtTestSubscriber<>(); processor.observeJobs(ObserveJobsQuery.getDefaultInstance()).subscribe(subscriber); - jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.snapshotMarker()), Sinks.EmitFailureHandler.FAIL_FAST); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.snapshotMarker())); JobChangeNotification receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.SNAPSHOTEND); @@ -180,7 +205,7 @@ public void testObserveJobs() throws InterruptedException { // Job update event, which also triggers snapshot JobUpdateEvent jobUpdateEvent = JobUpdateEvent.newJob(job, JUNIT_CALL_METADATA); - jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), jobUpdateEvent), Sinks.EmitFailureHandler.FAIL_FAST); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), jobUpdateEvent)); JobChangeNotification receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); assertThat(receivedEvent).isNotNull(); assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.JOBUPDATE); @@ -195,7 +220,7 @@ public void testObserveJobs() throws InterruptedException { // Task update event TaskUpdateEvent taskUpdateEvent = TaskUpdateEvent.newTask(job, task1, JUNIT_CALL_METADATA); - jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent), Sinks.EmitFailureHandler.FAIL_FAST); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent)); receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); assertThat(receivedEvent).isNotNull(); assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE); @@ -203,14 +228,14 @@ public void testObserveJobs() throws InterruptedException { // Job replicator re-sends events if there is nothing new to keep the stream active. Make sure that // we filter the keep alive events. TaskUpdateEvent taskUpdateEvent2 = TaskUpdateEvent.newTask(job, task2, JUNIT_CALL_METADATA); - jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.keepAliveEvent()), Sinks.EmitFailureHandler.FAIL_FAST); - jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent2), Sinks.EmitFailureHandler.FAIL_FAST); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.keepAliveEvent())); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent2)); receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); assertThat(receivedEvent).isNotNull(); assertThat(receivedEvent.getTaskUpdate().getTask().getId()).isEqualTo(task2.getId()); // Now repeat taskUpdateEvent which this time should go through. - jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent), Sinks.EmitFailureHandler.FAIL_FAST); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent)); receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); assertThat(receivedEvent).isNotNull(); assertThat(receivedEvent.getTaskUpdate().getTask().getId()).isEqualTo(task1.getId()); @@ -220,6 +245,50 @@ public void testObserveJobs() throws InterruptedException { assertThat(subscriber.getError()).isInstanceOf(RuntimeException.class); } + @Test + public void testObserveJob() throws InterruptedException { + ExtTestSubscriber subscriber = new ExtTestSubscriber<>(); + processor.observeJob("job1").subscribe(subscriber); + + Pair, List> jobAndTasks = addToJobDataReplicator(newJobAndTasks("job1", 2)); + Job job2 = addToJobDataReplicator(newJobAndTasks("job2", 2)).getLeft(); + Job job1 = jobAndTasks.getLeft(); + Task task1 = jobAndTasks.getRight().get(0); + Task task2 = jobAndTasks.getRight().get(1); + + // Job update event, which also triggers snapshot + emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobUpdateEvent.newJob(job2, JUNIT_CALL_METADATA))); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobUpdateEvent.newJob(job1, JUNIT_CALL_METADATA))); + + JobChangeNotification receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); + assertThat(receivedEvent).isNotNull(); + assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.JOBUPDATE); + assertThat(receivedEvent.getJobUpdate().getJob().getId()).isEqualTo(job1.getId()); + receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); + assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE); + assertThat(receivedEvent.getTaskUpdate().getTask().getJobId()).isEqualTo(job1.getId()); + receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); + assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE); + assertThat(receivedEvent.getTaskUpdate().getTask().getJobId()).isEqualTo(job1.getId()); + receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); + assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.SNAPSHOTEND); + receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); + assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.JOBUPDATE); + assertThat(receivedEvent.getJobUpdate().getJob().getId()).isEqualTo(job1.getId()); + + // Task update event + TaskUpdateEvent taskUpdateEvent = TaskUpdateEvent.newTask(job1, task1, JUNIT_CALL_METADATA); + emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent)); + receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS); + assertThat(receivedEvent).isNotNull(); + assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE); + assertThat(receivedEvent.getTaskUpdate().getTask().getId()).isEqualTo(task1.getId()); + + // Check that is correctly terminated + jobDataReplicatorSink.tryEmitError(new RuntimeException("simulated stream error")); + assertThat(subscriber.getError()).isInstanceOf(RuntimeException.class); + } + private Pair, List> addToJobDataReplicator(Pair, List> jobAndTasks) { JobSnapshot updated = jobDataReplicator.getCurrent().updateJob(jobAndTasks.getLeft()).orElse(jobDataReplicator.getCurrent()); for (Task task : jobAndTasks.getRight()) { @@ -237,4 +306,8 @@ private static Pair, List> newJobAndTasks(String jobId, int taskCou List tasks = (List) JobGenerator.batchTasks(job).getValues(taskCount); return Pair.of(job, tasks); } + + private void emitEvent(Pair> e) { + jobDataReplicatorSink.emitNext(e, Sinks.EmitFailureHandler.FAIL_FAST); + } } \ No newline at end of file