Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix silent failures in dispatch loop from stalling the pipeline #32922

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ public static <T extends DataflowWorkerHarnessOptions> T initializeGlobalStateAn

@SuppressWarnings("Slf4jIllegalPassedClass")
public static void initializeLogging(Class<?> workerHarnessClass) {
/* Set up exception handling tied to the workerHarnessClass. */
// Set up exception handling for raw Threads tied to the workerHarnessClass.
// Does NOT handle exceptions thrown by threads created by Executors/ExecutorServices.
// To prevent silent/hidden exceptions, it is important to either set
// WorkerUncaughtExceptionHandler on the ThreadFactory used to create the
// Executor/ExecutorService or use util/TerminatingExecutors.
Thread.setDefaultUncaughtExceptionHandler(
new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
* This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and
* exits the application with status code 1.
*/
class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
public class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
private final JvmRuntime runtime;
private final Logger logger;

WorkerUncaughtExceptionHandler(Logger logger) {
public WorkerUncaughtExceptionHandler(Logger logger) {
this(JvmRuntime.INSTANCE, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.api.services.dataflow.model.WorkItem;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -35,6 +34,7 @@
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.BackOff;
Expand Down Expand Up @@ -97,8 +97,8 @@ public static StreamingEngineComputationConfigFetcher create(
globalConfigRefreshPeriodMillis,
dataflowServiceClient,
new StreamingGlobalConfigHandleImpl(),
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()));
TerminatingExecutors.newSingleThreadedScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME), LOG));
}

@VisibleForTesting
Expand Down Expand Up @@ -157,6 +157,19 @@ private static Optional<StreamingConfigTask> fetchConfigWithRetry(
}
}

private static Optional<ComputationConfig> createComputationConfig(StreamingConfigTask config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we revert the unrelated change, to keep the diff small and on topic?

return Optional.ofNullable(config.getStreamingComputationConfigs())
.map(Iterables::getOnlyElement)
.map(
streamingComputationConfig ->
ComputationConfig.create(
createMapTask(streamingComputationConfig),
streamingComputationConfig.getTransformUserNameToStateFamily(),
config.getUserStepToStateFamilyNameMap() != null
? config.getUserStepToStateFamilyNameMap()
: ImmutableMap.of()));
}

private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) {
StreamingGlobalConfig.Builder pipelineConfig = StreamingGlobalConfig.builder();
OperationalLimits.Builder operationalLimits = OperationalLimits.builder();
Expand Down Expand Up @@ -215,19 +228,6 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) {
return pipelineConfig.build();
}

/**
 * Builds the {@link ComputationConfig} described by a {@link StreamingConfigTask}, if it has one.
 *
 * <p>Returns an empty {@link Optional} when {@code config.getStreamingComputationConfigs()} is
 * {@code null}. Otherwise the only element of that collection is turned into a map task via
 * {@code createMapTask} and paired with its transform-user-name-to-state-family mapping.
 *
 * <p>NOTE(review): {@code Iterables::getOnlyElement} is presumably Guava's, which throws unless
 * the collection holds exactly one element; confirm against the file's imports.
 *
 * @param config the streaming config task to read the computation config from
 * @return the computation config, or empty if the task carries no streaming computation configs
 */
private static Optional<ComputationConfig> createComputationConfig(StreamingConfigTask config) {
return Optional.ofNullable(config.getStreamingComputationConfigs())
.map(Iterables::getOnlyElement)
.map(
streamingComputationConfig ->
ComputationConfig.create(
createMapTask(streamingComputationConfig),
streamingComputationConfig.getTransformUserNameToStateFamily(),
// Fall back to an empty map when the task has no user-step-to-state-family mapping.
config.getUserStepToStateFamilyNameMap() != null
? config.getUserStepToStateFamilyNameMap()
: ImmutableMap.of()));
}

