From 7654fe2bf6530b15a81fd8e7bd51592311ec84fd Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 16 Aug 2024 01:56:22 +0800 Subject: [PATCH 1/5] fix: safety iterating #2935 --- .../lettuce/core/internal/AsyncConnectionProvider.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java index 335450d5eb..163299711e 100644 --- a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java +++ b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java @@ -142,11 +142,10 @@ public CompletableFuture close() { List> futures = new ArrayList<>(); - forEach((connectionKey, closeable) -> { - - futures.add(closeable.closeAsync()); - connections.remove(connectionKey); - }); + for (K k : connections.keySet()) { + Sync remove = connections.remove(k); + remove.doWithConnection(e -> futures.add(e.closeAsync())); + } return Futures.allOf(futures); } From 2ae4fcca33ecbe057f12effff33dd0e2365fc6a5 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 16 Aug 2024 02:06:06 +0800 Subject: [PATCH 2/5] null check --- .../io/lettuce/core/internal/AsyncConnectionProvider.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java index 163299711e..4614405ade 100644 --- a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java +++ b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java @@ -144,7 +144,9 @@ public CompletableFuture close() { for (K k : connections.keySet()) { Sync remove = connections.remove(k); - remove.doWithConnection(e -> futures.add(e.closeAsync())); + if (remove != null) { + remove.doWithConnection(e -> futures.add(e.closeAsync())); + } } return Futures.allOf(futures); From cae9d88bacf4c57cdfbd75d51e098a36b0164b86 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Sat, 31 Aug 2024 05:51:21 +0800 Subject: [PATCH 3/5] found the reason why and added test case --- .../internal/AsyncConnectionProvider.java | 5 +- .../internal/AsyncConnectionProviderTest.java | 99 +++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/lettuce/core/internal/AsyncConnectionProviderTest.java diff --git a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java index 4614405ade..02e5b36133 100644 --- a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java +++ b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java @@ -145,7 +145,9 @@ public CompletableFuture close() { for (K k : connections.keySet()) { Sync remove = connections.remove(k); if (remove != null) { - remove.doWithConnection(e -> futures.add(e.closeAsync())); + CompletionStage closeFuture = remove.future.thenAccept(AsyncCloseable::closeAsync); + // always synchronously add the future, made it immutably in Futures.allOf() + futures.add(closeFuture.toCompletableFuture()); } } @@ -218,7 +220,6 @@ static class Sync> { @SuppressWarnings("unchecked") public Sync(K key, F future) { - this.key = key; this.future = (F) future.whenComplete((connection, throwable) -> { diff --git a/src/test/java/io/lettuce/core/internal/AsyncConnectionProviderTest.java b/src/test/java/io/lettuce/core/internal/AsyncConnectionProviderTest.java new file mode 100644 index 0000000000..80e7f9ee10 --- /dev/null +++ b/src/test/java/io/lettuce/core/internal/AsyncConnectionProviderTest.java @@ -0,0 +1,99 @@ +package io.lettuce.core.internal; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class AsyncConnectionProviderTest { + + @Test + public void testFutureListLength() throws InterruptedException, ExecutionException, TimeoutException { + + CountDownLatch slowCreate = new CountDownLatch(1); + CountDownLatch slowShutdown = new CountDownLatch(1); + + // create a provider with a slow connection creation + AsyncConnectionProvider> provider = new AsyncConnectionProvider<>( + key -> { + return countDownFuture(slowCreate, new io.lettuce.core.api.AsyncCloseable() { + + @Override + public CompletableFuture closeAsync() { + return CompletableFuture.completedFuture(null); + } + + }); + }); + + // add slow shutdown connection first + SlowCloseFuture slowCloseFuture = new SlowCloseFuture(slowShutdown); + provider.register("slowShutdown", new io.lettuce.core.api.AsyncCloseable() { + + @Override + public CompletableFuture closeAsync() { + return slowCloseFuture; + } + + }); + + // add slow creation connection + CompletableFuture createFuture = provider.getConnection("slowCreate"); + + // close the connection. + CompletableFuture closeFuture = provider.close(); + + // the connection has not been created yet, so the close futures array always has 1 element + // we block the iterator on the slowCloseFuture + // then we count down the creation, the close future will be added to the list + slowCreate.countDown(); + + // the close future is added to the list, we unblock the iterator + slowShutdown.countDown(); + + // assert close future is completed, and no exceptions are thrown + closeFuture.get(10, TimeUnit.SECONDS); + Assert.assertTrue(createFuture.isDone()); + } + + private CompletableFuture countDownFuture(CountDownLatch countDownLatch, T value) { + return CompletableFuture.runAsync(() -> { + try { + countDownLatch.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).thenApply(v -> value); + } + + static class SlowCloseFuture extends CompletableFuture { + + private final CountDownLatch countDownLatch; + + SlowCloseFuture(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public CompletableFuture toCompletableFuture() { + // we block the iterator on here + try { + countDownLatch.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return super.toCompletableFuture(); + } + + @Override + public Void get() { + return null; + } + + } + +} From 58249e6ecd4333b110c0b19216b24382c2ef6535 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Mon, 2 Sep 2024 23:35:40 +0800 Subject: [PATCH 4/5] catch connection timeout --- .../java/io/lettuce/core/internal/AsyncConnectionProvider.java | 3 +-- .../core/cluster/AsyncConnectionProviderIntegrationTests.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java index 02e5b36133..0591186598 100644 --- a/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java +++ b/src/main/java/io/lettuce/core/internal/AsyncConnectionProvider.java @@ -163,9 +163,8 @@ public void close(K key) { LettuceAssert.notNull(key, "ConnectionKey must not be null!"); - Sync sync = connections.get(key); + Sync sync = connections.remove(key); if (sync != null) { - connections.remove(key); sync.doWithConnection(AsyncCloseable::closeAsync); } } diff --git a/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java index c2352a7898..23d76b61d0 100644 --- a/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java @@ -125,7 +125,7 @@ void shouldCloseConnections() throws IOException { ConnectionKey connectionKey = new ConnectionKey(ConnectionIntent.READ, TestSettings.host(), TestSettings.port()); sut.getConnection(connectionKey); - TestFutures.awaitOrTimeout(sut.close()); + assertThatThrownBy(() -> TestFutures.awaitOrTimeout(sut.close())).hasCauseInstanceOf(IllegalStateException.class); assertThat(sut.getConnectionCount()).isEqualTo(0); TestFutures.awaitOrTimeout(sut.close()); From 17a8546fcd13b18ac3737bcb875e0bffb3966edc Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Sun, 8 Sep 2024 05:01:03 +0800 Subject: [PATCH 5/5] fix wrong method --- .../core/cluster/AsyncConnectionProviderIntegrationTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java index 23d76b61d0..8b531ac787 100644 --- a/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/AsyncConnectionProviderIntegrationTests.java @@ -32,7 +32,9 @@ import javax.inject.Inject; import org.apache.commons.lang3.time.StopWatch; +import org.junit.Assert; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -125,7 +127,7 @@ void shouldCloseConnections() throws IOException { ConnectionKey connectionKey = new ConnectionKey(ConnectionIntent.READ, TestSettings.host(), TestSettings.port()); sut.getConnection(connectionKey); - assertThatThrownBy(() -> TestFutures.awaitOrTimeout(sut.close())).hasCauseInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> TestFutures.awaitOrTimeout(sut.close())).isInstanceOf(IllegalStateException.class); assertThat(sut.getConnectionCount()).isEqualTo(0); TestFutures.awaitOrTimeout(sut.close());