From 1f82ee8f934019470d729548c99329845f45f84c Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 24 Oct 2024 01:40:00 -0700 Subject: [PATCH 1/3] fix silent failures in dispatch loop from stalling the pipeline --- .../harness/SingleSourceWorkerHarness.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index bc93e6d89c41..7100ba13c7e1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -23,8 +23,10 @@ import com.google.auto.value.AutoOneOf; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -103,11 +105,25 @@ public void start() { "Multiple calls to {}.start() are not allowed.", getClass()); workCommitter.start(); - workProviderExecutor.execute( - () -> { - getDispatchLoop().run(); - LOG.info("Dispatch done"); - }); + while (isRunning.get()) { + Future dispatchLoopFuture = + workProviderExecutor.submit( + () -> { + getDispatchLoop().run(); + LOG.info("Dispatch done"); + }); + try { + dispatchLoopFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + // We can decide what we do here. + // 1. If we want to crash the worker we can throw an exception like below + // 2. or if we want to just retry/restart running the dispatch loop, we can LOG here and the + // loop will restart the dispatch loop. 
+ throw new AssertionError(e); + } + } } private Runnable getDispatchLoop() { From d3c06ac6b8d5bca62ac7762da265a0a9ebd89b82 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 24 Oct 2024 22:15:43 -0700 Subject: [PATCH 2/3] convert uncaught exceptions thrown by executors used by streaming worker harness to crash the JVM --- .../worker/DataflowWorkerHarnessHelper.java | 6 +- .../WorkerUncaughtExceptionHandler.java | 4 +- ...reamingEngineComputationConfigFetcher.java | 32 +++---- .../FanOutStreamingEngineWorkerHarness.java | 10 +-- .../harness/SingleSourceWorkerHarness.java | 16 ++-- .../harness/StreamingWorkerStatusPages.java | 6 +- .../StreamingWorkerStatusReporter.java | 38 ++++----- .../worker/util/TerminatingExecutors.java | 67 +++++++++++++++ .../client/AbstractWindmillStream.java | 8 +- .../StreamingApplianceWorkCommitter.java | 8 +- .../commits/StreamingEngineWorkCommitter.java | 8 +- .../client/grpc/stubs/ChannelCache.java | 6 +- .../work/budget/GetWorkBudgetRefresher.java | 8 +- .../processing/StreamingWorkScheduler.java | 2 +- .../work/refresh/ActiveWorkRefresher.java | 6 +- .../SingleSourceWorkerHarnessTest.java | 84 +++++++++++++++++++ 16 files changed, 231 insertions(+), 78 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java index 94c894608a47..c8e2f9c7d275 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java @@ -82,7 +82,11 @@ public static T initializeGlobalStateAn @SuppressWarnings("Slf4jIllegalPassedClass") public static void initializeLogging(Class workerHarnessClass) { - /* Set up exception handling tied to the workerHarnessClass. */ + // Set up exception handling for raw Threads tied to the workerHarnessClass. + // Does NOT handle exceptions thrown by threads created by Executors/ExecutorServices. + // To prevent silent/hidden exceptions, it is important to either set + // WorkerUncaughtExceptionHandler on the ThreadFactory used to create the + // Executor/ExecutorService or use util/TerminatingExecutors. 
Thread.setDefaultUncaughtExceptionHandler( new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass))); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java index 5a8e87d23ab9..b926c8eb324e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java @@ -28,11 +28,11 @@ * This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and * exits the application with status code 1. */ -class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler { +public class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler { private final JvmRuntime runtime; private final Logger logger; - WorkerUncaughtExceptionHandler(Logger logger) { + public WorkerUncaughtExceptionHandler(Logger logger) { this(JvmRuntime.INSTANCE, logger); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 22b0dac6eb22..5a37a242ccf0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -26,7 +26,6 @@ import com.google.api.services.dataflow.model.WorkItem; import java.io.IOException; import java.util.Optional; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +34,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.WorkUnitClient; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.util.BackOff; @@ -97,8 +97,8 @@ public static StreamingEngineComputationConfigFetcher create( globalConfigRefreshPeriodMillis, dataflowServiceClient, new StreamingGlobalConfigHandleImpl(), - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build())); + TerminatingExecutors.newSingleThreadedScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME), LOG)); } @VisibleForTesting @@ -157,6 +157,19 @@ private static Optional fetchConfigWithRetry( } } + private static Optional createComputationConfig(StreamingConfigTask config) { + return Optional.ofNullable(config.getStreamingComputationConfigs()) + .map(Iterables::getOnlyElement) + .map( + streamingComputationConfig -> + ComputationConfig.create( + createMapTask(streamingComputationConfig), + 
streamingComputationConfig.getTransformUserNameToStateFamily(), + config.getUserStepToStateFamilyNameMap() != null + ? config.getUserStepToStateFamilyNameMap() + : ImmutableMap.of())); + } + private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { StreamingGlobalConfig.Builder pipelineConfig = StreamingGlobalConfig.builder(); OperationalLimits.Builder operationalLimits = OperationalLimits.builder(); @@ -215,19 +228,6 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { return pipelineConfig.build(); } - private static Optional createComputationConfig(StreamingConfigTask config) { - return Optional.ofNullable(config.getStreamingComputationConfigs()) - .map(Iterables::getOnlyElement) - .map( - streamingComputationConfig -> - ComputationConfig.create( - createMapTask(streamingComputationConfig), - streamingComputationConfig.getTransformUserNameToStateFamily(), - config.getUserStepToStateFamilyNameMap() != null - ? config.getUserStepToStateFamilyNameMap() - : ImmutableMap.of())); - } - @Override public void start() { fetchInitialPipelineGlobalConfig(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index 458cf57ca8e7..f153e5fdba33 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -29,7 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -39,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; @@ -134,11 +134,11 @@ private FanOutStreamingEngineWorkerHarness( this.dispatcherClient = dispatcherClient; this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); this.windmillStreamManager = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build()); + TerminatingExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME), LOG); this.workerMetadataConsumer = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build()); + TerminatingExecutors.newSingleThreadedExecutor( + new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME), LOG); this.getWorkBudgetDistributor = getWorkBudgetDistributor; this.totalGetWorkBudget = totalGetWorkBudget; this.activeMetadataVersion = Long.MIN_VALUE; 
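The executors above illustrate the pattern this series converges on: build every ThreadFactory with an UncaughtExceptionHandler that logs the failure and halts the JVM, so an exception on an executor-owned thread cannot be silently swallowed. The sketch below shows that pattern with plain JDK classes only; the inline handler and the "DispatchThread" name are hypothetical stand-ins for WorkerUncaughtExceptionHandler and the harness's dispatch thread, not the Beam classes themselves. Note that the handler only fires for tasks started with execute(); submit() captures the exception inside the returned Future, which is exactly the silent-failure mode the first patch worked around with Future.get().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public final class TerminatingExecutorSketch {
  public static void main(String[] args) throws InterruptedException {
    // Hypothetical stand-in for WorkerUncaughtExceptionHandler: log, then halt with status 1.
    Thread.UncaughtExceptionHandler crashOnFailure =
        (thread, error) -> {
          System.err.println("Uncaught exception on " + thread.getName() + ": " + error);
          Runtime.getRuntime().halt(1);
        };

    // Rough equivalent of ThreadFactoryBuilder.setUncaughtExceptionHandler(...).build() without Guava.
    ThreadFactory factory =
        runnable -> {
          Thread t = new Thread(runnable, "DispatchThread");
          t.setUncaughtExceptionHandler(crashOnFailure);
          return t;
        };

    ExecutorService executor = Executors.newSingleThreadExecutor(factory);

    // execute(): an exception escaping the task propagates to the thread's handler, so the JVM halts.
    executor.execute(
        () -> {
          throw new RuntimeException("dispatch loop failed");
        });

    // submit() would NOT reach the handler: the exception is stored in the returned Future and
    // stays hidden unless Future.get() is called.
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
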
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index 7100ba13c7e1..ac045dbb9990 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +33,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.RpcException; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; @@ -84,12 +84,13 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { this.waitForResources = waitForResources; this.computationStateFetcher = computationStateFetcher; this.workProviderExecutor = - Executors.newSingleThreadScheduledExecutor( + TerminatingExecutors.newSingleThreadedExecutor( new ThreadFactoryBuilder() .setDaemon(true) .setPriority(Thread.MIN_PRIORITY) - .setNameFormat("DispatchThread") - .build()); + .setNameFormat("DispatchThread"), + LOG); + this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; } @@ -112,16 +113,13 @@ public void start() { getDispatchLoop().run(); LOG.info("Dispatch done"); }); + try { dispatchLoopFuture.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { - // We can decide what we do here. - // 1. If we want to crash the worker we can throw an exception like below - // 2. or if we want to just retry/restart running the dispatch loop, we can LOG here and the - // loop will restart the dispatch loop. 
- throw new AssertionError(e); + throw new AssertionError("GetWork failed with error.", e); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 6981312eff1d..20f0d7b7ec2a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Collection; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +41,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -116,8 +116,8 @@ public final class StreamingWorkerStatusPages { public static StreamingWorkerStatusPages.Builder builder() { return new AutoBuilder_StreamingWorkerStatusPages_Builder() .setStatusPageDumper( - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR).build())); + TerminatingExecutors.newSingleThreadedScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR), LOG)); } public void start(DataflowWorkerHarnessOptions options) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index ba77d8e1ce26..a03abc4bdfea 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -34,7 +34,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -51,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; import 
org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -151,8 +151,8 @@ public static StreamingWorkerStatusReporter create( memoryMonitor, workExecutor, threadName -> - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build()), + TerminatingExecutors.newSingleThreadedScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(threadName), LOG), windmillHarnessUpdateReportingPeriodMillis, perWorkerMetricsUpdateReportingPeriodMillis); } @@ -228,6 +228,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) { } } + // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the + // WorkerMessages RPC schedule. The desired reporting period + // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple + // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). + private static long getPerWorkerMetricsUpdateFrequency( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { + if (windmillHarnessUpdateReportingPeriodMillis == 0) { + return 0; + } + return LongMath.divide( + perWorkerMetricsUpdateReportingPeriodMillis, + windmillHarnessUpdateReportingPeriodMillis, + RoundingMode.CEILING); + } + @SuppressWarnings("FutureReturnValueIgnored") public void start() { reportHarnessStartup(); @@ -276,22 +292,6 @@ private void reportHarnessStartup() { } } - // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the - // WorkerMessages RPC schedule. The desired reporting period - // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple - // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). - private static long getPerWorkerMetricsUpdateFrequency( - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - if (windmillHarnessUpdateReportingPeriodMillis == 0) { - return 0; - } - return LongMath.divide( - perWorkerMetricsUpdateReportingPeriodMillis, - windmillHarnessUpdateReportingPeriodMillis, - RoundingMode.CEILING); - } - /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java new file mode 100644 index 000000000000..e064df813392 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; + +/** + * Utility class for {@link java.util.concurrent.ExecutorService}s that will terminate the JVM on + * uncaught exceptions. + * + * @implNote Ensures that all threads produced by the {@link ExecutorService}s have a {@link + * WorkerUncaughtExceptionHandler} attached to prevent hidden/silent exceptions and errors. + */ +public final class TerminatingExecutors { + private TerminatingExecutors() {} + + public static ExecutorService newSingleThreadedExecutor( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return Executors.newSingleThreadExecutor( + terminatingThreadFactory(threadFactoryBuilder, logger)); + } + + public static ScheduledExecutorService newSingleThreadedScheduledExecutor( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return Executors.newSingleThreadScheduledExecutor( + terminatingThreadFactory(threadFactoryBuilder, logger)); + } + + public static ExecutorService newCachedThreadPool( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return Executors.newCachedThreadPool(terminatingThreadFactory(threadFactoryBuilder, logger)); + } + + public static ExecutorService newFixedThreadPool( + int numThreads, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return Executors.newFixedThreadPool( + numThreads, terminatingThreadFactory(threadFactoryBuilder, logger)); + } + + private static ThreadFactory terminatingThreadFactory( + ThreadFactoryBuilder threadFactoryBuilder, Logger logger) { + return threadFactoryBuilder + .setUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(logger)) + .build(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 58aecfc71e00..628a03428755 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -22,7 +22,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; 
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status; @@ -98,11 +98,11 @@ protected AbstractWindmillStream( String backendWorkerToken) { this.backendWorkerToken = backendWorkerToken; this.executor = - Executors.newSingleThreadExecutor( + TerminatingExecutors.newSingleThreadedExecutor( new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat(createThreadName(debugStreamType, backendWorkerToken)) - .build()); + .setNameFormat(createThreadName(debugStreamType, backendWorkerToken)), + LOG); this.backoff = backoff; this.streamRegistry = streamRegistry; this.logEveryNStreamFailures = logEveryNStreamFailures; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java index d092ebf53fc1..a41222382cb8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; @@ -29,6 +28,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; import org.apache.beam.sdk.annotations.Internal; @@ -57,12 +57,12 @@ private StreamingApplianceWorkCommitter( WeightedBoundedQueue.create( MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); this.commitWorkers = - Executors.newSingleThreadScheduledExecutor( + TerminatingExecutors.newSingleThreadedExecutor( new ThreadFactoryBuilder() .setDaemon(true) .setPriority(Thread.MAX_PRIORITY) - .setNameFormat("CommitThread-%d") - .build()); + .setNameFormat("CommitThread-%d"), + LOG); this.activeCommitBytes = new AtomicLong(); this.onCommitComplete = onCommitComplete; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java index bf1007bc4bfb..7ec95b97eead 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java @@ 
-19,7 +19,6 @@ import com.google.auto.value.AutoBuilder; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -29,6 +28,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; import org.apache.beam.sdk.annotations.Internal; @@ -67,7 +67,7 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter { WeightedBoundedQueue.create( MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); this.commitSenders = - Executors.newFixedThreadPool( + TerminatingExecutors.newFixedThreadPool( numCommitSenders, new ThreadFactoryBuilder() .setDaemon(true) @@ -75,8 +75,8 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter { .setNameFormat( backendWorkerToken.isEmpty() ? "CommitThread-%d" - : "CommitThread-" + backendWorkerToken + "-%d") - .build()); + : "CommitThread-" + backendWorkerToken + "-%d"), + LOG); this.activeCommitBytes = new AtomicLong(); this.onCommitComplete = onCommitComplete; this.numCommitSenders = numCommitSenders; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java index db012c6bb412..affae7386b01 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java @@ -18,11 +18,11 @@ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; import java.io.PrintWriter; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -56,8 +56,8 @@ private ChannelCache( .removalListener( RemovalListeners.asynchronous( onChannelRemoved, - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build()))) + TerminatingExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser"), LOG))) .build( new CacheLoader() { @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java index 
e39aa8dbc8a5..34c813ec4be1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java @@ -18,11 +18,11 @@ package org.apache.beam.runners.dataflow.worker.windmill.work.budget; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.fn.stream.AdvancingPhaser; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -51,7 +51,7 @@ public GetWorkBudgetRefresher( Supplier isBudgetRefreshPaused, Runnable redistributeBudget) { this.budgetRefreshTrigger = new AdvancingPhaser(1); this.budgetRefreshExecutor = - Executors.newSingleThreadScheduledExecutor( + TerminatingExecutors.newSingleThreadedExecutor( new ThreadFactoryBuilder() .setNameFormat(BUDGET_REFRESH_THREAD) .setUncaughtExceptionHandler( @@ -59,8 +59,8 @@ public GetWorkBudgetRefresher( LOG.error( "{} failed due to uncaught exception during execution. ", t.getName(), - e)) - .build()); + e)), + LOG); this.isBudgetRefreshPaused = isBudgetRefreshPaused; this.redistributeBudget = redistributeBudget; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 9a3e6eb6b099..c74874c465a6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -70,7 +70,7 @@ */ @Internal @ThreadSafe -public final class StreamingWorkScheduler { +public class StreamingWorkScheduler { private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class); private final DataflowWorkerHarnessOptions options; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java index 781285def020..e23c57bda574 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -35,6 +34,7 @@ import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; @@ -81,8 +81,8 @@ public ActiveWorkRefresher( this.activeWorkRefreshExecutor = activeWorkRefreshExecutor; this.heartbeatTracker = heartbeatTracker; this.fanOutActiveWorkRefreshExecutor = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat(FAN_OUT_REFRESH_WORK_EXECUTOR_NAME).build()); + TerminatingExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat(FAN_OUT_REFRESH_WORK_EXECUTOR_NAME), LOG); } @SuppressWarnings("FutureReturnValueIgnored") diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java new file mode 100644 index 000000000000..57e2d1e14e32 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; + +import java.util.Optional; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; +import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SingleSourceWorkerHarnessTest { + private final WorkCommitter workCommitter = mock(WorkCommitter.class); + private final GetDataClient getDataClient = mock(GetDataClient.class); + private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class); + private final Runnable waitForResources = () -> {}; + private final Function> computationStateFetcher = + ignored -> Optional.empty(); + private final StreamingWorkScheduler streamingWorkScheduler = mock(StreamingWorkScheduler.class); + + private SingleSourceWorkerHarness createWorkerHarness( + SingleSourceWorkerHarness.GetWorkSender getWorkSender) { + return SingleSourceWorkerHarness.builder() + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setHeartbeatSender(heartbeatSender) + .setWaitForResources(waitForResources) + .setStreamingWorkScheduler(streamingWorkScheduler) + .setComputationStateFetcher(computationStateFetcher) + .setGetWorkSender(getWorkSender) + .build(); + } + + @Test + public void testDispatchLoopFailureThrowsException_appliance() { + RuntimeException expected = new RuntimeException("something bad happened"); + SingleSourceWorkerHarness.GetWorkSender getWorkSender = + SingleSourceWorkerHarness.GetWorkSender.forAppliance( + () -> { + throw expected; + }); + + SingleSourceWorkerHarness harness = createWorkerHarness(getWorkSender); + Throwable actual = assertThrows(AssertionError.class, harness::start); + assertThat(actual).hasCauseThat().hasCauseThat().isEqualTo(expected); + } + + @Test + public void testDispatchLoopFailureThrowsException_streamingEngine() { + RuntimeException expected = new RuntimeException("something bad happened"); + SingleSourceWorkerHarness.GetWorkSender getWorkSender = + SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine( + workItemReceiver -> { + throw expected; + }); + SingleSourceWorkerHarness harness = createWorkerHarness(getWorkSender); + Throwable actual = assertThrows(AssertionError.class, harness::start); + assertThat(actual).hasCauseThat().hasCauseThat().isEqualTo(expected); + } +} From a28bf79debb143331e27e2fc9acb55c53b0ec47b Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Fri, 25 Oct 2024 12:00:20 -0700 Subject: [PATCH 3/3] address PR feedback --- .../worker/StreamingDataflowWorker.java | 17 +- .../WorkerUncaughtExceptionHandler.java | 8 +- ...reamingEngineComputationConfigFetcher.java | 2 +- .../FanOutStreamingEngineWorkerHarness.java | 8 +- .../harness/SingleSourceWorkerHarness.java | 52 +++--- .../harness/StreamingWorkerStatusPages.java | 2 +- .../StreamingWorkerStatusReporter.java | 2 +- .../worker/util/TerminatingExecutors.java | 161 ++++++++++++++++-- .../client/AbstractWindmillStream.java | 2 +- 
.../StreamingApplianceWorkCommitter.java | 6 +- .../commits/StreamingEngineWorkCommitter.java | 4 +- .../work/budget/GetWorkBudgetRefresher.java | 6 +- .../work/refresh/ActiveWorkRefresher.java | 9 +- ...ingEngineComputationConfigFetcherTest.java | 11 +- .../SingleSourceWorkerHarnessTest.java | 69 ++++++-- .../work/refresh/ActiveWorkRefresherTest.java | 2 +- 16 files changed, 270 insertions(+), 91 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 524906023722..6ac3429a6846 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import org.apache.beam.runners.core.metrics.MetricsLogger; @@ -59,6 +60,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; @@ -174,7 +176,7 @@ private StreamingDataflowWorker( StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, GrpcWindmillStreamFactory windmillStreamFactory, - Function executorSupplier, + BiFunction activeWorkRefreshExecutorFn, ConcurrentMap stageInfoMap) { // Register standard file systems. 
FileSystems.setDefaultPipelineOptions(options); @@ -284,7 +286,7 @@ private StreamingDataflowWorker( stuckCommitDurationMillis, computationStateCache::getAllPresentComputations, sampler, - executorSupplier.apply("RefreshWork"), + logger -> activeWorkRefreshExecutorFn.apply("RefreshWork", logger), getDataMetricTracker::trackHeartbeats); this.statusPages = @@ -346,10 +348,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) .build(); - Function executorSupplier = - threadName -> - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build()); + GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = createGrpcwindmillStreamFactoryBuilder(options, clientId); @@ -416,7 +415,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o streamingCounters, memoryMonitor, configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), - executorSupplier, + (threadName, logger) -> + TerminatingExecutors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(threadName), logger), stageInfo); } @@ -594,7 +595,7 @@ static StreamingDataflowWorker forTesting( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build() : windmillStreamFactory.build(), - executorSupplier, + (threadName, ignored) -> executorSupplier.apply(threadName), stageInfo); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java index b926c8eb324e..b4ec170099d5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java @@ -28,7 +28,8 @@ * This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and * exits the application with status code 1. 
*/ -public class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler { +public final class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler { + @VisibleForTesting public static final int JVM_TERMINATED_STATUS_CODE = 1; private final JvmRuntime runtime; private final Logger logger; @@ -36,8 +37,7 @@ public WorkerUncaughtExceptionHandler(Logger logger) { this(JvmRuntime.INSTANCE, logger); } - @VisibleForTesting - WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) { + public WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) { this.runtime = runtime; this.logger = logger; } @@ -59,7 +59,7 @@ public void uncaughtException(Thread thread, Throwable e) { t.printStackTrace(originalStdErr); } } finally { - runtime.halt(1); + runtime.halt(JVM_TERMINATED_STATUS_CODE); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 5a37a242ccf0..45009ef01079 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -97,7 +97,7 @@ public static StreamingEngineComputationConfigFetcher create( globalConfigRefreshPeriodMillis, dataflowServiceClient, new StreamingGlobalConfigHandleImpl(), - TerminatingExecutors.newSingleThreadedScheduledExecutor( + TerminatingExecutors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME), LOG)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index f153e5fdba33..d4c94d8e7515 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -39,6 +38,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors.TerminatingExecutorService; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; @@ -95,8 +95,8 @@ public final class 
FanOutStreamingEngineWorkerHarness implements StreamingWorker private final ThrottleTimer getWorkerMetadataThrottleTimer; private final Function workCommitterFactory; private final ThrottlingGetDataMetricTracker getDataMetricTracker; - private final ExecutorService windmillStreamManager; - private final ExecutorService workerMetadataConsumer; + private final TerminatingExecutorService windmillStreamManager; + private final TerminatingExecutorService workerMetadataConsumer; private final Object metadataLock = new Object(); /** Writes are guarded by synchronization, reads are lock free. */ @@ -137,7 +137,7 @@ private FanOutStreamingEngineWorkerHarness( TerminatingExecutors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME), LOG); this.workerMetadataConsumer = - TerminatingExecutors.newSingleThreadedExecutor( + TerminatingExecutors.newSingleThreadExecutor( new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME), LOG); this.getWorkBudgetDistributor = getWorkBudgetDistributor; this.totalGetWorkBudget = totalGetWorkBudget; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index ac045dbb9990..7e79a28ebc25 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -23,9 +23,6 @@ import com.google.auto.value.AutoOneOf; import java.util.Collections; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -34,6 +31,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors; +import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors.TerminatingExecutorService; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.RpcException; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; @@ -43,6 +41,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -66,7 +65,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final StreamingWorkScheduler streamingWorkScheduler; private final Runnable waitForResources; private final Function> computationStateFetcher; - private 
final ExecutorService workProviderExecutor; + private final TerminatingExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; SingleSourceWorkerHarness( @@ -76,27 +75,28 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { StreamingWorkScheduler streamingWorkScheduler, Runnable waitForResources, Function> computationStateFetcher, - GetWorkSender getWorkSender) { + GetWorkSender getWorkSender, + TerminatingExecutorService workProviderExecutor) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; this.streamingWorkScheduler = streamingWorkScheduler; this.waitForResources = waitForResources; this.computationStateFetcher = computationStateFetcher; - this.workProviderExecutor = - TerminatingExecutors.newSingleThreadedExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setPriority(Thread.MIN_PRIORITY) - .setNameFormat("DispatchThread"), - LOG); - + this.workProviderExecutor = workProviderExecutor; this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; } public static SingleSourceWorkerHarness.Builder builder() { - return new AutoBuilder_SingleSourceWorkerHarness_Builder(); + return new AutoBuilder_SingleSourceWorkerHarness_Builder() + .setWorkProviderExecutor( + TerminatingExecutors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MIN_PRIORITY) + .setNameFormat("DispatchThread"), + LOG)); } @Override @@ -106,22 +106,11 @@ public void start() { "Multiple calls to {}.start() are not allowed.", getClass()); workCommitter.start(); - while (isRunning.get()) { - Future dispatchLoopFuture = - workProviderExecutor.submit( - () -> { - getDispatchLoop().run(); - LOG.info("Dispatch done"); - }); - - try { - dispatchLoopFuture.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - throw new AssertionError("GetWork failed with error.", e); - } - } + workProviderExecutor.execute( + () -> { + getDispatchLoop().run(); + LOG.info("Dispatch done"); + }); } private Runnable getDispatchLoop() { @@ -268,6 +257,9 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); + @VisibleForTesting + Builder setWorkProviderExecutor(TerminatingExecutorService workProviderExecutor); + SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 20f0d7b7ec2a..7b8050298982 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -116,7 +116,7 @@ public final class StreamingWorkerStatusPages { public static StreamingWorkerStatusPages.Builder builder() { return new AutoBuilder_StreamingWorkerStatusPages_Builder() .setStatusPageDumper( - TerminatingExecutors.newSingleThreadedScheduledExecutor( + TerminatingExecutors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR), LOG)); } diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
index a03abc4bdfea..3f1ad2f5f621 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
@@ -151,7 +151,7 @@ public static StreamingWorkerStatusReporter create(
         memoryMonitor,
         workExecutor,
         threadName ->
-            TerminatingExecutors.newSingleThreadedScheduledExecutor(
+            TerminatingExecutors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat(threadName), LOG),
         windmillHarnessUpdateReportingPeriodMillis,
         perWorkerMetricsUpdateReportingPeriodMillis);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java
index e064df813392..c41925e1a2ed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java
@@ -17,12 +17,22 @@
  */
 package org.apache.beam.runners.dataflow.worker.util;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 
 /**
@@ -35,27 +45,37 @@ public final class TerminatingExecutors {
 
   private TerminatingExecutors() {}
 
-  public static ExecutorService newSingleThreadedExecutor(
+  public static TerminatingExecutorService newSingleThreadExecutor(
       ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
-    return Executors.newSingleThreadExecutor(
-        terminatingThreadFactory(threadFactoryBuilder, logger));
+    return new TerminatingExecutorService(
+        Executors.newSingleThreadExecutor(terminatingThreadFactory(threadFactoryBuilder, logger)));
   }
 
-  public static ScheduledExecutorService newSingleThreadedScheduledExecutor(
+  public static TerminatingScheduledExecutorService newSingleThreadScheduledExecutor(
       ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
-    return Executors.newSingleThreadScheduledExecutor(
-        terminatingThreadFactory(threadFactoryBuilder, logger));
+    return new TerminatingScheduledExecutorService(
+        Executors.newSingleThreadScheduledExecutor(
+            terminatingThreadFactory(threadFactoryBuilder, logger)));
   }
 
-  public static ExecutorService newCachedThreadPool(
+  public static TerminatingExecutorService newCachedThreadPool(
       ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
-    return Executors.newCachedThreadPool(terminatingThreadFactory(threadFactoryBuilder, logger));
+    return new TerminatingExecutorService(
+        Executors.newCachedThreadPool(terminatingThreadFactory(threadFactoryBuilder, logger)));
   }
 
-  public static ExecutorService newFixedThreadPool(
+  public static TerminatingExecutorService newFixedThreadPool(
       int numThreads, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
-    return Executors.newFixedThreadPool(
-        numThreads, terminatingThreadFactory(threadFactoryBuilder, logger));
+    return new TerminatingExecutorService(
+        Executors.newFixedThreadPool(
+            numThreads, terminatingThreadFactory(threadFactoryBuilder, logger)));
+  }
+
+  public static TerminatingExecutorService newSingleThreadedExecutorForTesting(
+      JvmRuntime jvmRuntime, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
+    return new TerminatingExecutorService(
+        Executors.newSingleThreadExecutor(
+            terminatingThreadFactoryForTesting(jvmRuntime, threadFactoryBuilder, logger)));
   }
 
   private static ThreadFactory terminatingThreadFactory(
@@ -64,4 +84,123 @@ private static ThreadFactory terminatingThreadFactory(
         .setUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(logger))
         .build();
   }
+
+  private static ThreadFactory terminatingThreadFactoryForTesting(
+      JvmRuntime jvmRuntime, ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
+    return threadFactoryBuilder
+        .setUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(jvmRuntime, logger))
+        .build();
+  }
+
+  /** Wrapper for {@link ScheduledExecutorService}(s) created by {@link TerminatingExecutors}. */
+  public static final class TerminatingScheduledExecutorService extends TerminatingExecutorService
+      implements ScheduledExecutorService {
+    private final ScheduledExecutorService delegate;
+
+    private TerminatingScheduledExecutorService(ScheduledExecutorService delegate) {
+      super(delegate);
+      this.delegate = delegate;
+    }
+
+    @Override
+    public ScheduledFuture<@Nullable ?> schedule(Runnable command, long delay, TimeUnit unit) {
+      return delegate.schedule(command, delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+      return delegate.schedule(callable, delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<@Nullable ?> scheduleAtFixedRate(
+        Runnable command, long initialDelay, long period, TimeUnit unit) {
+      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<@Nullable ?> scheduleWithFixedDelay(
+        Runnable command, long initialDelay, long delay, TimeUnit unit) {
+      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+  }
+
+  /** Wrapper for {@link ExecutorService}(s) created by {@link TerminatingExecutors}. */
+  public static class TerminatingExecutorService implements ExecutorService {
+    private final ExecutorService delegate;
+
+    private TerminatingExecutorService(ExecutorService delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void shutdown() {
+      delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      return delegate.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+      return delegate.submit(task, result);
+    }
+
+    @Override
+    public Future<@Nullable ?> submit(Runnable task) {
+      return delegate.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+      return delegate.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException {
+      return delegate.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+      return delegate.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return delegate.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      delegate.execute(command);
+    }
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index 628a03428755..90a0e774817a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -98,7 +98,7 @@ protected AbstractWindmillStream(
       String backendWorkerToken) {
     this.backendWorkerToken = backendWorkerToken;
     this.executor =
-        TerminatingExecutors.newSingleThreadedExecutor(
+        TerminatingExecutors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(createThreadName(debugStreamType, backendWorkerToken)),
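Usage note (not part of the patch; names are illustrative): every executor obtained from TerminatingExecutors installs WorkerUncaughtExceptionHandler on its threads, so a task run via execute() that throws escapes to the handler and terminates the worker instead of dying silently with the thread. A minimal sketch, assuming a class that owns its own LOG:

    // Sketch only: the thread name and doWork() task are placeholders.
    TerminatingExecutors.TerminatingExecutorService executor =
        TerminatingExecutors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExampleWorkThread"), LOG);
    // If doWork() throws, the uncaught-exception handler logs and halts the JVM.
    executor.execute(() -> doWork());

The usual executor caveat still applies: only exceptions escaping execute()-style tasks reach the uncaught-exception handler; an exception thrown inside a submit()ed task is captured in the returned Future and must be surfaced by the caller.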
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
index a41222382cb8..accc1beaba9b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
@@ -19,7 +19,6 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.ThreadSafe;
@@ -29,6 +28,7 @@
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
 import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors.TerminatingExecutorService;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
 import org.apache.beam.sdk.annotations.Internal;
@@ -46,7 +46,7 @@ public final class StreamingApplianceWorkCommitter implements WorkCommitter {
   private final Consumer<CommitWorkRequest> commitWorkFn;
   private final WeightedBoundedQueue<Commit> commitQueue;
-  private final ExecutorService commitWorkers;
+  private final TerminatingExecutorService commitWorkers;
   private final AtomicLong activeCommitBytes;
   private final Consumer<CompleteCommit> onCommitComplete;
 
@@ -57,7 +57,7 @@ private StreamingApplianceWorkCommitter(
         WeightedBoundedQueue.create(
             MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
     this.commitWorkers =
-        TerminatingExecutors.newSingleThreadedExecutor(
+        TerminatingExecutors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setPriority(Thread.MAX_PRIORITY)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
index 7ec95b97eead..1a78ae2286ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow.worker.windmill.client.commits;
 
 import com.google.auto.value.AutoBuilder;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +28,7 @@
 import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors.TerminatingExecutorService;
 import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
 import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import org.apache.beam.sdk.annotations.Internal;
@@ -51,7 +51,7 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter {
   private final Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory;
   private final WeightedBoundedQueue<Commit> commitQueue;
-  private final ExecutorService commitSenders;
+  private final TerminatingExecutorService commitSenders;
   private final AtomicLong activeCommitBytes;
   private final Consumer<CompleteCommit> onCommitComplete;
   private final int numCommitSenders;
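Usage note (sketch only, names illustrative): because TerminatingExecutorService simply delegates to the wrapped executor, the committers' existing lifecycle code keeps compiling and behaving the same after the field type change, e.g. a typical shutdown path:

    // Sketch: the wrapper only delegates, so drain-and-stop logic is unchanged.
    commitSenders.shutdown();
    try {
      if (!commitSenders.awaitTermination(10, TimeUnit.SECONDS)) {
        commitSenders.shutdownNow();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }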
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
index 34c813ec4be1..11eb8475e1b0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
 
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors.TerminatingExecutorService;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -43,7 +43,7 @@ public final class GetWorkBudgetRefresher {
   private static final Logger LOG = LoggerFactory.getLogger(GetWorkBudgetRefresher.class);
 
   private final AdvancingPhaser budgetRefreshTrigger;
-  private final ExecutorService budgetRefreshExecutor;
+  private final TerminatingExecutorService budgetRefreshExecutor;
   private final Supplier<Boolean> isBudgetRefreshPaused;
   private final Runnable redistributeBudget;
 
@@ -51,7 +51,7 @@ public GetWorkBudgetRefresher(
       Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
     this.budgetRefreshTrigger = new AdvancingPhaser(1);
     this.budgetRefreshExecutor =
-        TerminatingExecutors.newSingleThreadedExecutor(
+        TerminatingExecutors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                .setNameFormat(BUDGET_REFRESH_THREAD)
                .setUncaughtExceptionHandler(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
index e23c57bda574..f2cf08ba390c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
@@ -25,9 +25,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -35,6 +35,7 @@
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork;
 import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors.TerminatingExecutorService;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.joda.time.Duration;
@@ -63,7 +64,7 @@ public final class ActiveWorkRefresher {
   private final int stuckCommitDurationMillis;
   private final HeartbeatTracker heartbeatTracker;
   private final ScheduledExecutorService activeWorkRefreshExecutor;
-  private final ExecutorService fanOutActiveWorkRefreshExecutor;
+  private final TerminatingExecutorService fanOutActiveWorkRefreshExecutor;
 
   public ActiveWorkRefresher(
       Supplier<Instant> clock,
@@ -71,14 +72,14 @@ public ActiveWorkRefresher(
       int stuckCommitDurationMillis,
       Supplier<Collection<ComputationState>> computations,
       DataflowExecutionStateSampler sampler,
-      ScheduledExecutorService activeWorkRefreshExecutor,
+      Function<Logger, ScheduledExecutorService> activeWorkRefreshExecutor,
       HeartbeatTracker heartbeatTracker) {
     this.clock = clock;
     this.activeWorkRefreshPeriodMillis = activeWorkRefreshPeriodMillis;
     this.stuckCommitDurationMillis = stuckCommitDurationMillis;
     this.computations = computations;
     this.sampler = sampler;
-    this.activeWorkRefreshExecutor = activeWorkRefreshExecutor;
+    this.activeWorkRefreshExecutor = activeWorkRefreshExecutor.apply(LOG);
     this.heartbeatTracker = heartbeatTracker;
     this.fanOutActiveWorkRefreshExecutor =
         TerminatingExecutors.newCachedThreadPool(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 9fa17588c94d..40d213ac4e74 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -33,21 +33,25 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.internal.stubbing.answers.Returns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(JUnit4.class)
 public class StreamingEngineComputationConfigFetcherTest {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingEngineComputationConfigFetcherTest.class);
   private final WorkUnitClient mockDataflowServiceClient =
       mock(WorkUnitClient.class, new Returns(Optional.empty()));
   private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
@@ -61,7 +65,8 @@ private StreamingEngineComputationConfigFetcher createConfigFetcher(
         globalConfigRefreshPeriod,
         mockDataflowServiceClient,
         globalConfigHandle,
-        ignored -> Executors.newSingleThreadScheduledExecutor());
+        ignored ->
+            TerminatingExecutors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder(), LOG));
   }
 
   @After
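Usage note (sketch only): with these changes the scheduled executor is supplied as a factory keyed on the owning component's Logger, so production wiring can opt into a terminating executor while tests keep a plain one. A rough sketch of the production-side argument; the thread-name format here is illustrative and not taken from this patch:

    // Sketch: factory handed to ActiveWorkRefresher or the config fetcher; the
    // component applies its own LOG to build the executor.
    Function<Logger, ScheduledExecutorService> refreshExecutorFactory =
        logger ->
            TerminatingExecutors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("RefreshWork"), logger);

Tests that do not exercise crash semantics can instead pass a plain factory such as ignored -> Executors.newSingleThreadScheduledExecutor(), as ActiveWorkRefresherTest does below.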
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
index 57e2d1e14e32..0b8aa705e703 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
@@ -18,22 +18,31 @@
 package org.apache.beam.runners.dataflow.worker.streaming.harness;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.util.TerminatingExecutors;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
 import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
 import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
 import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
 import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(JUnit4.class)
 public class SingleSourceWorkerHarnessTest {
+  private static final Logger LOG = LoggerFactory.getLogger(SingleSourceWorkerHarnessTest.class);
   private final WorkCommitter workCommitter = mock(WorkCommitter.class);
   private final GetDataClient getDataClient = mock(GetDataClient.class);
   private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
@@ -43,7 +52,7 @@ public class SingleSourceWorkerHarnessTest {
   private final StreamingWorkScheduler streamingWorkScheduler =
       mock(StreamingWorkScheduler.class);
 
   private SingleSourceWorkerHarness createWorkerHarness(
-      SingleSourceWorkerHarness.GetWorkSender getWorkSender) {
+      SingleSourceWorkerHarness.GetWorkSender getWorkSender, JvmRuntime runtime) {
     return SingleSourceWorkerHarness.builder()
         .setWorkCommitter(workCommitter)
         .setGetDataClient(getDataClient)
@@ -52,33 +61,65 @@ private SingleSourceWorkerHarness createWorkerHarness(
         .setStreamingWorkScheduler(streamingWorkScheduler)
         .setComputationStateFetcher(computationStateFetcher)
         .setGetWorkSender(getWorkSender)
+        .setWorkProviderExecutor(
+            TerminatingExecutors.newSingleThreadedExecutorForTesting(
+                runtime,
+                new ThreadFactoryBuilder()
+                    .setDaemon(true)
+                    .setPriority(Thread.MIN_PRIORITY)
+                    .setNameFormat("DispatchThread"),
+                LOG))
         .build();
   }
 
   @Test
-  public void testDispatchLoopFailureThrowsException_appliance() {
-    RuntimeException expected = new RuntimeException("something bad happened");
+  public void testDispatchLoop_unexpectedFailureKillsJvm_appliance() {
     SingleSourceWorkerHarness.GetWorkSender getWorkSender =
         SingleSourceWorkerHarness.GetWorkSender.forAppliance(
             () -> {
-              throw expected;
+              throw new RuntimeException("something bad happened");
             });
 
-    SingleSourceWorkerHarness harness = createWorkerHarness(getWorkSender);
-    Throwable actual = assertThrows(AssertionError.class, harness::start);
-    assertThat(actual).hasCauseThat().hasCauseThat().isEqualTo(expected);
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+    fakeJvmRuntime.assertJvmTerminated();
   }
 
   @Test
-  public void testDispatchLoopFailureThrowsException_streamingEngine() {
-    RuntimeException expected = new RuntimeException("something bad happened");
+  public void testDispatchLoop_unexpectedFailureKillsJvm_streamingEngine() {
     SingleSourceWorkerHarness.GetWorkSender getWorkSender =
         SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine(
             workItemReceiver -> {
-              throw expected;
+              throw new RuntimeException("something bad happened");
             });
-
-    SingleSourceWorkerHarness harness = createWorkerHarness(getWorkSender);
-    Throwable actual = assertThrows(AssertionError.class, harness::start);
-    assertThat(actual).hasCauseThat().hasCauseThat().isEqualTo(expected);
+
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+    fakeJvmRuntime.assertJvmTerminated();
+  }
+
+  private static class FakeJvmRuntime implements JvmRuntime {
+    private final CountDownLatch haltedLatch = new CountDownLatch(1);
+    private volatile int exitStatus = 0;
+
+    @Override
+    public void halt(int status) {
+      exitStatus = status;
+      haltedLatch.countDown();
+    }
+
+    public boolean waitForRuntimeDeath(long timeout, TimeUnit unit) {
+      try {
+        return haltedLatch.await(timeout, unit);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+
+    private void assertJvmTerminated() {
+      assertThat(exitStatus).isEqualTo(WorkerUncaughtExceptionHandler.JVM_TERMINATED_STATUS_CODE);
+    }
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
index 5efb2421fe60..3d1c8afec119 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
@@ -107,7 +107,7 @@ private ActiveWorkRefresher createActiveWorkRefresher(
         stuckCommitDurationMillis,
         computations,
         DataflowExecutionStateSampler.instance(),
-        Executors.newSingleThreadScheduledExecutor(),
+        ignored -> Executors.newSingleThreadScheduledExecutor(),
         heartbeatTracker);
   }
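Testing note (sketch only, assumes a JUnit test with a FakeJvmRuntime like the one above and its own LOG field): the same fake-runtime pattern can verify crash-on-uncaught-exception behavior for any code built on TerminatingExecutors, without touching the real JVM:

    // Sketch: run a task that throws on a testing executor and verify the fake
    // runtime observed the halt instead of the process actually exiting.
    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
    TerminatingExecutors.TerminatingExecutorService executor =
        TerminatingExecutors.newSingleThreadedExecutorForTesting(
            fakeJvmRuntime, new ThreadFactoryBuilder().setDaemon(true), LOG);
    executor.execute(
        () -> {
          throw new RuntimeException("simulated failure");
        });
    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));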