Skip to content

Commit

Permalink
[feat] Use producer name and sequence number as fallback key in Key_S…
Browse files Browse the repository at this point in the history
…hared implementation (#23219)
  • Loading branch information
pdolif authored Oct 1, 2024
1 parent e0b754d commit 5b98d37
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public byte[] getStickyKey() {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
} else if (metadata.hasProducerName() && metadata.hasSequenceId()) {
String fallbackKey = metadata.getProducerName() + "-" + metadata.getSequenceId();
return fallbackKey.getBytes(StandardCharsets.UTF_8);
}
}
return "NONE_KEY".getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -338,11 +339,11 @@ public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsum
}

@Test(dataProvider = "data")
public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
public void testNoKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
String topicType,
boolean enableBatch
) throws PulsarClientException {
String topic = topicType + "://public/default/key_shared_none_key-" + UUID.randomUUID();
String topic = topicType + "://public/default/key_shared_no_key-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = createConsumer(topic);
Expand All @@ -362,13 +363,13 @@ public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelec
.send();
}

receive(Lists.newArrayList(consumer1, consumer2, consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 100);
}

@Test(dataProvider = "batch")
public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch)
public void testNoKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch)
throws PulsarClientException {
String topic = "persistent://public/default/key_shared_none_key_exclusive-" + UUID.randomUUID();
String topic = "persistent://public/default/key_shared_no_key_exclusive-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
Expand All @@ -385,21 +386,32 @@ public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelec
@Cleanup
Producer<Integer> producer = createProducer(topic, enableBatch);

int consumer1ExpectMessages = 0;
int consumer2ExpectMessages = 0;
int consumer3ExpectMessages = 0;

for (int i = 0; i < 100; i++) {
producer.newMessage()
.value(i)
.send();

String fallbackKey = producer.getProducerName() + "-" + producer.getLastSequenceId();
int slot = Murmur3_32Hash.getInstance().makeHash(fallbackKey.getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
if (slot <= 20000) {
consumer1ExpectMessages++;
} else if (slot <= 40000) {
consumer2ExpectMessages++;
} else {
consumer3ExpectMessages++;
}
}
int slot = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;

List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
if (slot <= 20000) {
checkList.add(new KeyValue<>(consumer1, 100));
} else if (slot <= 40000) {
checkList.add(new KeyValue<>(consumer2, 100));
} else {
checkList.add(new KeyValue<>(consumer3, 100));
}
checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));

receiveAndCheck(checkList);
}

