diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index bf57daae16dc..57ffcfc341b3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -727,7 +728,11 @@ public void notifyReAuthentication() { if (_messageDispatcher == null) { return; } - _messageDispatcher.notifyReAuthentication(); + + // use another thread to do the notification so that the server operation won't be blocked + ExecutorService threadPool = + _cache.getDistributionManager().getExecutors().getWaitingThreadPool(); + threadPool.submit(() -> _messageDispatcher.notifyReAuthentication()); } /** diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java index 3b4c63f82ec7..9b150be3f3d3 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java @@ -20,7 +20,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -30,12 +32,18 @@ import java.net.InetAddress; import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.shiro.subject.Subject; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.mockito.stubbing.Answer; import org.apache.geode.StatisticsFactory; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.OperationExecutors; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.CacheClientProxyStatsFactory; @@ -43,6 +51,7 @@ import org.apache.geode.internal.security.SecurityService; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.statistics.StatisticsClock; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; public class CacheClientProxyTest { private CacheClientProxy proxyWithSingleUser; @@ -71,6 +80,7 @@ public void before() throws Exception { when(socket.getInetAddress()).thenReturn(inetAddress); when(notifier.getAcceptorStats()).thenReturn(stats); id = mock(ClientProxyMembershipID.class); + when(id.getDurableId()).thenReturn("proxy_id"); version = KnownVersion.TEST_VERSION; securityService = mock(SecurityService.class); subject = mock(Subject.class); @@ -175,4 +185,37 @@ public void close_multiUser_calls_ClientUserAuthsCleanUp() { verify(subject, never()).logout(); verify(clientUserAuths, times(1)).cleanup(anyBoolean()); } + + @Rule + public ExecutorServiceRule executorService = new ExecutorServiceRule(); + + @Test + public void notifyReAuthenticationIsNotBlocked() { + CacheClientProxy spy = spy(proxyWithSingleUser); + MessageDispatcher dispatcher = mock(MessageDispatcher.class); + doReturn(dispatcher).when(spy).createMessageDispatcher(any()); + spy.initializeMessageDispatcher(); + DistributionManager manager = mock(DistributionManager.class); + OperationExecutors executors = mock(OperationExecutors.class); + ExecutorService executor = executorService.getExecutorService(); + when(cache.getDistributionManager()).thenReturn(manager); + when(manager.getExecutors()).thenReturn(executors); + when(executors.getWaitingThreadPool()).thenReturn(executor); + + AtomicBoolean updated = new AtomicBoolean(false); + + // simulating a blocked message dispatcher when notify reauth + doAnswer((Answer) invocation -> { + while (!updated.get()) { + Thread.sleep(200); + } + return null; + }).when(dispatcher).notifyReAuthentication(); + + // proxy.notifyReauthentication won't be blocked + spy.notifyReAuthentication(); + assertThat(updated.get()).isFalse(); + } + + }