Added complex share consumer test.
smjn committed Jan 23, 2025
1 parent d5e90f9 commit 769df3a
Showing 1 changed file with 140 additions and 0 deletions.
140 changes: 140 additions & 0 deletions core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -49,6 +50,7 @@
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
@@ -82,6 +84,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1907,9 +1911,83 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(20, records.count());
}

verifyShareGroupStateTopicRecordsProduced();
}
}

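// Exercises a share consumer that mixes ACCEPT, RELEASE and REJECT acknowledgements
// against a multi-partition topic while records are produced concurrently.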
@ClusterTest(
brokers = 3,
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
}
)
public void testComplexShareConsumer() throws Exception {
setup();
String topicName = "multipart";
String groupId = "multipartGrp";
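// topic with 3 partitions and replication factor 3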
createTopic(topicName, 3, 3);
TopicPartition multiTp = new TopicPartition(topicName, 0);

ExecutorService executor = Executors.newCachedThreadPool();

AtomicBoolean prodDone = new AtomicBoolean(false);
AtomicInteger sentCount = new AtomicInteger(0);

// keep producing messages until signalled to stop
executor.execute(() -> {
try (Producer<byte[], byte[]> producer = createProducer()) {
while (!prodDone.get()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();
sentCount.incrementAndGet();
}
}
});

// initialize the complex share consumer
ComplexShareConsumer complexCons1 = new ComplexShareConsumer(
cluster.bootstrapServers(),
topicName,
groupId,
Map.of()
);

executor.execute(complexCons1);

// stop producing after 10 seconds so the consumer can drain the topic
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10L);
prodDone.set(true);
} catch (InterruptedException e) {
// ignore
}
});

// every message is eventually read; released records are redelivered,
// so the total number of records read exceeds the number sent
TestUtils.waitForCondition(complexCons1::isDone, 30_000L, () -> "consumer did not complete in time");
assertTrue(sentCount.get() < complexCons1.recordsRead());

executor.shutdownNow();

verifyShareGroupStateTopicRecordsProduced();
}

private int produceMessages(int messageCount) {
try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
@@ -2123,4 +2201,66 @@ private void alterShareAutoOffsetReset(String groupId, String newValue) {
.get(60, TimeUnit.SECONDS), "Failed to alter configs");
}
}

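// Runnable wrapper around KafkaShareConsumer that acknowledges records with a mix of
// REJECT, ACCEPT and RELEASE based on their offset and delivery count, counts every
// record it reads, and exits once a poll returns no records. POLL_TIMEOUT_MS equals
// the 15s group.share.record.lock.duration.ms configured for this test's cluster.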
private static class ComplexShareConsumer implements Runnable {
public static final int POLL_TIMEOUT_MS = 15000;
public static final int MAX_DELIVERY_COUNT = 5;

private final String topicName;
private final Map<String, Object> configs = new HashMap<>();
private final AtomicBoolean isDone = new AtomicBoolean(false);
private final AtomicBoolean shouldLoop = new AtomicBoolean(true);
private final AtomicInteger readCount = new AtomicInteger(0);

ComplexShareConsumer(
String bootstrapServers,
String topicName,
String groupId,
Map<String, Object> additionalProperties
) {
this.topicName = topicName;
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.putAll(additionalProperties);
configs.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
configs.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
}

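// allows callers to end the poll loop early; this test instead relies on an empty poll to exit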
void stop() {
shouldLoop.set(false);
}

@Override
public void run() {
try (KafkaShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(configs)) {
consumer.subscribe(Set.of(this.topicName));
while (shouldLoop.get()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
readCount.addAndGet(records.count());
if (records.isEmpty()) {
break;
}
for (ConsumerRecord<byte[], byte[]> record : records) {
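// Derive a target delivery count (0..6) from the offset: 0 rejects the record
// outright; otherwise the record is released for redelivery until its delivery
// count reaches the target, at which point it is accepted.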
short deliveryCountBeforeAccept = (short) ((record.offset() + record.offset() / (MAX_DELIVERY_COUNT + 2)) % (MAX_DELIVERY_COUNT + 2));
if (deliveryCountBeforeAccept == 0) {
consumer.acknowledge(record, AcknowledgeType.REJECT);
} else if (record.deliveryCount().get() == deliveryCountBeforeAccept) {
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} else {
consumer.acknowledge(record, AcknowledgeType.RELEASE);
}
}
}
}
isDone.set(true);
}

boolean isDone() {
return isDone.get();
}

int recordsRead() {
return readCount.get();
}
}
}
