Skip to content

Commit

Permalink
KAFKA-18619: New consumer topic metadata events should set requireMet…
Browse files Browse the repository at this point in the history
…adata flag
  • Loading branch information
frankvicky committed Jan 24, 2025
1 parent ad79b4a commit d82baf5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
super(type, deadlineMs);
}

@Override
public boolean requireSubscriptionMetadata() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
createProducer(configOverrides = prop)
else
producer
verifyWithRetry(sendOneRecord(producer2))
verifyWithRetry(sendOneRecord(producer2))()
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
def testTransactionalProducerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val txProducer = createTransactionalProducer()
verifyAuthenticationException(txProducer.initTransactions())
Expand All @@ -122,23 +122,23 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
verifyConsumerWithAuthenticationFailure(consumer)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
verifyConsumerWithAuthenticationFailure(consumer)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
val consumer = createConsumer()
Expand All @@ -153,12 +153,12 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {

createClientCredential()
val producer = createProducer()
verifyWithRetry(sendOneRecord(producer))
verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count))
verifyWithRetry(sendOneRecord(producer))()
verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
def testKafkaAdminClientWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
Expand All @@ -180,7 +180,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
verifyAuthenticationException(describeTopic())

createClientCredential()
verifyWithRetry(describeTopic())
verifyWithRetry(describeTopic())()
} finally {
adminClient.close()
}
Expand Down Expand Up @@ -209,13 +209,12 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
}

private def verifyWithRetry(action: => Unit): Unit = {
private def verifyWithRetry[T](operation: => T)(predicate: T => Boolean = (_: T) => true): Unit = {
var attempts = 0
TestUtils.waitUntilTrue(() => {
try {
attempts += 1
action
true
predicate(operation)
} catch {
case _: SaslAuthenticationException => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void tearDown() {

// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, String groupProtocol) throws Exception {
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
try (Consumer<byte[], byte[]> consumer = createConsumer()) {
Expand All @@ -148,8 +148,9 @@ public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, Str
}
}

// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
public void testConsumerGroupServiceWithAuthenticationSuccess(String quorum, String groupProtocol) throws Exception {
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
Expand Down

0 comments on commit d82baf5

Please sign in to comment.