From 2bfce4436956c0fe8317d23947e0c9793cc90ece Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 22 Jan 2025 09:10:30 +0800 Subject: [PATCH] KAFKA-18310: Flaky AbstractCoordinatorTest Signed-off-by: PoAn Yang --- .../internals/AbstractCoordinatorTest.java | 84 +++++++++++++++---- 1 file changed, 69 insertions(+), 15 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 969de83c328ff..d415e59fbb553 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -59,7 +59,6 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; @@ -1435,7 +1434,6 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio awaitFirstHeartbeat(heartbeatReceived); } - @Tag("flaky") // "KAFKA-18310" @Test public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception { setupCoordinator(); @@ -1456,14 +1454,34 @@ public boolean matches(AbstractRequest body) { }, syncGroupResponse(Errors.NONE)); AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); - assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()"); + // If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException, + // because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest. + boolean exceptionCaught = false; + try { + coordinator.ensureActiveGroup(); + } catch (WakeupException ignored) { + exceptionCaught = true; + } assertEquals(1, coordinator.onJoinPrepareInvokes); - assertEquals(0, coordinator.onJoinCompleteInvokes); - assertFalse(heartbeatReceived.get()); + if (exceptionCaught) { + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + } else { + // Although the first ensureActiveGroup() doesn't throw WakeupException, + // heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest. + // The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll. + TestUtils.waitForCondition(() -> { + try { + consumerClient.poll(mockTime.timer(0)); + return false; + } catch (WakeupException e) { + return true; + } + }, "Should have WakeupException from poll"); + } - // the join group completes in this poll() - consumerClient.poll(mockTime.timer(0)); + // the join group completes in this ensureActiveGroup() or it's already completed if exceptionCaught is false coordinator.ensureActiveGroup(); assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -1472,7 +1490,6 @@ public boolean matches(AbstractRequest body) { awaitFirstHeartbeat(heartbeatReceived); } - @Tag("flaky") // "KAFKA-18310" @Test public void testWakeupAfterSyncGroupReceived() throws Exception { setupCoordinator(); @@ -1488,15 +1505,32 @@ public void testWakeupAfterSyncGroupReceived() throws Exception { }, syncGroupResponse(Errors.NONE)); AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + // If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException, + // because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest. + boolean exceptionCaught = false; try { coordinator.ensureActiveGroup(); - fail("Should have woken up from ensureActiveGroup()"); } catch (WakeupException ignored) { + exceptionCaught = true; } assertEquals(1, coordinator.onJoinPrepareInvokes); - assertEquals(0, coordinator.onJoinCompleteInvokes); - assertFalse(heartbeatReceived.get()); + if (exceptionCaught) { + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + } else { + // Although the first ensureActiveGroup() doesn't throw WakeupException, + // heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest. + // The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll. + TestUtils.waitForCondition(() -> { + try { + consumerClient.poll(mockTime.timer(0)); + return false; + } catch (WakeupException e) { + return true; + } + }, "Should have WakeupException from poll"); + } coordinator.ensureActiveGroup(); @@ -1506,7 +1540,6 @@ public void testWakeupAfterSyncGroupReceived() throws Exception { awaitFirstHeartbeat(heartbeatReceived); } - @Tag("flaky") // KAFKA-15474 and KAFKA-18310 @Test public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception { setupCoordinator(); @@ -1522,11 +1555,32 @@ public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exceptio }, syncGroupResponse(Errors.NONE)); AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); - assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()"); + // If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException, + // because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest. + boolean exceptionCaught = false; + try { + coordinator.ensureActiveGroup(); + } catch (WakeupException ignored) { + exceptionCaught = true; + } assertEquals(1, coordinator.onJoinPrepareInvokes); - assertEquals(0, coordinator.onJoinCompleteInvokes); - assertFalse(heartbeatReceived.get()); + if (exceptionCaught) { + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + } else { + // Although the first ensureActiveGroup() doesn't throw WakeupException, + // heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest. + // The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll. + TestUtils.waitForCondition(() -> { + try { + consumerClient.poll(mockTime.timer(0)); + return false; + } catch (WakeupException e) { + return true; + } + }, "Should have WakeupException from poll"); + } coordinator.ensureActiveGroup();