Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors #18548

Open
wants to merge 12 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped();
maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
Expand Down Expand Up @@ -263,6 +264,12 @@ public void resetPollTimer(final long pollMs) {
pollTimer.reset(maxPollIntervalMs);
}

private void maybePropagateCoordinatorFatalErrorEvent() {
coordinatorRequestManager.fatalError()
.ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
coordinatorRequestManager.clearFatalError();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @kirktrue and @lianetm, I think the main reason is that we should clear the fatal error after we offer the exception to the background thread. I am still tracing which path would lead to the test timeout.

Copy link
Member

@lianetm lianetm Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, I'll take a closer look too asap. In the meantime I re-triggered the build so we can keep validating it.

}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,11 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
// poll only when the coordinator node is known.
if (coordinatorRequestManager.coordinator().isEmpty())
// poll only when the coordinator node is known and no coordinator fatal error is pending
if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
return EMPTY;
}

if (closing) {
return drainPendingOffsetCommitRequests();
Expand Down Expand Up @@ -1246,6 +1248,16 @@ private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
clearAll();
return res;
}

/**
 * If the coordinator lookup hit a fatal (non-retriable) error, none of the queued
 * requests can ever succeed: complete every unsent commit and offset-fetch future
 * exceptionally with that error, then drop all pending requests.
 */
private void maybeFailOnCoordinatorFatalError() {
    coordinatorRequestManager.fatalError().ifPresent(fatalError -> {
        log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", fatalError);
        unsentOffsetCommits.forEach(commit -> commit.future.completeExceptionally(fatalError));
        unsentOffsetFetches.forEach(fetch -> fetch.future.completeExceptionally(fatalError));
        clearAll();
    });
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
Expand Down Expand Up @@ -53,24 +51,27 @@
public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Logger log;
private final BackgroundEventHandler backgroundEventHandler;
private final String groupId;

private final RequestState coordinatorRequestState;
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
private long totalDisconnectedMin = 0;
private Node coordinator;
// Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take
// appropriate actions.
// For example:
// - AbstractHeartbeatRequestManager propagates the error event to the application thread.
// - CommitRequestManager fails pending requests.
private Optional<Throwable> fatalError = Optional.empty();

public CoordinatorRequestManager(
final LogContext logContext,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final BackgroundEventHandler errorHandler,
final String groupId
) {
Objects.requireNonNull(groupId);
this.log = logContext.logger(this.getClass());
this.backgroundEventHandler = errorHandler;
this.groupId = groupId;
this.coordinatorRequestState = new RequestState(
logContext,
Expand Down Expand Up @@ -114,6 +115,7 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren
);

return unsentRequest.whenComplete((clientResponse, throwable) -> {
clearFatalError();
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response);
Expand Down Expand Up @@ -200,12 +202,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
fatalError = Optional.of(groupAuthorizationException);
return;
}

log.warn("FindCoordinator request failed due to fatal exception", exception);
backgroundEventHandler.add(new ErrorEvent(exception));
fatalError = Optional.of(exception);
}

/**
Expand Down Expand Up @@ -244,4 +246,12 @@ private void onResponse(
/**
 * Returns the currently known coordinator node, or {@code Optional.empty()} when the
 * coordinator has not been discovered yet (or was marked unknown).
 */
public Optional<Node> coordinator() {
    return this.coordinator == null ? Optional.empty() : Optional.of(this.coordinator);
}

/**
 * Clears the stored fatal error, if any. Invoked once the error has been consumed by a
 * dependent manager, and when a FindCoordinator request completes.
 */
public void clearFatalError() {
    this.fatalError = Optional.empty();
}

/**
 * Returns the latest fatal (non-retriable) error received while looking up the
 * coordinator, if one is pending. Exposed so that managers requiring a coordinator can
 * take appropriate action on it.
 */
public Optional<Throwable> fatalError() {
    return fatalError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ protected RequestManagers create() {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId);
commitRequestManager = new CommitRequestManager(
time,
Expand Down Expand Up @@ -295,7 +294,6 @@ protected RequestManagers create() {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId);
ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In
// by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class,
"he consumer was not able to discover metadata errors during continuous polling.");
"this consumer was not able to discover metadata errors during continuous polling.");
} else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,23 @@ public void testSignalClose() {
assertEquals("topic", data.topics().get(0).name());
}

@Test
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
    // Queue one offset fetch while the coordinator is still known.
    CommitRequestManager manager = create(true, 100);
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
    manager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200);
    assertEquals(1, manager.pendingRequests.unsentOffsetFetches.size());

    // Coordinator becomes unknown with a fatal error pending: poll() must produce no
    // requests and fail everything that was queued.
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
    when(coordinatorRequestManager.fatalError()).thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception")));
    assertEquals(NetworkClientDelegate.PollResult.EMPTY, manager.poll(200));
    assertEmptyPendingRequests(manager);
}

private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
Expand All @@ -35,6 +33,8 @@
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Collections;
import java.util.List;
Expand All @@ -49,9 +49,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class CoordinatorRequestManagerTest {
Expand Down Expand Up @@ -191,23 +189,10 @@ public void testBackoffAfterRetriableFailure() {
}

@Test
public void testPropagateAndBackoffAfterFatalError() {
public void testBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);

verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
if (!(backgroundEvent instanceof ErrorEvent))
return false;

RuntimeException exception = ((ErrorEvent) backgroundEvent).error();

if (!(exception instanceof GroupAuthorizationException))
return false;

GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
return groupAuthException.groupId().equals(GROUP_ID);
}));

time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);

Expand Down Expand Up @@ -254,6 +239,22 @@ public void testNetworkTimeout() {
assertEquals(1, res2.unsentRequests.size());
}

@ParameterizedTest
@EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"})
public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) {
    // A GROUP_AUTHORIZATION_FAILED FindCoordinator response records a fatal error.
    CoordinatorRequestManager manager = setupCoordinatorManager(GROUP_ID);
    expectFindCoordinatorRequest(manager, Errors.GROUP_AUTHORIZATION_FAILED);
    assertTrue(manager.fatalError().isPresent());

    // No response has completed in the meantime, so the fatal error persists.
    time.sleep(RETRY_BACKOFF_MS);
    assertTrue(manager.fatalError().isPresent());

    // Completing another FindCoordinator request clears the stored fatal error.
    expectFindCoordinatorRequest(manager, error);
    assertTrue(manager.fatalError().isEmpty());
}

private void expectFindCoordinatorRequest(
CoordinatorRequestManager coordinatorManager,
Errors error
Expand All @@ -273,7 +274,6 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) {
new LogContext(),
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MS,
this.backgroundEventHandler,
groupId
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
Expand Down Expand Up @@ -1302,7 +1302,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
val consumer = createConsumer()
Expand All @@ -1328,7 +1328,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,4 @@ object QuorumTestHarness {

// The following is for tests that only work with the classic group protocol because they rely on Zookeeper.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))

// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
}
Loading