Skip to content

Commit

Permalink
Merge pull request #1062 from Ladicek/virtual-threads-queueing
Browse files Browse the repository at this point in the history
add FaultTolerance.BulkheadBuilder.enableVirtualThreadsQueueing()
  • Loading branch information
Ladicek authored Oct 22, 2024
2 parents f8cc830 + 46ded2b commit 8bbb6a2
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,18 @@ interface BulkheadBuilder<T, R> {
*/
BulkheadBuilder<T, R> queueSize(int value);

/**
* Enables bulkhead queueing for synchronous actions executed on virtual threads.
* This makes it possible to call {@link #queueSize(int)} even if this builder does
* not configure fault tolerance for asynchronous actions.
* <p>
* If you use this method, you <strong>have to ensure</strong> that the guard
* is executed on a virtual thread.
*
* @return this bulkhead builder
*/
BulkheadBuilder<T, R> enableVirtualThreadsQueueing();

/**
* Sets a callback that will be invoked when this bulkhead accepts an invocation.
* In case of asynchronous actions, accepting into bulkhead doesn't mean the action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.smallrye.faulttolerance.core.async.RememberEventLoop;
import io.smallrye.faulttolerance.core.bulkhead.CompletionStageThreadPoolBulkhead;
import io.smallrye.faulttolerance.core.bulkhead.SemaphoreBulkhead;
import io.smallrye.faulttolerance.core.bulkhead.SemaphoreThreadPoolBulkhead;
import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreakerEvents;
import io.smallrye.faulttolerance.core.circuit.breaker.CompletionStageCircuitBreaker;
Expand Down Expand Up @@ -313,7 +314,10 @@ private FaultToleranceStrategy<T> buildSyncStrategy(BuilderLazyDependencies lazy
FaultToleranceStrategy<T> result = invocation();

if (lazyDependencies.ftEnabled() && bulkheadBuilder != null) {
result = new SemaphoreBulkhead<>(result, description, bulkheadBuilder.limit);
result = bulkheadBuilder.queueingEnabled
? new SemaphoreThreadPoolBulkhead<>(result, description, bulkheadBuilder.limit,
bulkheadBuilder.queueSize)
: new SemaphoreBulkhead<>(result, description, bulkheadBuilder.limit);
}

if (lazyDependencies.ftEnabled() && timeoutBuilder != null) {
Expand Down Expand Up @@ -523,13 +527,15 @@ static class BulkheadBuilderImpl<T, R> implements BulkheadBuilder<T, R> {

private int limit = 10;
private int queueSize = 10;
private boolean queueingEnabled;

private Runnable onAccepted;
private Runnable onRejected;
private Runnable onFinished;

BulkheadBuilderImpl(BuilderImpl<T, R> parent) {
this.parent = parent;
this.queueingEnabled = parent.isAsync;
}

@Override
Expand All @@ -540,14 +546,20 @@ public BulkheadBuilder<T, R> limit(int value) {

@Override
public BulkheadBuilder<T, R> queueSize(int value) {
if (!parent.isAsync) {
if (!queueingEnabled) {
throw new IllegalStateException("Bulkhead queue size may only be set for asynchronous invocations");
}

this.queueSize = Preconditions.check(value, value >= 1, "Queue size must be >= 1");
return this;
}

@Override
public BulkheadBuilder<T, R> enableVirtualThreadsQueueing() {
this.queueingEnabled = true;
return this;
}

@Override
public BulkheadBuilder<T, R> onAccepted(Runnable callback) {
this.onAccepted = Preconditions.checkNotNull(callback, "Accepted callback must be set");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
package io.smallrye.faulttolerance.core.bulkhead;

import static io.smallrye.faulttolerance.core.bulkhead.BulkheadLogger.LOG;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.async.FutureCancellationEvent;

/**
* Thread pool style bulkhead for {@code Future} asynchronous executions.
Expand All @@ -21,92 +13,8 @@
* but is the easiest way to implement them for {@code Future}. We do it properly
* for {@code CompletionStage}, which is much more useful anyway.
*/
public class FutureThreadPoolBulkhead<V> extends BulkheadBase<Future<V>> {
private final int queueSize;
private final Semaphore capacitySemaphore;
private final Semaphore workSemaphore;

public class FutureThreadPoolBulkhead<V> extends SemaphoreThreadPoolBulkhead<Future<V>> {
public FutureThreadPoolBulkhead(FaultToleranceStrategy<Future<V>> delegate, String description, int size, int queueSize) {
super(description, delegate);
this.queueSize = queueSize;
this.capacitySemaphore = new Semaphore(size + queueSize, true);
this.workSemaphore = new Semaphore(size, true);
}

@Override
public Future<V> apply(InvocationContext<Future<V>> ctx) throws Exception {
LOG.trace("ThreadPoolBulkhead started");
try {
return doApply(ctx);
} finally {
LOG.trace("ThreadPoolBulkhead finished");
}
}

private Future<V> doApply(InvocationContext<Future<V>> ctx) throws Exception {
if (capacitySemaphore.tryAcquire()) {
LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);

AtomicBoolean cancellationInvalid = new AtomicBoolean(false);
AtomicBoolean cancelled = new AtomicBoolean(false);
AtomicReference<Thread> executingThread = new AtomicReference<>(Thread.currentThread());
ctx.registerEventHandler(FutureCancellationEvent.class, event -> {
if (cancellationInvalid.get()) {
// in case of retries, multiple handlers of FutureCancellationEvent may be registered,
// need to make sure that a handler belonging to an older bulkhead task doesn't do anything
return;
}

if (LOG.isTraceEnabled()) {
LOG.tracef("Cancelling bulkhead task,%s interrupting executing thread",
(event.interruptible ? "" : " NOT"));
}
cancelled.set(true);
if (event.interruptible) {
executingThread.get().interrupt();
}
});

try {
workSemaphore.acquire();
LOG.trace("Work semaphore acquired, running task");
} catch (InterruptedException e) {
cancellationInvalid.set(true);

capacitySemaphore.release();
LOG.trace("Capacity semaphore released, task leaving bulkhead");
ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
throw new CancellationException();
}

ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
if (cancelled.get()) {
throw new CancellationException();
}
return delegate.apply(ctx);
} finally {
cancellationInvalid.set(true);

workSemaphore.release();
LOG.trace("Work semaphore released, task finished");
capacitySemaphore.release();
LOG.trace("Capacity semaphore released, task leaving bulkhead");
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
}
} else {
LOG.debugOrTrace(description + " invocation prevented by bulkhead",
"Capacity semaphore not acquired, rejecting task from bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
throw bulkheadRejected();
}
}

// only for tests
int getQueueSize() {
return Math.max(0, queueSize - capacitySemaphore.availablePermits());
super(delegate, description, size, queueSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.smallrye.faulttolerance.core.bulkhead;

import static io.smallrye.faulttolerance.core.bulkhead.BulkheadLogger.LOG;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.async.FutureCancellationEvent;

/**
* Assumes that this bulkhead is already executed on an extra thread.
* Under that assumption, we don't have to submit tasks to another executor
* or use an actual queue. We just limit the task execution by semaphores.
*/
public class SemaphoreThreadPoolBulkhead<V> extends BulkheadBase<V> {
private final int queueSize;
private final Semaphore capacitySemaphore;
private final Semaphore workSemaphore;

public SemaphoreThreadPoolBulkhead(FaultToleranceStrategy<V> delegate, String description, int size, int queueSize) {
super(description, delegate);
this.queueSize = queueSize;
this.capacitySemaphore = new Semaphore(size + queueSize, true);
this.workSemaphore = new Semaphore(size, true);
}

@Override
public V apply(InvocationContext<V> ctx) throws Exception {
LOG.trace("ThreadPoolBulkhead started");
try {
return doApply(ctx);
} finally {
LOG.trace("ThreadPoolBulkhead finished");
}
}

private V doApply(InvocationContext<V> ctx) throws Exception {
if (capacitySemaphore.tryAcquire()) {
LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);

AtomicBoolean cancellationInvalid = new AtomicBoolean(false);
AtomicBoolean cancelled = new AtomicBoolean(false);
AtomicReference<Thread> executingThread = new AtomicReference<>(Thread.currentThread());
ctx.registerEventHandler(FutureCancellationEvent.class, event -> {
if (cancellationInvalid.get()) {
// in case of retries, multiple handlers of FutureCancellationEvent may be registered,
// need to make sure that a handler belonging to an older bulkhead task doesn't do anything
return;
}

if (LOG.isTraceEnabled()) {
LOG.tracef("Cancelling bulkhead task,%s interrupting executing thread",
(event.interruptible ? "" : " NOT"));
}
cancelled.set(true);
if (event.interruptible) {
executingThread.get().interrupt();
}
});

try {
workSemaphore.acquire();
LOG.trace("Work semaphore acquired, running task");
} catch (InterruptedException e) {
cancellationInvalid.set(true);

capacitySemaphore.release();
LOG.trace("Capacity semaphore released, task leaving bulkhead");
ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
throw new CancellationException();
}

ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
if (cancelled.get()) {
throw new CancellationException();
}
return delegate.apply(ctx);
} finally {
cancellationInvalid.set(true);

workSemaphore.release();
LOG.trace("Work semaphore released, task finished");
capacitySemaphore.release();
LOG.trace("Capacity semaphore released, task leaving bulkhead");
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
}
} else {
LOG.debugOrTrace(description + " invocation prevented by bulkhead",
"Capacity semaphore not acquired, rejecting task from bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
throw bulkheadRejected();
}
}

// only for tests
int getQueueSize() {
return Math.max(0, queueSize - capacitySemaphore.availablePermits());
}
}

0 comments on commit 8bbb6a2

Please sign in to comment.