Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fiber refactorings #3

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void shutdown() {
@MethodSource("getComparatorName")
public void testParallelIntSearch(ComparatorType comparatorType, int threadCount)
throws ExecutionException, InterruptedException {
threadPool = Executors.newFixedThreadPool(threadCount);
threadPool = Executors.newVirtualThreadPerTaskExecutor();
try (IntVector targetVector = new IntVector("targetVector", allocator);
IntVector keyVector = new IntVector("keyVector", allocator)) {
targetVector.allocateNew(VECTOR_LENGTH);
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testParallelIntSearch(ComparatorType comparatorType, int threadCount
@MethodSource("getComparatorName")
public void testParallelStringSearch(ComparatorType comparatorType, int threadCount)
throws ExecutionException, InterruptedException {
threadPool = Executors.newFixedThreadPool(threadCount);
threadPool = Executors.newVirtualThreadPerTaskExecutor();
try (VarCharVector targetVector = new VarCharVector("targetVector", allocator);
VarCharVector keyVector = new VarCharVector("keyVector", allocator)) {
targetVector.allocateNew(VECTOR_LENGTH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.arrow.flight;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.netty.GrpcSslContexts;
Expand Down Expand Up @@ -312,11 +311,8 @@ public FlightServer build() {
grpcExecutor = null;
} else {
exec =
Executors.newCachedThreadPool(
// Name threads for better debuggability
new ThreadFactoryBuilder()
.setNameFormat("flight-server-default-executor-%d")
.build());
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("flight-server-default-executor-", 0).factory());
grpcExecutor = exec;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void setup() throws IOException {
allocator = new RootAllocator(Integer.MAX_VALUE);
final NoOpFlightProducer producer = new NoOpFlightProducer();
final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP;
final ExecutorService exec = Executors.newCachedThreadPool();
final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
final BindableService flightBindingService =
FlightGrpcUtils.createFlightService(allocator, producer, authHandler, exec);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static void main(String[] args) throws Exception {
public void throughput() throws Exception {
final int numRuns = 10;
ListeningExecutorService pool =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
MoreExecutors.listeningDecorator(Executors.newVirtualThreadPerTaskExecutor());
double[] throughPuts = new double[numRuns];

for (int i = 0; i < numRuns; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.arrow.flight.integration.tests;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,10 +102,8 @@ void testScenario(String scenarioName) throws Exception {
TestBufferAllocationListener listener = new TestBufferAllocationListener();
try (final BufferAllocator allocator = new RootAllocator(listener, Long.MAX_VALUE)) {
final ExecutorService exec =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("integration-test-flight-server-executor-%d")
.build());
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("integration-test-flight-server-executor-", 0).factory());
final FlightServer.Builder builder =
FlightServer.builder()
.executor(exec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ ArrowFlightSqlClientHandler getClientHandler() {
synchronized ExecutorService getExecutorService() {
return executorService =
executorService == null
// Refactoring this would require defining a new custom thread factory class
? Executors.newFixedThreadPool(
config.threadPoolSize(), new DefaultThreadFactory(getClass().getSimpleName()))
: executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
public static final String DB_NAME = "derbyDB";
private final String databaseUri;
// ARROW-15315: Use ExecutorService to simulate an async scenario
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
private final Location location;
protected final PoolingDataSource<PoolableConnection> dataSource;
protected final BufferAllocator rootAllocator = new RootAllocator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void testMakeProjectorParallel(ConfigurationBuilder.ConfigOptions config
// build projectors in parallel choosing schema at random
// this should hit the same cache entry thus exposing
// any threading issues.
ExecutorService executors = Executors.newFixedThreadPool(16);
ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor();

IntStream.range(0, 1000)
.forEach(
Expand Down Expand Up @@ -348,7 +348,7 @@ public void testDivZeroParallel() throws GandivaException, InterruptedException
ExpressionTree expr = TreeBuilder.makeExpression("divide", args, c);
List<ExpressionTree> exprs = Lists.newArrayList(expr);

ExecutorService executors = Executors.newFixedThreadPool(16);
ExecutorService executors = Executors.newVirtualThreadPerTaskExecutor();

AtomicInteger errorCount = new AtomicInteger(0);
AtomicInteger errorCountExp = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,18 @@ public void multiThread() throws InterruptedException {

for (int i = 0; i < numberOfThreads; i++) {
Thread t =
new Thread() {

@Override
public void run() {
try {
for (int i = 0; i < loops; i++) {
ensureAccurateReservations(parent);
}
} catch (Exception ex) {
ex.printStackTrace();
fail(ex.getMessage());
}
}
};
Thread.ofVirtual()
.unstarted(
() -> {
try {
for (int j = 0; j < loops; j++) {
ensureAccurateReservations(parent);
}
} catch (Exception ex) {
ex.printStackTrace();
fail(ex.getMessage());
}
});
threads[i] = t;
t.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void prepare() {
targetVector.allocateNew(VECTOR_LENGTH);
keyVector = new IntVector("key vector", allocator);
keyVector.allocateNew(1);
threadPool = Executors.newFixedThreadPool(numThreads);
threadPool = Executors.newVirtualThreadPerTaskExecutor();

for (int i = 0; i < VECTOR_LENGTH; i++) {
targetVector.set(i, i);
Expand Down
Loading