-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18619: New consumer topic metadata events should set requireMetadata flag #18668
base: trunk
Are you sure you want to change the base?
Conversation
Hi @lianetm, I have enabled the However, I believe this is a separate issue that needs investigation and is not related to this patch. |
Thanks for the patch @frankvicky! Could you enable here all the tests that pass so we can have validation of the change? We can investigate a bit the one that still fails, if it's related to another issue we can track and address it separately. Makes sense? |
2446388
to
4c1f6dd
Compare
private def verifyWithRetryPredicate(predicate: => Boolean): Unit = { | ||
var attempts = 0 | ||
TestUtils.waitUntilTrue(() => { | ||
try { | ||
attempts += 1 | ||
predicate | ||
} catch { | ||
case _: SaslAuthenticationException => false | ||
} | ||
}, s"Operation did not succeed within timeout after $attempts") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @lianetm
I have found that the cause of testConsumerWithAuthenticationFailure
failing is due to a non-retriable assertion.
kafka/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
Line 157 in 94a1bfb
verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count)) |
If we pass the assertion as an argument to verifyWithRetry
, it will fail because we don't catch AssertionFailedError
(and we shouldn't do it). The subscribe
call needs to wait for metadata updates and it's not as quick as assign
, so there's a high chance it will fail on the first poll
. As mentioned, when we pass the assertion as an argument, it will fail with AssertionFailedError
and cannot retry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting, makes sense to me.
But then, couldn't we just change the
verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count))
to
verifyWithRetry(consumer.poll(Duration.ofMillis(1000)).count == 1)
(just to avoid introducing this new verifyWithRetryPredicate
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, i'll give it a try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @frankvicky !
@@ -112,7 +112,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { | |||
} | |||
|
|||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | |||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) | |||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | |||
def testTransactionalProducerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it does not use a consumer. Could you double check and if so, then no need to run it for both consumers.
} | ||
|
||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testKafkaAdminClientWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar as above, looks like it does need to run for both consumers?
private def verifyWithRetryPredicate(predicate: => Boolean): Unit = { | ||
var attempts = 0 | ||
TestUtils.waitUntilTrue(() => { | ||
try { | ||
attempts += 1 | ||
predicate | ||
} catch { | ||
case _: SaslAuthenticationException => false | ||
} | ||
}, s"Operation did not succeed within timeout after $attempts") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting, makes sense to me.
But then, couldn't we just change the
verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count))
to
verifyWithRetry(consumer.poll(Duration.ofMillis(1000)).count == 1)
(just to avoid introducing this new verifyWithRetryPredicate
)
4c1f6dd
to
e7ba1be
Compare
JIRA: KAFKA-18619
In short, the new async consumer's topic metadata operations are unwared of metadata errors because of not overriding
requireSubscriptionMetadata
.For further details, please refer to the jira ticket.
Committer Checklist (excluded from commit message)