Skip to content

Commit

Permalink
fix: two bugs: 1, autoBatchFlushEndPointContext.add() should always b…
Browse files Browse the repository at this point in the history
…e before autoBatchFlushEndPointContext.done(1) othewise the flyingTaskNum could be negative; 2, make sure lastEventLoop is never null
  • Loading branch information
okg-cxf committed Aug 29, 2024
1 parent a80453e commit 5ad30d0
Showing 1 changed file with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,6 @@
*/
package io.lettuce.core.protocol;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.ContextualChannel;
Expand All @@ -55,11 +37,29 @@
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

/**
* Default {@link Endpoint} implementation.
*
Expand Down Expand Up @@ -161,7 +161,7 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {

private final boolean canFire;

private volatile EventLoop lastEventLoop = null;
private volatile EventExecutor lastEventExecutor;

private volatile Throwable connectionError;

Expand Down Expand Up @@ -202,6 +202,7 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou
this.callbackOnClose = callbackOnClose;
this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount();
this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize();
this.lastEventExecutor = clientResources.eventExecutorGroup().next();
}

@Override
Expand Down Expand Up @@ -322,7 +323,7 @@ public void notifyChannelActive(Channel channel) {
return;
}

this.lastEventLoop = channel.eventLoop();
this.lastEventExecutor = channel.eventLoop();
this.connectionError = null;
this.inProtectMode = false;
this.logPrefix = null;
Expand Down Expand Up @@ -585,7 +586,7 @@ private void resetInternal() {
if (chan.context.initialState.isConnected()) {
chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
}
LettuceAssert.assertState(lastEventLoop.inEventLoop(), "must be called in lastEventLoop thread");
LettuceAssert.assertState(lastEventExecutor.inEventLoop(), "must be called in lastEventLoop thread");
cancelCommands("resetInternal");
}

Expand Down Expand Up @@ -727,22 +728,23 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint
}

if (o instanceof RedisCommand<?, ?, ?>) {
autoBatchFlushEndPointContext.add(1);
RedisCommand<?, ?, ?> cmd = (RedisCommand<?, ?, ?>) o;
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
count++;
} else {
@SuppressWarnings("unchecked")
Collection<? extends RedisCommand<?, ?, ?>> commands = (Collection<? extends RedisCommand<?, ?, ?>>) o;
final int commandsSize = commands.size(); // size() could be expensive for some collections so cache it!
autoBatchFlushEndPointContext.add(commandsSize);
for (RedisCommand<?, ?, ?> cmd : commands) {
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
}
count += commands.size();
count += commandsSize;
}
}

if (count > 0) {
autoBatchFlushEndPointContext.add(count);

channelFlush(chan);
if (autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) {
// Wait for onConnectionClose event()
Expand All @@ -755,7 +757,7 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint
private void trySetEndpointQuiescence(ContextualChannel chan) {
final EventLoop eventLoop = chan.eventLoop();
LettuceAssert.isTrue(eventLoop.inEventLoop(), "unexpected: not in event loop");
LettuceAssert.isTrue(eventLoop == lastEventLoop, "unexpected: lastEventLoop not match");
LettuceAssert.isTrue(eventLoop == lastEventExecutor, "unexpected: lastEventLoop not match");

final ConnectionContext connectionContext = chan.context;
final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus();
Expand Down Expand Up @@ -1019,14 +1021,14 @@ private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> comman
* is terminated (state is RECONNECT_FAILED/ENDPOINT_CLOSED)
*/
private void syncAfterTerminated(Runnable runnable) {
final EventLoop localLastEventLoop = lastEventLoop;
LettuceAssert.notNull(localLastEventLoop, "lastEventLoop must not be null after terminated");
if (localLastEventLoop.inEventLoop()) {
final EventExecutor localLastEventExecutor = lastEventExecutor;
if (localLastEventExecutor.inEventLoop()) {
runnable.run();
} else {
localLastEventLoop.execute(() -> {
localLastEventExecutor.execute(() -> {
runnable.run();
LettuceAssert.isTrue(lastEventLoop == localLastEventLoop, "lastEventLoop must not be changed after terminated");
LettuceAssert.isTrue(lastEventExecutor == localLastEventExecutor,
"lastEventLoop must not be changed after terminated");
});
}
}
Expand Down

0 comments on commit 5ad30d0

Please sign in to comment.