@Override
public void start() {
fetchInitialPipelineGlobalConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand All @@ -39,6 +38,7 @@
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
Expand Down Expand Up @@ -134,11 +134,11 @@ private FanOutStreamingEngineWorkerHarness(
this.dispatcherClient = dispatcherClient;
this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
this.windmillStreamManager =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
TerminatingExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME), LOG);
this.workerMetadataConsumer =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
TerminatingExecutors.newSingleThreadedExecutor(
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME), LOG);
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
this.activeMetadataVersion = Long.MIN_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@
import com.google.auto.value.AutoOneOf;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.RpcException;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
Expand Down Expand Up @@ -82,12 +84,13 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
this.waitForResources = waitForResources;
this.computationStateFetcher = computationStateFetcher;
this.workProviderExecutor =
Executors.newSingleThreadScheduledExecutor(
TerminatingExecutors.newSingleThreadedExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.setNameFormat("DispatchThread")
.build());
.setNameFormat("DispatchThread"),
LOG);

this.isRunning = new AtomicBoolean(false);
this.getWorkSender = getWorkSender;
}
Expand All @@ -103,11 +106,22 @@ public void start() {
"Multiple calls to {}.start() are not allowed.",
getClass());
workCommitter.start();
workProviderExecutor.execute(
() -> {
getDispatchLoop().run();
LOG.info("Dispatch done");
});
while (isRunning.get()) {
Future<?> dispatchLoopFuture =
workProviderExecutor.submit(
() -> {
getDispatchLoop().run();
LOG.info("Dispatch done");
});

try {
dispatchLoopFuture.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is changing start() to be blocking.

If we want that, we should have a clearer method name and also just get rid of the workProviderExecutor and use this thread directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new AssertionError("GetWork failed with error.", e);
}
}
}

private Runnable getDispatchLoop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -42,6 +41,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -116,8 +116,8 @@ public final class StreamingWorkerStatusPages {
public static StreamingWorkerStatusPages.Builder builder() {
return new AutoBuilder_StreamingWorkerStatusPages_Builder()
.setStatusPageDumper(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR).build()));
TerminatingExecutors.newSingleThreadedScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR), LOG));
}

public void start(DataflowWorkerHarnessOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -51,6 +50,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -151,8 +151,8 @@ public static StreamingWorkerStatusReporter create(
memoryMonitor,
workExecutor,
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build()),
TerminatingExecutors.newSingleThreadedScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName), LOG),
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}
Expand Down Expand Up @@ -228,6 +228,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}
}

// Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we revert the unrelated change, to keep the diff small and on topic?

// WorkerMessages RPC schedule. The desired reporting period
// (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
// of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
private static long getPerWorkerMetricsUpdateFrequency(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
if (windmillHarnessUpdateReportingPeriodMillis == 0) {
return 0;
}
return LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
reportHarnessStartup();
Expand Down Expand Up @@ -276,22 +292,6 @@ private void reportHarnessStartup() {
}
}