Expand Down Expand Up @@ -1740,19 +1752,17 @@ private void receiveAndCheckDistribution(List<Consumer<?>> consumers, int expect
private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
if (check.getValue() % 2 != 0) {
throw new IllegalArgumentException();
}
Consumer<Integer> consumer = check.getKey();
int received = 0;
Map<String, Message<Integer>> lastMessageForKey = new HashMap<>();
for (Integer i = 0; i < check.getValue(); i++) {
Message<Integer> message = check.getKey().receive();
Message<Integer> message = consumer.receive();
if (i % 2 == 0) {
check.getKey().acknowledge(message);
consumer.acknowledge(message);
}
String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
log.info("[{}] Receive message key: {} value: {} messageId: {}",
check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
consumer.getConsumerName(), key, message.getValue(), message.getMessageId());
// check messages is order by key
if (lastMessageForKey.get(key) == null) {
Assert.assertNotNull(message);
Expand All @@ -1761,8 +1771,8 @@ private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkLis
.compareTo(lastMessageForKey.get(key).getValue()) > 0);
}
lastMessageForKey.put(key, message);
consumerKeys.putIfAbsent(check.getKey(), new HashSet<>());
consumerKeys.get(check.getKey()).add(key);
consumerKeys.putIfAbsent(consumer, new HashSet<>());
consumerKeys.get(consumer).add(key);
received++;
}
Assert.assertEquals(check.getValue().intValue(), received);
Expand All @@ -1771,12 +1781,12 @@ private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkLis
// messages not acked, test redelivery
lastMessageForKey = new HashMap<>();
for (int i = 0; i < redeliveryCount; i++) {
Message<Integer> message = check.getKey().receive();
Message<Integer> message = consumer.receive();
received++;
check.getKey().acknowledge(message);
consumer.acknowledge(message);
String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}",
check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
consumer.getConsumerName(), key, message.getValue(), message.getMessageId());
// check redelivery messages is order by key
if (lastMessageForKey.get(key) == null) {
Assert.assertNotNull(message);
Expand All @@ -1788,16 +1798,16 @@ private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkLis
}
Message noMessages = null;
try {
noMessages = check.getKey().receive(100, TimeUnit.MILLISECONDS);
noMessages = consumer.receive(100, TimeUnit.MILLISECONDS);
} catch (PulsarClientException ignore) {
}
Assert.assertNull(noMessages, "redeliver too many messages.");
Assert.assertEquals((check.getValue() + redeliveryCount), received);
}
Set<String> allKeys = new HashSet<>();
consumerKeys.forEach((k, v) -> v.forEach(key -> {
consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> {
assertTrue(allKeys.add(key),
"Key "+ key + "is distributed to multiple consumers." );
"Key " + key + " is distributed to multiple consumers." );
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,11 +1192,13 @@ protected void callMessageListener(Message<T> msg) {
static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
protected byte[] peekMessageKey(Message<?> msg) {
byte[] key = NONE_KEY;
if (msg.hasKey()) {
key = msg.getKeyBytes();
}
if (msg.hasOrderingKey()) {
key = msg.getOrderingKey();
} else if (msg.hasKey()) {
key = msg.getKeyBytes();
} else if (msg.getProducerName() != null) {
String fallbackKey = msg.getProducerName() + "-" + msg.getSequenceId();
key = fallbackKey.getBytes(StandardCharsets.UTF_8);
}
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,9 @@ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, Str
return Base64.getDecoder().decode(metadata.getPartitionKey());
}
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
} else if (metadata.hasProducerName() && metadata.hasSequenceId()) {
String fallbackKey = metadata.getProducerName() + "-" + metadata.getSequenceId();
return fallbackKey.getBytes(StandardCharsets.UTF_8);
}
} catch (Throwable t) {
log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPaylo
public void testPeekStickyKey() {
String message = "msg-1";
String partitionedKey = "key1";
String producerName = "testProducer";
int sequenceId = 1;
MessageMetadata messageMetadata2 = new MessageMetadata()
.setSequenceId(1)
.setProducerName("testProducer")
.setSequenceId(sequenceId)
.setProducerName(producerName)
.setPartitionKey(partitionedKey)
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis());
Expand All @@ -113,16 +115,28 @@ public void testPeekStickyKey() {
// test 64 encoded
String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8));
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(1)
.setProducerName("testProducer")
.setSequenceId(sequenceId)
.setProducerName(producerName)
.setPartitionKey(partitionedKey2)
.setPartitionKeyB64Encoded(true)
.setPublishTime(System.currentTimeMillis());
ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2");
String key2 = Base64.getEncoder().encodeToString(bytes2);;
String key2 = Base64.getEncoder().encodeToString(bytes2);
Assert.assertEquals(partitionedKey2, key2);
ReferenceCountUtil.safeRelease(byteBuf2);
// test fallback key if no key given in message metadata
String fallbackPartitionedKey = producerName + "-" + sequenceId;
MessageMetadata messageMetadataWithoutKey = new MessageMetadata()
.setSequenceId(sequenceId)
.setProducerName(producerName)
.setPublishTime(System.currentTimeMillis());
ByteBuf byteBuf3 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadataWithoutKey,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
byte[] bytes3 = Commands.peekStickyKey(byteBuf3, "topic-3", "sub-3");
String key3 = new String(bytes3);
Assert.assertEquals(fallbackPartitionedKey, key3);
ReferenceCountUtil.safeRelease(byteBuf3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,20 @@ public void testMsgKey() throws Exception {

thread.start();

int count1 = 0;
int count2 = 0;
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer1.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count1++;
consumer1.acknowledge(message);
}
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count2++;
consumer2.acknowledge(message);
}
//in key_share mode, only one consumer can get msg
Assert.assertTrue(count1 == 0 || count2 == 0);
// in key_shared mode if no message key is set, both consumers should receive messages
Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = consumer1.receive(1, TimeUnit.SECONDS);
assertNotNull(message);
consumer1.acknowledge(message);
});

Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
assertNotNull(message);
consumer2.acknowledge(message);
});

consumer1.close();
consumer2.close();
Expand Down Expand Up @@ -149,19 +143,15 @@ public void testMsgKey() throws Exception {
Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = newConsumer1.receive(1, TimeUnit.SECONDS);
if (message != null) {
newConsumer1.acknowledge(message);
}
assertNotNull(message);
newConsumer1.acknowledge(message);
});

Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = newConsumer2.receive(1, TimeUnit.SECONDS);
if (message != null) {
newConsumer2.acknowledge(message);
}
assertNotNull(message);
newConsumer2.acknowledge(message);
});

thread2.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -150,11 +151,11 @@ protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, i
}
}
}
// Make sure key will not be distributed to multiple consumers
// Make sure key will not be distributed to multiple consumers (except null key)
Set<String> allKeys = Sets.newHashSet();
consumerKeys.forEach((k, v) -> v.forEach(key -> {
consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> {
assertTrue(allKeys.add(key),
"Key "+ key + "is distributed to multiple consumers" );
"Key " + key + " is distributed to multiple consumers" );
}));
assertEquals(messagesReceived.size(), messagesToReceive);
}
Expand Down

0 comments on commit 5b98d37

Please sign in to comment.