diff --git a/java/algorithm/src/test/java/org/apache/arrow/algorithm/search/TestParallelSearcher.java b/java/algorithm/src/test/java/org/apache/arrow/algorithm/search/TestParallelSearcher.java index 24a9a6ed694c7..9dbe1d195cb61 100644 --- a/java/algorithm/src/test/java/org/apache/arrow/algorithm/search/TestParallelSearcher.java +++ b/java/algorithm/src/test/java/org/apache/arrow/algorithm/search/TestParallelSearcher.java @@ -79,7 +79,7 @@ public void shutdown() { @MethodSource("getComparatorName") public void testParallelIntSearch(ComparatorType comparatorType, int threadCount) throws ExecutionException, InterruptedException { - threadPool = Executors.newFixedThreadPool(threadCount); + threadPool = Executors.newVirtualThreadPerTaskExecutor(); try (IntVector targetVector = new IntVector("targetVector", allocator); IntVector keyVector = new IntVector("keyVector", allocator)) { targetVector.allocateNew(VECTOR_LENGTH); @@ -118,7 +118,7 @@ public void testParallelIntSearch(ComparatorType comparatorType, int threadCount @MethodSource("getComparatorName") public void testParallelStringSearch(ComparatorType comparatorType, int threadCount) throws ExecutionException, InterruptedException { - threadPool = Executors.newFixedThreadPool(threadCount); + threadPool = Executors.newVirtualThreadPerTaskExecutor(); try (VarCharVector targetVector = new VarCharVector("targetVector", allocator); VarCharVector keyVector = new VarCharVector("keyVector", allocator)) { targetVector.allocateNew(VECTOR_LENGTH); diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java index ac761457f57fd..cbcd03a010e06 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java @@ -16,7 +16,6 @@ */ package org.apache.arrow.flight; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.grpc.Server; import io.grpc.ServerInterceptors; import io.grpc.netty.GrpcSslContexts; @@ -312,11 +311,8 @@ public FlightServer build() { grpcExecutor = null; } else { exec = - Executors.newCachedThreadPool( - // Name threads for better debuggability - new ThreadFactoryBuilder() - .setNameFormat("flight-server-default-executor-%d") - .build()); + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("flight-server-default-executor-", 0).factory()); grpcExecutor = exec; } diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java index b82e9ee26b15d..b9fbad9a7d379 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java @@ -52,7 +52,7 @@ public void setup() throws IOException { allocator = new RootAllocator(Integer.MAX_VALUE); final NoOpFlightProducer producer = new NoOpFlightProducer(); final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP; - final ExecutorService exec = Executors.newCachedThreadPool(); + final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); final BindableService flightBindingService = FlightGrpcUtils.createFlightService(allocator, producer, authHandler, exec); diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/perf/TestPerf.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/perf/TestPerf.java index 338771286d462..2d862d6dee94f 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/perf/TestPerf.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/perf/TestPerf.java @@ -85,7 +85,7 @@ public static void main(String[] args) throws Exception { public void throughput() throws Exception { final int numRuns = 10; ListeningExecutorService pool = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)); + MoreExecutors.listeningDecorator(Executors.newVirtualThreadPerTaskExecutor()); double[] throughPuts = new double[numRuns]; for (int i = 0; i < numRuns; i++) { diff --git a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java index 8419432c66227..3ea65debf16fe 100644 --- a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java +++ b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.arrow.flight.integration.tests; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -103,10 +102,8 @@ void testScenario(String scenarioName) throws Exception { TestBufferAllocationListener listener = new TestBufferAllocationListener(); try (final BufferAllocator allocator = new RootAllocator(listener, Long.MAX_VALUE)) { final ExecutorService exec = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("integration-test-flight-server-executor-%d") - .build()); + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("integration-test-flight-server-executor-", 0).factory()); final FlightServer.Builder builder = FlightServer.builder() .executor(exec) diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java index c1b1c8f8e6add..a0e608d5b61ef 100644 --- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java +++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java @@ -158,6 +158,7 @@ ArrowFlightSqlClientHandler getClientHandler() { synchronized ExecutorService getExecutorService() { return executorService = executorService == null + // Refactoring this would require defining a new custom thread factory class ? Executors.newFixedThreadPool( config.threadPoolSize(), new DefaultThreadFactory(getClass().getSimpleName())) : executorService; diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java index f9d0551a3aa22..56d3b5c076903 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java @@ -165,7 +165,7 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable { public static final String DB_NAME = "derbyDB"; private final String databaseUri; // ARROW-15315: Use ExecutorService to simulate an async scenario - private final ExecutorService executorService = Executors.newFixedThreadPool(10); + private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); private final Location location; protected final PoolingDataSource dataSource; protected final BufferAllocator rootAllocator = new RootAllocator(); diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java index 0d86bd9e72923..383d4ebd55bf7 100644 --- a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java +++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java @@ -126,7 +126,7 @@ private void testMakeProjectorParallel(ConfigurationBuilder.ConfigOptions config // build projectors in parallel choosing schema at random // this should hit the same cache entry thus exposing // any threading issues. - ExecutorService executors = Executors.newFixedThreadPool(16); + ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor(); IntStream.range(0, 1000) .forEach( @@ -348,7 +348,7 @@ public void testDivZeroParallel() throws GandivaException, InterruptedException ExpressionTree expr = TreeBuilder.makeExpression("divide", args, c); List exprs = Lists.newArrayList(expr); - ExecutorService executors = Executors.newFixedThreadPool(16); + ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor(); AtomicInteger errorCount = new AtomicInteger(0); AtomicInteger errorCountExp = new AtomicInteger(0); diff --git a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAccountant.java b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAccountant.java index 07663b4ab67ed..9ffaf03c31240 100644 --- a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAccountant.java +++ b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/TestAccountant.java @@ -47,20 +47,18 @@ public void multiThread() throws InterruptedException { for (int i = 0; i < numberOfThreads; i++) { Thread t = - new Thread() { - - @Override - public void run() { - try { - for (int i = 0; i < loops; i++) { - ensureAccurateReservations(parent); - } - } catch (Exception ex) { - ex.printStackTrace(); - fail(ex.getMessage()); - } - } - }; + Thread.ofVirtual() + .unstarted( + () -> { + try { + for (int j = 0; j < loops; j++) { + ensureAccurateReservations(parent); + } + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + }); threads[i] = t; t.start(); } diff --git a/java/performance/src/main/java/org/apache/arrow/algorithm/search/ParallelSearcherBenchmarks.java b/java/performance/src/main/java/org/apache/arrow/algorithm/search/ParallelSearcherBenchmarks.java index 34a09ee9b1efe..e40cd24210e7d 100644 --- a/java/performance/src/main/java/org/apache/arrow/algorithm/search/ParallelSearcherBenchmarks.java +++ b/java/performance/src/main/java/org/apache/arrow/algorithm/search/ParallelSearcherBenchmarks.java @@ -67,7 +67,7 @@ public void prepare() { targetVector.allocateNew(VECTOR_LENGTH); keyVector = new IntVector("key vector", allocator); keyVector.allocateNew(1); - threadPool = Executors.newFixedThreadPool(numThreads); + threadPool = Executors.newVirtualThreadPerTaskExecutor(); for (int i = 0; i < VECTOR_LENGTH; i++) { targetVector.set(i, i);