Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Disconnect from the job event stream if no event is received in a timeout threshold (#1143)
Browse files Browse the repository at this point in the history

* Disconnect from the job event stream if no event is received in a timeout threshold

* Add missing Guice configuration

* Fix flaky test
  • Loading branch information
tbak authored Oct 15, 2021
1 parent e63c1f2 commit d8c4365
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,33 @@
package com.netflix.titus.runtime.connector.jobmanager;

import com.netflix.titus.api.jobmanager.service.ReadOnlyJobOperations;
import com.netflix.titus.common.environment.MyEnvironment;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.archaius2.Archaius2Ext;
import com.netflix.titus.runtime.connector.jobmanager.replicator.JobDataReplicatorConfiguration;
import com.netflix.titus.runtime.connector.jobmanager.replicator.JobDataReplicatorProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobManagementDataReplicationComponent {

/**
 * Exposes the job data replicator settings as a Spring bean.
 *
 * @param environment environment handed to {@code Archaius2Ext.newConfiguration} to build the configuration
 * @return the {@link JobDataReplicatorConfiguration} instance created by {@code Archaius2Ext}
 */
@Bean
public JobDataReplicatorConfiguration getJobDataReplicatorConfiguration(MyEnvironment environment) {
    Class<JobDataReplicatorConfiguration> configurationType = JobDataReplicatorConfiguration.class;
    return Archaius2Ext.newConfiguration(configurationType, environment);
}

/**
 * Exposes the factory returned by {@code JobSnapshotFactories.newDefault()} as a Spring bean.
 *
 * @return the default {@link JobSnapshotFactory}
 */
@Bean
public JobSnapshotFactory getJobSnapshotFactory() {
    JobSnapshotFactory defaultFactory = JobSnapshotFactories.newDefault();
    return defaultFactory;
}

@Bean
public JobDataReplicator getJobDataReplicator(JobManagementClient jobManagementClient,
public JobDataReplicator getJobDataReplicator(JobDataReplicatorConfiguration configuration,
JobManagementClient jobManagementClient,
JobSnapshotFactory jobSnapshotFactory,
TitusRuntime titusRuntime) {
return new JobDataReplicatorProvider(jobManagementClient, jobSnapshotFactory, titusRuntime).get();
return new JobDataReplicatorProvider(configuration, jobManagementClient, jobSnapshotFactory, titusRuntime).get();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

package com.netflix.titus.runtime.connector.jobmanager;

import javax.inject.Singleton;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.netflix.archaius.ConfigProxyFactory;
import com.netflix.titus.api.jobmanager.service.ReadOnlyJobOperations;
import com.netflix.titus.runtime.connector.jobmanager.replicator.JobDataReplicatorConfiguration;
import com.netflix.titus.runtime.connector.jobmanager.replicator.JobDataReplicatorProvider;

public class JobManagerDataReplicationModule extends AbstractModule {
Expand All @@ -26,4 +31,10 @@ protected void configure() {
bind(JobDataReplicator.class).toProvider(JobDataReplicatorProvider.class);
bind(ReadOnlyJobOperations.class).to(CachedReadOnlyJobOperations.class);
}

/**
 * Guice provider for the job data replicator settings.
 *
 * @param factory proxy factory used to create the configuration instance
 * @return a {@link JobDataReplicatorConfiguration} proxy, bound as a singleton
 */
@Provides
@Singleton
public JobDataReplicatorConfiguration getJobDataReplicatorConfiguration(ConfigProxyFactory factory) {
    JobDataReplicatorConfiguration configurationProxy = factory.newProxy(JobDataReplicatorConfiguration.class);
    return configurationProxy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package com.netflix.titus.runtime.connector.jobmanager.replicator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -48,6 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

Expand All @@ -62,27 +65,32 @@ public class GrpcJobReplicatorEventStream extends AbstractReplicatorEventStream<
private final JobManagementClient client;
private final Map<String, String> filteringCriteria;
private final JobSnapshotFactory jobSnapshotFactory;
private final JobDataReplicatorConfiguration configuration;

private final ValueRangeCounter eventProcessingLatencies;
private final AtomicInteger subscriptionCounter = new AtomicInteger();

public GrpcJobReplicatorEventStream(JobManagementClient client,
JobSnapshotFactory jobSnapshotFactory,
JobDataReplicatorConfiguration configuration,
DataReplicatorMetrics metrics,
TitusRuntime titusRuntime,
Scheduler scheduler) {
this(client, Collections.emptyMap(), jobSnapshotFactory, metrics, titusRuntime, scheduler);
this(client, Collections.emptyMap(), jobSnapshotFactory, configuration, metrics, titusRuntime, scheduler);
}

public GrpcJobReplicatorEventStream(JobManagementClient client,
Map<String, String> filteringCriteria,
JobSnapshotFactory jobSnapshotFactory,
JobDataReplicatorConfiguration configuration,
DataReplicatorMetrics metrics,
TitusRuntime titusRuntime,
Scheduler scheduler) {
super(metrics, titusRuntime, scheduler);
this.client = client;
this.filteringCriteria = filteringCriteria;
this.jobSnapshotFactory = jobSnapshotFactory;
this.configuration = configuration;

PolledMeter.using(titusRuntime.getRegistry()).withName(METRICS_ROOT + "activeSubscriptions").monitorValue(subscriptionCounter);
this.eventProcessingLatencies = SpectatorExt.newValueRangeCounter(
Expand All @@ -98,7 +106,21 @@ protected Flux<ReplicatorEvent<JobSnapshot, JobManagerEvent<?>>> newConnection()
.<ReplicatorEvent<JobSnapshot, JobManagerEvent<?>>>create(sink -> {
CacheUpdater cacheUpdater = new CacheUpdater(jobSnapshotFactory, titusRuntime);
logger.info("Connecting to the job event stream (filteringCriteria={})...", filteringCriteria);
Disposable disposable = client.observeJobs(filteringCriteria).subscribe(

ConnectableFlux<JobManagerEvent<?>> connectableStream = client.observeJobs(filteringCriteria).publish();
Flux<JobManagerEvent<?>> augmentedStream;
if (configuration.isConnectionTimeoutEnabled()) {
augmentedStream = Flux.merge(
connectableStream.take(1).timeout(Duration.ofMillis(configuration.getConnectionTimeoutMs())).ignoreElements()
.onErrorMap(TimeoutException.class, error ->
new TimeoutException(String.format("No event received from stream in %sms", configuration.getConnectionTimeoutMs()))
),
connectableStream
);
} else {
augmentedStream = connectableStream;
}
Disposable disposable = augmentedStream.subscribe(
jobEvent -> {
long started = titusRuntime.getClock().wallTime();
try {
Expand All @@ -114,6 +136,7 @@ protected Flux<ReplicatorEvent<JobSnapshot, JobManagerEvent<?>>> newConnection()
() -> ExceptionExt.silent(sink::complete)
);
sink.onDispose(disposable);
connectableStream.connect();
})
.doOnSubscribe(subscription -> subscriptionCounter.incrementAndGet())
.doFinally(signal -> subscriptionCounter.decrementAndGet());
Expand All @@ -134,6 +157,20 @@ static class CacheUpdater {
}

Optional<ReplicatorEvent<JobSnapshot, JobManagerEvent<?>>> onEvent(JobManagerEvent<?> event) {
// Per-event tracing. The guard checks the DEBUG level, so the messages are emitted at DEBUG as well
// (previously they were logged at INFO inside a DEBUG guard, which made the level inconsistent).
if (logger.isDebugEnabled()) {
    if (event instanceof JobUpdateEvent) {
        Job job = ((JobUpdateEvent) event).getCurrent();
        logger.debug("Received job update event: jobId={}, state={}, version={}", job.getId(), job.getStatus(), job.getVersion());
    } else if (event instanceof TaskUpdateEvent) {
        Task task = ((TaskUpdateEvent) event).getCurrent();
        logger.debug("Received task update event: taskId={}, state={}, version={}", task.getId(), task.getStatus(), task.getVersion());
    } else if (event.equals(JobManagerEvent.snapshotMarker())) {
        logger.debug("Received snapshot marker");
    } else {
        logger.debug("Received unrecognized event type: {}", event);
    }
}

if (lastJobSnapshotRef.get() != null) {
return processCacheUpdate(event);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.titus.runtime.connector.jobmanager.replicator;

import com.netflix.archaius.api.annotations.Configuration;
import com.netflix.archaius.api.annotations.DefaultValue;

/**
 * Settings for the job data replicator, declared under the {@code titus.connector.jobService}
 * configuration prefix.
 */
@Configuration(prefix = "titus.connector.jobService")
public interface JobDataReplicatorConfiguration {

/**
* Set to true to enable connection timeout if the first event is not emitted in the configured amount of time.
* Defaults to {@code false}; the time limit is given by {@link #getConnectionTimeoutMs()}.
*/
@DefaultValue("false")
boolean isConnectionTimeoutEnabled();

/**
* Time limit, in milliseconds, for the first event to arrive. Defaults to 30000 (30 seconds).
* See {@link #isConnectionTimeoutEnabled()}.
*/
@DefaultValue("30000")
long getConnectionTimeoutMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,20 @@ public class JobDataReplicatorProvider implements Provider<JobDataReplicator> {
private final JobDataReplicatorImpl replicator;

@Inject
public JobDataReplicatorProvider(JobManagementClient client, JobSnapshotFactory jobSnapshotFactory, TitusRuntime titusRuntime) {
this(client, Collections.emptyMap(), jobSnapshotFactory, titusRuntime);
public JobDataReplicatorProvider(JobDataReplicatorConfiguration configuration,
JobManagementClient client,
JobSnapshotFactory jobSnapshotFactory,
TitusRuntime titusRuntime) {
this(configuration, client, Collections.emptyMap(), jobSnapshotFactory, titusRuntime);
}

public JobDataReplicatorProvider(JobManagementClient client,
public JobDataReplicatorProvider(JobDataReplicatorConfiguration configuration,
JobManagementClient client,
Map<String, String> filteringCriteria,
JobSnapshotFactory jobSnapshotFactory,
TitusRuntime titusRuntime) {
StreamDataReplicator<JobSnapshot, JobManagerEvent<?>> original = StreamDataReplicator.newStreamDataReplicator(
newReplicatorEventStream(client, filteringCriteria, jobSnapshotFactory, titusRuntime),
newReplicatorEventStream(configuration, client, filteringCriteria, jobSnapshotFactory, titusRuntime),
new JobDataReplicatorMetrics(JOB_REPLICATOR, titusRuntime),
titusRuntime
).blockFirst(Duration.ofMillis(JOB_BOOTSTRAP_TIMEOUT_MS));
Expand All @@ -78,14 +82,16 @@ public JobDataReplicator get() {
return replicator;
}

private static RetryableReplicatorEventStream<JobSnapshot, JobManagerEvent<?>> newReplicatorEventStream(JobManagementClient client,
private static RetryableReplicatorEventStream<JobSnapshot, JobManagerEvent<?>> newReplicatorEventStream(JobDataReplicatorConfiguration configuration,
JobManagementClient client,
Map<String, String> filteringCriteria,
JobSnapshotFactory jobSnapshotFactory,
TitusRuntime titusRuntime) {
GrpcJobReplicatorEventStream grpcEventStream = new GrpcJobReplicatorEventStream(
client,
filteringCriteria,
jobSnapshotFactory,
configuration,
new JobDataReplicatorMetrics(JOB_REPLICATOR_GRPC_STREAM, titusRuntime),
titusRuntime,
Schedulers.parallel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.titus.runtime.connector.jobmanager.JobDataReplicator;
import com.netflix.titus.runtime.connector.jobmanager.JobManagementClient;
import com.netflix.titus.runtime.connector.jobmanager.JobSnapshotFactories;
import com.netflix.titus.runtime.connector.jobmanager.replicator.JobDataReplicatorConfiguration;
import com.netflix.titus.runtime.connector.jobmanager.replicator.JobDataReplicatorProvider;
import com.netflix.titus.testkit.model.job.JobGenerator;
import org.mockito.ArgumentMatchers;
Expand All @@ -44,6 +45,7 @@ public class StreamDataReplicatorPerf {

public static void main(String[] args) throws InterruptedException {
JobManagementClient client = Mockito.mock(JobManagementClient.class);
JobDataReplicatorConfiguration configuration = Mockito.mock(JobDataReplicatorConfiguration.class);

Mockito.when(client.observeJobs(ArgumentMatchers.any())).thenAnswer(invocation -> Flux.defer(() -> {

Expand All @@ -55,7 +57,7 @@ public static void main(String[] args) throws InterruptedException {
.concatWith(Flux.interval(Duration.ofSeconds(1)).take(1).flatMap(tick -> Flux.error(new RuntimeException("Simulated error"))));
}));

JobDataReplicator replicator = new JobDataReplicatorProvider(client, JobSnapshotFactories.newDefault(), TitusRuntimes.internal()).get();
JobDataReplicator replicator = new JobDataReplicatorProvider(configuration, client, JobSnapshotFactories.newDefault(), TitusRuntimes.internal()).get();
replicator.events().subscribe(System.out::println);

Thread.sleep(3600_000);
Expand Down
Loading

0 comments on commit d8c4365

Please sign in to comment.