Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18618: Improve leader change handling of acknowledgements [1/N] #18672

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from

Conversation

AndrewJSchofield
Copy link
Member

There is a problem in the share consumer where records fetched from node A might be acknowledged with node B, if node B used to be the leader of the topic-partition and A becomes unavailable.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@AndrewJSchofield AndrewJSchofield marked this pull request as draft January 22, 2025 17:50
@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label Jan 22, 2025
@clolov
Copy link
Contributor

clolov commented Jan 23, 2025

I know this is still marked as a Draft, but how difficult will it be to add a test which tests the edge case which uncovered this? I believe I can follow the reasoning otherwise i.e. you only want to send the acknowledgements to the nodes from which the in-flight records originated.

@AndrewJSchofield
Copy link
Member Author

I know this is still marked as a Draft, but how difficult will it be to add a test which tests the edge case which uncovered this? I believe I can follow the reasoning otherwise i.e. you only want to send the acknowledgements to the nodes from which the in-flight records originated.

Not that difficult. Might be a follow-on PR to complete the testing, but we have two pieces of work in progress. First, we are developing tests using MockClient which can pretend to be a cluster of multiple brokers in which we can very deliberately introduce specific changes of leadership to exercise all of the code paths. Then, we are starting to run system tests which roll the cluster while performing a long-running workload. It was a rolling restart system test that showed the problem initially.

@clolov
Copy link
Contributor

clolov commented Jan 23, 2025

Awesome! Looking forward to seeing those 😊

@AndrewJSchofield AndrewJSchofield marked this pull request as ready for review January 26, 2025 21:31
Copy link
Collaborator

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks mostly good to me. Left some minor comments.

*
* @return the error code
*/
public Errors getAcknowledgeErrorCode() {
return acknowledgeErrorCode;
public KafkaException getAcknowledgeException() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally avoid get prefix in methods so should it be as like below?

Suggested change
public KafkaException getAcknowledgeException() {
public KafkaException acknowledgeException() {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In public classes, yes, I agree. However, here there are many accessor methods already present and I don't think renaming them all improves things.

this.acknowledgements = acknowledgements;
}

public int getNodeId() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public int getNodeId() {
public int nodeId() {

return nodeId;
}

public Acknowledgements getAcknowledgements() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public Acknowledgements getAcknowledgements() {
public Acknowledgements acknowledgements() {

/**
* This class combines Acknowledgements with the id of the node to use for acknowledging.
*/
public class NodeAcknowledgements {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Seems this can be a record class itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't think that's allowed in the client. Records were introduced in Java 14 and can be used in the broker which builds with Java 17.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you are right, client is not yet there.

Copy link
Collaborator

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just one query.

* <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called
* <li> {@link InterruptException} if the calling thread is interrupted before or while this function is called
* <li> {@link KafkaException} for any other unrecoverable errors
* </ul>
* <p>Note that if the exception is a retriable exception, the acknowledgement could not be completed and the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For non-retribale exceptions it will fetch anyways again so I expect we mean that even for retriable exception, the fetch will happen again?

Suggested change
* <p>Note that if the exception is a retriable exception, the acknowledgement could not be completed and the
* <p>Note that if the exception is even a retriable exception, the acknowledgement could not be completed and the

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will reword.

metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
Acknowledgements acknowledgementsToSend = null;
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query: Can there be ever some acknowledgments which can starve in the fetchAcknowledgementsToSend when some nodeId is never received from broker?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there will need to be a follow-on PR for this. In a way, you're correct, but I think to maintain the ordering of things, we need to fail some acknowledgements more aggressively from the poll loop. I had been avoiding that, but I think it's inevitable actually.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);
NodeAcknowledgements acknowledgements = acknowledgementsMap.get(tip);
if ((acknowledgements != null) && (acknowledgements.nodeId() == node.id())) {
Copy link
Contributor

@ShivsundarR ShivsundarR Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, should we add a null check for the actual Acknowledgements class inside NodeAcknowledgements.
Although ideally as we already have a null check for NodeAcknowledgements, the case shouldn't ever arise. Just for it be consistent with other parts of the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rename the variable acknowledgements to nodeAcknowledgements to make it a bit clearer to read. I don't believe that nodeAcknowledgements.acknowledgements() can be null because the empty case is Acknowledgements.empty() and the fetching code in the application thread only sends the acks to be processed if they are non-empty. I can put a null check in the constructor for NodeAcknowledgements.

Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
if (nodeAcksFromFetchMap != null) {
nodeAcksFromFetchMap.forEach((tip, acks) -> {
metricsManager.recordAcknowledgementSent(acks.size());
Copy link
Contributor

@ShivsundarR ShivsundarR Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove the mapping (tip, acks) from nodeAcksFromFetchMap(effectively removing them from fetchAcknowledgementsToSend) so that we do not process these acknowledgements again in the next iteration.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point.

Acknowledgements acknowledgements = Acknowledgements.empty();
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
if (nodeAcksFromFetchMap != null) {
Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.get(tip);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a nodeAcksFromFetchMap.remove(tip) here? Although here as we are closing, we would not be using fetchAcknowledgementsToSend again ideally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it matters because this is on the close path, but I think you're right that it's an improvement.

Copy link
Contributor

@ShivsundarR ShivsundarR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield Thanks for the PR!! Just had a couple of comments.

Copy link
Contributor

@ShivsundarR ShivsundarR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! LGTM!

@AndrewJSchofield AndrewJSchofield changed the title KAFKA-18618: Improve leader change handling of acknowledgements KAFKA-18618: Improve leader change handling of acknowledgements [1/N] Jan 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants