From a1df57f51b1e13b8eb58f3faf42993330f9c3cc2 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 20:36:59 +0800 Subject: [PATCH 1/6] addressed handle fatal error --- .../AbstractHeartbeatRequestManager.java | 6 +++ .../internals/CommitRequestManager.java | 16 +++++++- .../internals/CoordinatorRequestManager.java | 23 +++++++---- .../clients/consumer/KafkaConsumerTest.java | 2 +- .../internals/CommitRequestManagerTest.java | 17 +++++++++ .../CoordinatorRequestManagerTest.java | 38 +++++++++---------- .../kafka/api/AuthorizerIntegrationTest.scala | 6 +-- .../kafka/server/QuorumTestHarness.scala | 4 -- 8 files changed, 76 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 1f8ddc725b58d..e6dbf846eacb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -161,6 +161,7 @@ public abstract class AbstractHeartbeatRequestManager backgroundEventHandler.add(new ErrorEvent(fatalError))); + } + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); heartbeatRequestState.onSendAttempt(currentTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 4f0deef5bf890..5e94ec22b8b75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -176,9 +176,11 @@ public CommitRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - // poll only when the coordinator node is known. - if (coordinatorRequestManager.coordinator().isEmpty()) + // poll when the coordinator node is known and fatal error is not present + if (coordinatorRequestManager.coordinator().isEmpty()) { + pendingRequests.maybeFailOnCoordinatorFatalError(); return EMPTY; + } if (closing) { return drainPendingOffsetCommitRequests(); @@ -1248,6 +1250,16 @@ private List drainPendingCommits() { } } + private void maybeFailOnCoordinatorFatalError() { + coordinatorRequestManager.fatalError().ifPresent(error -> { + log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); + unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); + unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); + clearAll(); + } + ); + } + /** * Encapsulates the state of auto-committing and manages the auto-commit timer. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 4664267a0e858..2c94de0769d83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; @@ -53,24 +51,27 @@ public class CoordinatorRequestManager implements RequestManager { private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private final Logger log; - private final BackgroundEventHandler backgroundEventHandler; private final String groupId; private final RequestState coordinatorRequestState; private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while private long totalDisconnectedMin = 0; private Node coordinator; + // Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take + // appropriate actions. + // For example: + // - AbstractHeartbeatRequestManager propagates the error event to the application thread. + // - CommitRequestManager fail pending requests. + private Optional fatalError = Optional.empty(); public CoordinatorRequestManager( final LogContext logContext, final long retryBackoffMs, final long retryBackoffMaxMs, - final BackgroundEventHandler errorHandler, final String groupId ) { Objects.requireNonNull(groupId); this.log = logContext.logger(this.getClass()); - this.backgroundEventHandler = errorHandler; this.groupId = groupId; this.coordinatorRequestState = new RequestState( logContext, @@ -200,12 +201,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); - backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException)); + fatalError = Optional.of(groupAuthorizationException); return; } log.warn("FindCoordinator request failed due to fatal exception", exception); - backgroundEventHandler.add(new ErrorEvent(exception)); + fatalError = Optional.of(exception); } /** @@ -244,4 +245,12 @@ private void onResponse( public Optional coordinator() { return Optional.ofNullable(this.coordinator); } + + private void clearFatalError() { + this.fatalError = Optional.empty(); + } + + public Optional fatalError() { + return fatalError; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 33ca2844305c7..2749563df2742 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In // by the background thread, so it can realize there is authentication fail and then // throw the AuthenticationException assertPollEventuallyThrows(consumer, AuthenticationException.class, - "he consumer was not able to discover metadata errors during continuous polling."); + "this consumer was not able to discover metadata errors during continuous polling."); } else { assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 252d5a7ccbd08..1b7ed8599238d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -1499,6 +1499,23 @@ public void testSignalClose() { assertEquals("topic", data.topics().get(0).name()); } + @Test + public void testPollWithFatalErrorShouldFailAllUnsentRequests() { + CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200); + assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size()); + + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + when(coordinatorRequestManager.fatalError()) + .thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception"))); + + assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200)); + + assertEmptyPendingRequests(commitRequestManager); + } + private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index 7e805dc3cd3b6..0ed902d7f278d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -18,9 +18,7 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -35,6 +33,8 @@ import org.apache.logging.log4j.Level; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; import java.util.List; @@ -49,9 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; public class CoordinatorRequestManagerTest { @@ -191,23 +189,10 @@ public void testBackoffAfterRetriableFailure() { } @Test - public void testPropagateAndBackoffAfterFatalError() { + public void testBackoffAfterFatalError() { CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); - verify(backgroundEventHandler).add(argThat(backgroundEvent -> { - if (!(backgroundEvent instanceof ErrorEvent)) - return false; - - RuntimeException exception = ((ErrorEvent) backgroundEvent).error(); - - if (!(exception instanceof GroupAuthorizationException)) - return false; - - GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception; - return groupAuthException.groupId().equals(GROUP_ID); - })); - time.sleep(RETRY_BACKOFF_MS - 1); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); @@ -254,6 +239,22 @@ public void testNetworkTimeout() { assertEquals(1, res2.unsentRequests.size()); } + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"}) + public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) { + CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); + expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); + assertTrue(coordinatorManager.fatalError().isPresent()); + + time.sleep(RETRY_BACKOFF_MS); + // there are no successful responses, so the fatal error should persist + assertTrue(coordinatorManager.fatalError().isPresent()); + + // receiving a successful response should clear the fatal error + expectFindCoordinatorRequest(coordinatorManager, error); + assertTrue(coordinatorManager.fatalError().isEmpty()); + } + private void expectFindCoordinatorRequest( CoordinatorRequestManager coordinatorManager, Errors error @@ -273,7 +274,6 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) { new LogContext(), RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, - this.backgroundEventHandler, groupId ); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 3ee420f2c4992..8ec6b3c532776 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) @@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) val consumer = createConsumer() @@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { createTopicWithBrokerPrincipal(topic) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index a8a66dbd54f5c..d666bee6a41c2 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -441,8 +441,4 @@ object QuorumTestHarness { // The following is for tests that only work with the classic group protocol because of relying on Zookeeper def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT))) - - // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer - // implementation that would otherwise cause tests to fail. - def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly } From c1ac5563658238a122f054b46735da616a01fa49 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 20:44:28 +0800 Subject: [PATCH 2/6] addressed handle fatal error --- .../internals/CommitRequestManager.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 5e94ec22b8b75..1d3503886a9c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -1248,16 +1248,16 @@ private List drainPendingCommits() { clearAll(); return res; } - } - private void maybeFailOnCoordinatorFatalError() { - coordinatorRequestManager.fatalError().ifPresent(error -> { - log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); - unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); - unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); - clearAll(); - } - ); + private void maybeFailOnCoordinatorFatalError() { + coordinatorRequestManager.fatalError().ifPresent(error -> { + log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); + unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); + unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); + clearAll(); + } + ); + } } /** From b3f579ac8438afe8e5e17b3aef6c32b04053cbb9 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 20:48:22 +0800 Subject: [PATCH 3/6] addressed handle fatal error --- .../kafka/clients/consumer/internals/RequestManagers.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 304f0fffd4ad5..960567ac96bec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -194,7 +194,6 @@ protected RequestManagers create() { logContext, retryBackoffMs, retryBackoffMaxMs, - backgroundEventHandler, groupRebalanceConfig.groupId); commitRequestManager = new CommitRequestManager( time, @@ -295,7 +294,6 @@ protected RequestManagers create() { logContext, retryBackoffMs, retryBackoffMaxMs, - backgroundEventHandler, groupRebalanceConfig.groupId); ShareMembershipManager shareMembershipManager = new ShareMembershipManager( logContext, From 7314509061707536fc7f98eef55aa24c882cd7fc Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 16 Jan 2025 01:28:21 +0800 Subject: [PATCH 4/6] add clear fatal error --- .../kafka/clients/consumer/internals/CommitRequestManager.java | 1 + .../clients/consumer/internals/CoordinatorRequestManager.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 1d3503886a9c4..50c5c57c83249 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -1254,6 +1254,7 @@ private void maybeFailOnCoordinatorFatalError() { log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); + coordinatorRequestManager.clearFatalError(); clearAll(); } ); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 2c94de0769d83..911b9d7ab5a08 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -115,6 +115,7 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren ); return unsentRequest.whenComplete((clientResponse, throwable) -> { + clearFatalError(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(clientResponse.receivedTimeMs(), response); @@ -246,7 +247,7 @@ public Optional coordinator() { return Optional.ofNullable(this.coordinator); } - private void clearFatalError() { + public void clearFatalError() { this.fatalError = Optional.empty(); } From 9f623e63318cf39867cb11ab5e6da77c77792dd7 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 17 Jan 2025 22:19:51 +0800 Subject: [PATCH 5/6] gpb --- .../consumer/internals/AbstractHeartbeatRequestManager.java | 1 + .../kafka/clients/consumer/internals/CommitRequestManager.java | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index e6dbf846eacb8..165c809928aca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -267,6 +267,7 @@ public void resetPollTimer(final long pollMs) { private void maybePropagateCoordinatorFatalErrorEvent() { coordinatorRequestManager.fatalError() .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError))); + coordinatorRequestManager.clearFatalError(); } private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 50c5c57c83249..1d3503886a9c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -1254,7 +1254,6 @@ private void maybeFailOnCoordinatorFatalError() { log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); - coordinatorRequestManager.clearFatalError(); clearAll(); } ); From 1c565c41d0d3d024dac44e2c4fec86146153c84f Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 25 Jan 2025 20:59:07 +0800 Subject: [PATCH 6/6] update the method name --- .../consumer/internals/AbstractHeartbeatRequestManager.java | 3 +-- .../consumer/internals/CoordinatorRequestManager.java | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 165c809928aca..eb7611f7b8a73 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -265,9 +265,8 @@ public void resetPollTimer(final long pollMs) { } private void maybePropagateCoordinatorFatalErrorEvent() { - coordinatorRequestManager.fatalError() + coordinatorRequestManager.getAndClearFatalError() .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError))); - coordinatorRequestManager.clearFatalError(); } private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 911b9d7ab5a08..5aa8f4d2c148d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -115,7 +115,7 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren ); return unsentRequest.whenComplete((clientResponse, throwable) -> { - clearFatalError(); + getAndClearFatalError(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(clientResponse.receivedTimeMs(), response); @@ -247,8 +247,10 @@ public Optional coordinator() { return Optional.ofNullable(this.coordinator); } - public void clearFatalError() { + public Optional getAndClearFatalError() { + Optional fatalError = this.fatalError; this.fatalError = Optional.empty(); + return fatalError; } public Optional fatalError() {