Skip to content

Commit

Permalink
fix: ensure hasOngoingSendLoop.exitSafe()
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 7, 2024
1 parent 9f2edeb commit ed60773
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,11 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread");

// Schedule directly
scheduleSendJobInEventLoopIfNeeded(chan);
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) {
scheduleSendJobInEventLoopIfNeeded(chan);
}
// Otherwise:
// someone will do the job for us
}

private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
Expand All @@ -618,11 +622,6 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
}

final EventLoop eventLoop = chan.eventLoop();
if (eventLoop.inEventLoop()) {
scheduleSendJobInEventLoopIfNeeded(chan);
return;
}

if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) {
// Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
// 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
Expand All @@ -644,8 +643,11 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {

private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan) {
// Guarantee only 1 send loop.
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterUnsafe()) {
BatchFlushEndPointContext.HasOngoingSendLoop hasOngoingSendLoop = chan.context.batchFlushEndPointContext.hasOngoingSendLoop;
if (hasOngoingSendLoop.tryEnterUnsafe()) {
loopSend(chan);
} else {
hasOngoingSendLoop.exitSafe();
}
}

Expand All @@ -657,11 +659,11 @@ private void loopSend(final ContextualChannel chan) {
}

LettuceAssert.assertState(channel == chan, "unexpected: channel not match but closeStatus == null");
loopSend0(batchFlushEndPointContext, chan, writeSpinCount, true);
loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false);
}

private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan,
int remainingSpinnCount, final boolean firstCall) {
int remainingSpinnCount, final boolean exitedSafe) {
do {
final int count = pollBatch(batchFlushEndPointContext, chan);
if (count < 0) {
Expand All @@ -686,14 +688,16 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
return;
}

if (firstCall) {
if (exitedSafe) {
// The send loop will be triggered later when a new task is added,
batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe();
} else {
// // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
batchFlushEndPointContext.hasOngoingSendLoop.exitSafe();
// // Guarantee thread-safety: no dangling tasks in the queue.
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false);
} else {
// The send loop will be triggered later when a new task is added,
batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe();
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, true);
// chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,
// TimeUnit.NANOSECONDS);
}
}

Expand Down

0 comments on commit ed60773

Please sign in to comment.