// Computes how many WorkerMessages RPC intervals make up one PerWorkerMetrics reporting period,
// so that PerWorkerMetrics reporting stays aligned with the WorkerMessages RPC schedule.
// The desired period (perWorkerMetricsUpdateReportingPeriodMillis) is divided by the RPC
// interval (windmillHarnessUpdateReportingPeriodMillis) and rounded up to a whole number of
// intervals. An RPC interval of 0 yields 0 instead of dividing by zero.
private static long getPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  return windmillHarnessUpdateReportingPeriodMillis == 0
      ? 0
      : LongMath.divide(
          perWorkerMetricsUpdateReportingPeriodMillis,
          windmillHarnessUpdateReportingPeriodMillis,
          RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;

/**
* Utility class for {@link java.util.concurrent.ExecutorService}s that will terminate the JVM on
* uncaught exceptions.
*
* @implNote Ensures that all threads produced by the {@link ExecutorService}s have a {@link
* WorkerUncaughtExceptionHandler} attached to prevent hidden/silent exceptions and errors.
*/
public final class TerminatingExecutors {
private TerminatingExecutors() {}

/**
 * Creates a single-threaded {@link ExecutorService} whose thread is produced by the factory
 * returned from {@code terminatingThreadFactory}, i.e. one with a {@link
 * WorkerUncaughtExceptionHandler} attached.
 *
 * @param threadFactoryBuilder caller-configured builder; the uncaught exception handler is set
 *     on it before the factory is built
 * @param logger logger passed to the {@link WorkerUncaughtExceptionHandler}
 * @return a new single-threaded executor
 */
public static ExecutorService newSingleThreadedExecutor(
    ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
  ThreadFactory terminatingFactory = terminatingThreadFactory(threadFactoryBuilder, logger);
  return Executors.newSingleThreadExecutor(terminatingFactory);
}

/**
 * Creates a single-threaded {@link ScheduledExecutorService} whose thread is produced by the
 * factory returned from {@code terminatingThreadFactory}, i.e. one with a {@link
 * WorkerUncaughtExceptionHandler} attached.
 *
 * <p>NOTE(review): scheduled executors run tasks wrapped in futures, so an exception thrown by
 * a task is normally captured by that future rather than reaching the thread's uncaught
 * exception handler. Confirm that task failures actually trigger termination here.
 *
 * @param threadFactoryBuilder caller-configured builder; the uncaught exception handler is set
 *     on it before the factory is built
 * @param logger logger passed to the {@link WorkerUncaughtExceptionHandler}
 * @return a new single-threaded scheduled executor
 */
public static ScheduledExecutorService newSingleThreadedScheduledExecutor(
    ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
  ThreadFactory terminatingFactory = terminatingThreadFactory(threadFactoryBuilder, logger);
  return Executors.newSingleThreadScheduledExecutor(terminatingFactory);
}

/**
 * Creates a cached thread pool whose threads are produced by the factory returned from {@code
 * terminatingThreadFactory}, i.e. each with a {@link WorkerUncaughtExceptionHandler} attached.
 *
 * @param threadFactoryBuilder caller-configured builder; the uncaught exception handler is set
 *     on it before the factory is built
 * @param logger logger passed to the {@link WorkerUncaughtExceptionHandler}
 * @return a new cached thread pool
 */
public static ExecutorService newCachedThreadPool(
    ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
  ThreadFactory terminatingFactory = terminatingThreadFactory(threadFactoryBuilder, logger);
  return Executors.newCachedThreadPool(terminatingFactory);
}

/**
 * Creates a fixed-size thread pool whose threads are produced by the factory returned from
 * {@code terminatingThreadFactory}, i.e. each with a {@link WorkerUncaughtExceptionHandler}
 * attached.
 *
 * @param numThreads number of threads in the pool
 * @param threadFactoryBuilder caller-configured builder; the uncaught exception handler is set
 *     on it before the factory is built
 * @param logger logger passed to the {@link WorkerUncaughtExceptionHandler}
 * @return a new fixed-size thread pool
 */
public static ExecutorService newFixedThreadPool(
    int numThreads, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
  ThreadFactory terminatingFactory = terminatingThreadFactory(threadFactoryBuilder, logger);
  return Executors.newFixedThreadPool(numThreads, terminatingFactory);
}

private static ThreadFactory terminatingThreadFactory(
ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
return threadFactoryBuilder
.setUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(logger))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this works if the executor thread is catching the exception and putting it in the future.

I think we might instead need to wrap the executor so that scheduled tasks are wrapped, and then catch exceptions and call a termination method, like com/google/common/util/concurrent/WrappingExecutorService.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to test to ensure that we are halting the JVM

.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
Expand Down Expand Up @@ -98,11 +98,11 @@ protected AbstractWindmillStream(
String backendWorkerToken) {
this.backendWorkerToken = backendWorkerToken;
this.executor =
Executors.newSingleThreadExecutor(
TerminatingExecutors.newSingleThreadedExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(createThreadName(debugStreamType, backendWorkerToken))
.build());
.setNameFormat(createThreadName(debugStreamType, backendWorkerToken)),
LOG);
this.backoff = backoff;
this.streamRegistry = streamRegistry;
this.logEveryNStreamFailures = logEveryNStreamFailures;
Expand Down
Loading
Loading