Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Support all query types from cache in TitusGateway (#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbak authored Oct 21, 2021
1 parent a796cb8 commit 633eefa
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.titus.gateway.service.v3.internal;


import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -154,6 +155,14 @@ public GatewayJobServiceGateway(GrpcRequestConfiguration tunablesConfiguration,

@Override
public Observable<Job> findJob(String jobId, CallMetadata callMetadata) {
if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "findJob", callMetadata)) {
Job grpcJob = localCacheQueryProcessor.findJob(jobId).orElse(null);
if (grpcJob != null) {
return Observable.just(grpcJob);
}
return retrieveArchivedJob(jobId);
}

Observable<Job> observable = createRequestObservable(emitter -> {
StreamObserver<Job> streamObserver = createSimpleClientResponseObserver(emitter);
createWrappedStub(client, callMetadata, tunablesConfiguration.getRequestTimeoutMs()).findJob(JobId.newBuilder().setId(jobId).build(), streamObserver);
Expand Down Expand Up @@ -193,6 +202,14 @@ public Observable<JobQueryResult> findJobs(JobQuery jobQuery, CallMetadata callM

@Override
public Observable<Task> findTask(String taskId, CallMetadata callMetadata) {
if (localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "findTask", callMetadata)) {
Task grpcTask = localCacheQueryProcessor.findTask(taskId).orElse(null);
if (grpcTask != null) {
return Observable.just(grpcTask);
}
return retrieveArchivedTask(taskId);
}

Observable<Task> observable = createRequestObservable(
emitter -> {
StreamObserver<Task> streamObserver = createSimpleClientResponseObserver(emitter);
Expand Down Expand Up @@ -232,14 +249,6 @@ public Observable<TaskQueryResult> findTasks(TaskQuery taskQuery, CallMetadata c

Set<String> taskStates = Sets.newHashSet(StringExt.splitByComma(taskQuery.getFilteringCriteriaMap().getOrDefault("taskStates", "")));

// We use cache only if archived data is not requested to keep the implementation simple.
// TODO Support local cache access mixed with archived data.
if (localCacheQueryProcessor.canUseCache(taskQuery.getFilteringCriteriaMap(), "findTasks", callMetadata)) {
if (!taskStates.contains(TaskState.Finished.name())) {
return Observable.just(localCacheQueryProcessor.findTasks(taskQuery));
}
}

Observable<TaskQueryResult> observable;
if (v3JobIds.isEmpty()) {
// Active task set only
Expand Down Expand Up @@ -271,6 +280,14 @@ public Observable<TaskQueryResult> findTasks(TaskQuery taskQuery, CallMetadata c
return taskRelocationDataInjector.injectIntoTaskQueryResult(observable.timeout(tunablesConfiguration.getRequestTimeoutMs(), TimeUnit.MILLISECONDS));
}

/**
 * Subscribes to change notifications of a single job.
 * <p>
 * When {@code localCacheQueryProcessor} allows cache usage for this call, the stream is
 * produced by the local cache query processor. Otherwise the request is delegated to the
 * superclass implementation.
 *
 * @param jobId        id of the job to observe
 * @param callMetadata caller metadata, passed to the cache eligibility check and to the superclass call
 * @return stream of change notifications for the given job
 */
@Override
public Observable<JobChangeNotification> observeJob(String jobId, CallMetadata callMetadata) {
    // No filtering criteria exist for a single-job subscription, hence the empty parameter map.
    boolean serveFromCache = localCacheQueryProcessor.canUseCache(Collections.emptyMap(), "observeJob", callMetadata);
    return serveFromCache
            ? localCacheQueryProcessor.observeJob(jobId)
            : super.observeJob(jobId, callMetadata);
}

@Override
public Observable<JobChangeNotification> observeJobs(ObserveJobsQuery query, CallMetadata callMetadata) {
if (localCacheQueryProcessor.canUseCache(query.getFilteringCriteriaMap(), "observeJobs", callMetadata)) {
Expand All @@ -280,6 +297,10 @@ public Observable<JobChangeNotification> observeJobs(ObserveJobsQuery query, Cal
}

private Observable<TaskQueryResult> newActiveTaskQueryAction(TaskQuery taskQuery, CallMetadata callMetadata) {
if (localCacheQueryProcessor.canUseCache(taskQuery.getFilteringCriteriaMap(), "findTasks", callMetadata)) {
return Observable.just(localCacheQueryProcessor.findTasks(taskQuery));
}

return createRequestObservable(emitter -> {
StreamObserver<TaskQueryResult> streamObserver = createSimpleClientResponseObserver(emitter);
createWrappedStub(client, callMetadata, tunablesConfiguration.getRequestTimeoutMs()).findTasks(taskQuery, streamObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.netflix.titus.grpc.protogen.JobDescriptor;
import com.netflix.titus.grpc.protogen.JobQuery;
import com.netflix.titus.grpc.protogen.JobQueryResult;
import com.netflix.titus.grpc.protogen.JobStatus;
import com.netflix.titus.grpc.protogen.ObserveJobsQuery;
import com.netflix.titus.grpc.protogen.Task;
import com.netflix.titus.grpc.protogen.TaskQuery;
Expand Down Expand Up @@ -144,6 +145,10 @@ public boolean canUseCache(Map<String, String> queryParameters,
return allow;
}

/**
 * Looks up a job by its id in the current snapshot held by {@code jobDataReplicator}
 * and converts the match to its gRPC representation.
 *
 * @param jobId id of the job to look up
 * @return the gRPC job, or an empty {@link Optional} if the snapshot lookup yields nothing
 */
public Optional<com.netflix.titus.grpc.protogen.Job> findJob(String jobId) {
    return jobDataReplicator.getCurrent()
            .findJob(jobId)
            .map(coreJob -> GrpcJobManagementModelConverters.toGrpcJob(coreJob));
}

public JobQueryResult findJobs(JobQuery jobQuery) {
JobQueryCriteria<TaskStatus.TaskState, JobDescriptor.JobSpecCase> queryCriteria = GrpcJobQueryModelConverters.toJobQueryCriteria(jobQuery);
Page page = toPage(jobQuery.getPage());
Expand All @@ -161,6 +166,12 @@ public JobQueryResult findJobs(JobQuery jobQuery) {
.build();
}

/**
 * Looks up a task by its id in the current snapshot held by {@code jobDataReplicator}
 * and converts the match to its gRPC representation.
 *
 * @param taskId id of the task to look up
 * @return the gRPC task, or an empty {@link Optional} if the snapshot lookup yields nothing
 */
public Optional<Task> findTask(String taskId) {
    return jobDataReplicator.getCurrent().findTaskById(taskId).map(
            // The right element of the pair is handed to the converter together with the log storage info.
            jobAndTask -> GrpcJobManagementModelConverters.toGrpcTask(jobAndTask.getRight(), logStorageInfo)
    );
}

public TaskQueryResult findTasks(TaskQuery taskQuery) {
JobQueryCriteria<TaskStatus.TaskState, JobDescriptor.JobSpecCase> queryCriteria = GrpcJobQueryModelConverters.toJobQueryCriteria(taskQuery);
Page page = toPage(taskQuery.getPage());
Expand All @@ -178,6 +189,19 @@ public TaskQueryResult findTasks(TaskQuery taskQuery) {
.build();
}

/**
 * Observes a single job by running {@link #observeJobs(ObserveJobsQuery)} with a "jobIds"
 * filtering criterion set to the given job id. The resulting stream is cut off via
 * {@code takeUntil} once an event matching {@link #isJobFinishedEvent(JobChangeNotification)} is seen.
 *
 * @param jobId id of the job to observe
 * @return stream of change notifications restricted to the given job
 */
public Observable<JobChangeNotification> observeJob(String jobId) {
    ObserveJobsQuery.Builder singleJobQuery = ObserveJobsQuery.newBuilder();
    singleJobQuery.putFilteringCriteria("jobIds", jobId);
    return observeJobs(singleJobQuery.build())
            .takeUntil(event -> isJobFinishedEvent(event));
}

/**
 * Tells whether the notification is a job update carrying a job in the {@code Finished} state.
 * Job finished event is the last one that is emitted for every completed job.
 *
 * @param event notification to inspect
 * @return {@code true} only for a JOBUPDATE notification whose job status state is Finished
 */
private boolean isJobFinishedEvent(JobChangeNotification event) {
    // Anything other than a job update (task updates, snapshot markers, ...) cannot finish the job.
    if (event.getNotificationCase() != JobChangeNotification.NotificationCase.JOBUPDATE) {
        return false;
    }
    JobStatus.JobState currentState = event.getJobUpdate().getJob().getStatus().getState();
    return currentState == JobStatus.JobState.Finished;
}

public Observable<JobChangeNotification> observeJobs(ObserveJobsQuery query) {
JobQueryCriteria<TaskStatus.TaskState, JobDescriptor.JobSpecCase> criteria = toJobQueryCriteria(query);
V3JobQueryCriteriaEvaluator jobsPredicate = new V3JobQueryCriteriaEvaluator(criteria, titusRuntime);
Expand Down Expand Up @@ -211,7 +235,7 @@ public Observable<JobChangeNotification> observeJobs(ObserveJobsQuery query) {
// to filter them out here.
if (jobManagerEvent == JobManagerEvent.keepAliveEvent()) {
// Check if staleness is not too high.
if(jobDataReplicator.getStalenessMs() > configuration.getObserveJobsStalenessDisconnectMs()) {
if (jobDataReplicator.getStalenessMs() > configuration.getObserveJobsStalenessDisconnectMs()) {
return Mono.error(new StatusRuntimeException(Status.ABORTED.augmentDescription(
"Data staleness in the event stream is too high. Most likely caused by connectivity issue to the downstream server."
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.titus.gateway.service.v3.internal;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,6 +104,18 @@ public void testCanUseCacheByCallerId() {
assertThat(processor.canUseCache(Collections.emptyMap(), "anything", JUNIT_CALL_METADATA)).isTrue();
}

/**
 * Verifies that every job stored in the replicator snapshot can be found by its id
 * and that the returned gRPC job carries the same id.
 */
@Test
public void testFindJob() {
    // Both jobs are registered up front, before any lookup is made.
    List<Job<?>> storedJobs = Arrays.asList(
            addToJobDataReplicator(newJobAndTasks("job1", 2)).getLeft(),
            addToJobDataReplicator(newJobAndTasks("job2", 4)).getLeft()
    );

    for (Job<?> expected : storedJobs) {
        String expectedId = expected.getId();
        com.netflix.titus.grpc.protogen.Job found = processor.findJob(expectedId).orElse(null);
        assertThat(found).isNotNull();
        assertThat(found.getId()).isEqualTo(expected.getId());
    }
}

@Test
public void testFindJobs() {
Job<?> job1 = addToJobDataReplicator(newJobAndTasks("job1", 2)).getLeft();
Expand Down Expand Up @@ -157,12 +170,24 @@ public void testFindTasks() {
assertThat(taskIds).containsAll(expectedTaskIds);
}

/**
 * Verifies that the first task of each stored job can be found by its id
 * and that the returned gRPC task carries the same id.
 */
@Test
public void testFindTask() {
    // Both jobs (and their tasks) are registered up front, before any lookup is made.
    List<Task> storedTasks = Arrays.asList(
            addToJobDataReplicator(newJobAndTasks("job1", 2)).getRight().get(0),
            addToJobDataReplicator(newJobAndTasks("job2", 4)).getRight().get(0)
    );

    for (Task expected : storedTasks) {
        String expectedId = expected.getId();
        com.netflix.titus.grpc.protogen.Task found = processor.findTask(expectedId).orElse(null);
        assertThat(found).isNotNull();
        assertThat(found.getId()).isEqualTo(expected.getId());
    }
}

@Test
public void testObserveJobsEmitsEmptySnapshotIfNoJobsAreRunning() throws InterruptedException {
ExtTestSubscriber<JobChangeNotification> subscriber = new ExtTestSubscriber<>();
processor.observeJobs(ObserveJobsQuery.getDefaultInstance()).subscribe(subscriber);

jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.snapshotMarker()), Sinks.EmitFailureHandler.FAIL_FAST);
emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.snapshotMarker()));

JobChangeNotification receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.SNAPSHOTEND);
Expand All @@ -180,7 +205,7 @@ public void testObserveJobs() throws InterruptedException {

// Job update event, which also triggers snapshot
JobUpdateEvent jobUpdateEvent = JobUpdateEvent.newJob(job, JUNIT_CALL_METADATA);
jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), jobUpdateEvent), Sinks.EmitFailureHandler.FAIL_FAST);
emitEvent(Pair.of(jobDataReplicator.getCurrent(), jobUpdateEvent));
JobChangeNotification receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
assertThat(receivedEvent).isNotNull();
assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.JOBUPDATE);
Expand All @@ -195,22 +220,22 @@ public void testObserveJobs() throws InterruptedException {

// Task update event
TaskUpdateEvent taskUpdateEvent = TaskUpdateEvent.newTask(job, task1, JUNIT_CALL_METADATA);
jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent), Sinks.EmitFailureHandler.FAIL_FAST);
emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent));
receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
assertThat(receivedEvent).isNotNull();
assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE);

// Job replicator re-sends events if there is nothing new to keep the stream active. Make sure that
// we filter the keep alive events.
TaskUpdateEvent taskUpdateEvent2 = TaskUpdateEvent.newTask(job, task2, JUNIT_CALL_METADATA);
jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.keepAliveEvent()), Sinks.EmitFailureHandler.FAIL_FAST);
jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent2), Sinks.EmitFailureHandler.FAIL_FAST);
emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobManagerEvent.keepAliveEvent()));
emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent2));
receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
assertThat(receivedEvent).isNotNull();
assertThat(receivedEvent.getTaskUpdate().getTask().getId()).isEqualTo(task2.getId());

// Now repeat taskUpdateEvent which this time should go through.
jobDataReplicatorSink.emitNext(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent), Sinks.EmitFailureHandler.FAIL_FAST);
emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent));
receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
assertThat(receivedEvent).isNotNull();
assertThat(receivedEvent.getTaskUpdate().getTask().getId()).isEqualTo(task1.getId());
Expand All @@ -220,6 +245,50 @@ public void testObserveJobs() throws InterruptedException {
assertThat(subscriber.getError()).isInstanceOf(RuntimeException.class);
}

/**
 * Verifies the single-job subscription: only events of the observed job ("job1") are asserted on,
 * in the order job update, two task updates, snapshot end marker, job update, followed by a
 * later task update. Finally checks that an error in the replicator stream reaches the subscriber.
 * <p>
 * Every value returned by the timed {@code takeNext} is checked for {@code null} before it is
 * dereferenced, so a timeout shows up as an assertion failure rather than a NullPointerException.
 */
@Test
public void testObserveJob() throws InterruptedException {
    ExtTestSubscriber<JobChangeNotification> subscriber = new ExtTestSubscriber<>();
    processor.observeJob("job1").subscribe(subscriber);

    Pair<Job<?>, List<Task>> jobAndTasks = addToJobDataReplicator(newJobAndTasks("job1", 2));
    Job<?> job2 = addToJobDataReplicator(newJobAndTasks("job2", 2)).getLeft();
    Job<?> job1 = jobAndTasks.getLeft();
    Task task1 = jobAndTasks.getRight().get(0);

    // Job update events, which also trigger the snapshot. The job2 event is emitted first,
    // yet every notification asserted below belongs to job1.
    emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobUpdateEvent.newJob(job2, JUNIT_CALL_METADATA)));
    emitEvent(Pair.of(jobDataReplicator.getCurrent(), JobUpdateEvent.newJob(job1, JUNIT_CALL_METADATA)));

    // Snapshot part: the job itself ...
    JobChangeNotification receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
    assertThat(receivedEvent).isNotNull();
    assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.JOBUPDATE);
    assertThat(receivedEvent.getJobUpdate().getJob().getId()).isEqualTo(job1.getId());

    // ... then its two tasks ...
    receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
    assertThat(receivedEvent).isNotNull();
    assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE);
    assertThat(receivedEvent.getTaskUpdate().getTask().getJobId()).isEqualTo(job1.getId());
    receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
    assertThat(receivedEvent).isNotNull();
    assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE);
    assertThat(receivedEvent.getTaskUpdate().getTask().getJobId()).isEqualTo(job1.getId());

    // ... then the snapshot end marker, followed by the job update event itself.
    receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
    assertThat(receivedEvent).isNotNull();
    assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.SNAPSHOTEND);
    receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
    assertThat(receivedEvent).isNotNull();
    assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.JOBUPDATE);
    assertThat(receivedEvent.getJobUpdate().getJob().getId()).isEqualTo(job1.getId());

    // Task update event
    TaskUpdateEvent taskUpdateEvent = TaskUpdateEvent.newTask(job1, task1, JUNIT_CALL_METADATA);
    emitEvent(Pair.of(jobDataReplicator.getCurrent(), taskUpdateEvent));
    receivedEvent = subscriber.takeNext(30, TimeUnit.SECONDS);
    assertThat(receivedEvent).isNotNull();
    assertThat(receivedEvent.getNotificationCase()).isEqualTo(JobChangeNotification.NotificationCase.TASKUPDATE);
    assertThat(receivedEvent.getTaskUpdate().getTask().getId()).isEqualTo(task1.getId());

    // Check that the stream is correctly terminated when the replicator stream fails
    jobDataReplicatorSink.tryEmitError(new RuntimeException("simulated stream error"));
    assertThat(subscriber.getError()).isInstanceOf(RuntimeException.class);
}

private Pair<Job<?>, List<Task>> addToJobDataReplicator(Pair<Job<?>, List<Task>> jobAndTasks) {
JobSnapshot updated = jobDataReplicator.getCurrent().updateJob(jobAndTasks.getLeft()).orElse(jobDataReplicator.getCurrent());
for (Task task : jobAndTasks.getRight()) {
Expand All @@ -237,4 +306,8 @@ private static Pair<Job<?>, List<Task>> newJobAndTasks(String jobId, int taskCou
List<Task> tasks = (List) JobGenerator.batchTasks(job).getValues(taskCount);
return Pair.of(job, tasks);
}

/**
 * Pushes a snapshot/event pair into {@code jobDataReplicatorSink}, using the
 * {@code FAIL_FAST} emit failure handler.
 *
 * @param e snapshot and job manager event pair to emit
 */
private void emitEvent(Pair<JobSnapshot, JobManagerEvent<?>> e) {
    Sinks.EmitFailureHandler onEmitFailure = Sinks.EmitFailureHandler.FAIL_FAST;
    jobDataReplicatorSink.emitNext(e, onEmitFailure);
}
}

0 comments on commit 633eefa

Please sign in to comment.