Skip to content

Commit

Permalink
Add lock to ensure no failed test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Oct 28, 2024
1 parent aa18acb commit 05c8718
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ protected synchronized SharedKafkaConsumer pickConsumerForPartition(

// Safeguard logic, avoid infinite loops for searching consumer.
if (consumersChecked == consumerToConsumptionTask.size()) {
for (int consumerIdx = 0; consumerIdx < consumerToConsumptionTask.size(); consumerIdx++) {
LOGGER.info(
"Consumer: {} is consuming for: {}",
consumerIdx,
consumerToConsumptionTask.getByIndex(consumerIdx).getValue().getDestinationIdentifier(topicPartition));
}
throw new VeniceException(
"Can not find consumer for topic: " + topicPartition.getPubSubTopic().getName() + " and partition: "
+ topicPartition.getPartitionNumber() + " from the ingestion task belonging to version topic: "
Expand Down Expand Up @@ -145,7 +151,7 @@ private boolean alreadySubscribedRealtimeTopicPartition(
}

@Override
void handleUnsubscription(SharedKafkaConsumer consumer, PubSubTopicPartition pubSubTopicPartition) {
synchronized void handleUnsubscription(SharedKafkaConsumer consumer, PubSubTopicPartition pubSubTopicPartition) {
if (pubSubTopicPartition.getPubSubTopic().isRealTime()) {
Set<PubSubConsumerAdapter> rtTopicConsumers = rtTopicPartitionToConsumerMap.get(pubSubTopicPartition);
if (rtTopicConsumers != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,16 @@ private void verifyConsumerServiceStartConsumptionIntoDataReceiver(
@Test(invocationCount = 5)
public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception {
ApacheKafkaConsumerAdapter consumer1 = mock(ApacheKafkaConsumerAdapter.class);
when(consumer1.hasAnySubscription()).thenReturn(true);
ApacheKafkaConsumerAdapter consumer2 = mock(ApacheKafkaConsumerAdapter.class);
ApacheKafkaConsumerAdapter consumer3 = mock(ApacheKafkaConsumerAdapter.class);
ApacheKafkaConsumerAdapter consumer4 = mock(ApacheKafkaConsumerAdapter.class);
ApacheKafkaConsumerAdapter consumer5 = mock(ApacheKafkaConsumerAdapter.class);
ApacheKafkaConsumerAdapter consumer6 = mock(ApacheKafkaConsumerAdapter.class);
// when(consumer1.hasAnySubscription()).thenReturn(true);

PubSubConsumerAdapterFactory factory = mock(PubSubConsumerAdapterFactory.class);
when(factory.create(any(), anyBoolean(), any(), any())).thenReturn(consumer1);
when(factory.create(any(), anyBoolean(), any(), any()))
.thenReturn(consumer1, consumer2, consumer3, consumer4, consumer5, consumer6);

Properties properties = new Properties();
String testKafkaUrl = "test_kafka_url";
Expand Down Expand Up @@ -576,6 +582,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
Assert.assertFalse(
raceConditionFound,
"Found race condition in KafkaConsumerService with time passed in milliseconds: " + elapsedTime);
delegator.close();
}

private Runnable getResubscriptionRunnableFor(
Expand Down

0 comments on commit 05c8718

Please sign in to comment.