diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java index 70643d5de2a3f..efa89a8ff16f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java @@ -55,6 +55,9 @@ public byte[] getStickyKey() { return metadata.getOrderingKey(); } else if (metadata.hasPartitionKey()) { return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8); + } else if (metadata.hasProducerName() && metadata.hasSequenceId()) { + String fallbackKey = metadata.getProducerName() + "-" + metadata.getSequenceId(); + return fallbackKey.getBytes(StandardCharsets.UTF_8); } } return "NONE_KEY".getBytes(StandardCharsets.UTF_8); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index ddf7b0f1d5ee2..c08c37b413f4f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -44,6 +44,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -338,11 +339,11 @@ public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsum } @Test(dataProvider = "data") - public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector( + public void testNoKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector( String topicType, boolean enableBatch ) throws PulsarClientException { - String topic = topicType + "://public/default/key_shared_none_key-" + UUID.randomUUID(); + String topic = topicType + "://public/default/key_shared_no_key-" + UUID.randomUUID(); @Cleanup Consumer consumer1 = createConsumer(topic); @@ -362,13 +363,13 @@ public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelec .send(); } - receive(Lists.newArrayList(consumer1, consumer2, consumer3)); + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 100); } @Test(dataProvider = "batch") - public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch) + public void testNoKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException { - String topic = "persistent://public/default/key_shared_none_key_exclusive-" + UUID.randomUUID(); + String topic = "persistent://public/default/key_shared_no_key_exclusive-" + UUID.randomUUID(); @Cleanup Consumer consumer1 = createConsumer(topic, KeySharedPolicy.stickyHashRange() @@ -385,21 +386,32 @@ public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelec @Cleanup Producer producer = createProducer(topic, enableBatch); + int consumer1ExpectMessages = 0; + int consumer2ExpectMessages = 0; + int consumer3ExpectMessages = 0; + for (int i = 0; i < 100; i++) { producer.newMessage() .value(i) .send(); + + String fallbackKey = producer.getProducerName() + "-" + producer.getLastSequenceId(); + int slot = Murmur3_32Hash.getInstance().makeHash(fallbackKey.getBytes()) + % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE; + if (slot <= 20000) { + consumer1ExpectMessages++; + } else if (slot <= 40000) { + consumer2ExpectMessages++; + } else { + consumer3ExpectMessages++; + } } - int slot = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes()) - % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE; + List, Integer>> checkList = new ArrayList<>(); - if (slot <= 20000) { - checkList.add(new KeyValue<>(consumer1, 100)); - } else if (slot <= 40000) { - checkList.add(new KeyValue<>(consumer2, 100)); - } else { - checkList.add(new KeyValue<>(consumer3, 100)); - } + checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages)); + checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages)); + checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages)); + receiveAndCheck(checkList); } @@ -1740,19 +1752,17 @@ private void receiveAndCheckDistribution(List> consumers, int expect private void receiveAndCheck(List, Integer>> checkList) throws PulsarClientException { Map> consumerKeys = new HashMap<>(); for (KeyValue, Integer> check : checkList) { - if (check.getValue() % 2 != 0) { - throw new IllegalArgumentException(); - } + Consumer consumer = check.getKey(); int received = 0; Map> lastMessageForKey = new HashMap<>(); for (Integer i = 0; i < check.getValue(); i++) { - Message message = check.getKey().receive(); + Message message = consumer.receive(); if (i % 2 == 0) { - check.getKey().acknowledge(message); + consumer.acknowledge(message); } String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey(); log.info("[{}] Receive message key: {} value: {} messageId: {}", - check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId()); + consumer.getConsumerName(), key, message.getValue(), message.getMessageId()); // check messages is order by key if (lastMessageForKey.get(key) == null) { Assert.assertNotNull(message); @@ -1761,8 +1771,8 @@ private void receiveAndCheck(List, Integer>> checkLis .compareTo(lastMessageForKey.get(key).getValue()) > 0); } lastMessageForKey.put(key, message); - consumerKeys.putIfAbsent(check.getKey(), new HashSet<>()); - consumerKeys.get(check.getKey()).add(key); + consumerKeys.putIfAbsent(consumer, new HashSet<>()); + consumerKeys.get(consumer).add(key); received++; } Assert.assertEquals(check.getValue().intValue(), received); @@ -1771,12 +1781,12 @@ private void receiveAndCheck(List, Integer>> checkLis // messages not acked, test redelivery lastMessageForKey = new HashMap<>(); for (int i = 0; i < redeliveryCount; i++) { - Message message = check.getKey().receive(); + Message message = consumer.receive(); received++; - check.getKey().acknowledge(message); + consumer.acknowledge(message); String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey(); log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}", - check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId()); + consumer.getConsumerName(), key, message.getValue(), message.getMessageId()); // check redelivery messages is order by key if (lastMessageForKey.get(key) == null) { Assert.assertNotNull(message); @@ -1788,16 +1798,16 @@ private void receiveAndCheck(List, Integer>> checkLis } Message noMessages = null; try { - noMessages = check.getKey().receive(100, TimeUnit.MILLISECONDS); + noMessages = consumer.receive(100, TimeUnit.MILLISECONDS); } catch (PulsarClientException ignore) { } Assert.assertNull(noMessages, "redeliver too many messages."); Assert.assertEquals((check.getValue() + redeliveryCount), received); } Set allKeys = new HashSet<>(); - consumerKeys.forEach((k, v) -> v.forEach(key -> { + consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> { assertTrue(allKeys.add(key), - "Key "+ key + "is distributed to multiple consumers." ); + "Key " + key + " is distributed to multiple consumers." ); })); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 111cbdb8a8ef3..3073f3a833487 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -1192,11 +1192,13 @@ protected void callMessageListener(Message msg) { static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8); protected byte[] peekMessageKey(Message msg) { byte[] key = NONE_KEY; - if (msg.hasKey()) { - key = msg.getKeyBytes(); - } if (msg.hasOrderingKey()) { key = msg.getOrderingKey(); + } else if (msg.hasKey()) { + key = msg.getKeyBytes(); + } else if (msg.getProducerName() != null) { + String fallbackKey = msg.getProducerName() + "-" + msg.getSequenceId(); + key = fallbackKey.getBytes(StandardCharsets.UTF_8); } return key; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 3fb2fd5ad3d25..15b5676094ec1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1983,6 +1983,9 @@ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, Str return Base64.getDecoder().decode(metadata.getPartitionKey()); } return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8); + } else if (metadata.hasProducerName() && metadata.hasSequenceId()) { + String fallbackKey = metadata.getProducerName() + "-" + metadata.getSequenceId(); + return fallbackKey.getBytes(StandardCharsets.UTF_8); } } catch (Throwable t) { log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java index 42f1a58100283..a1f79b7ae7faf 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java @@ -98,9 +98,11 @@ private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPaylo public void testPeekStickyKey() { String message = "msg-1"; String partitionedKey = "key1"; + String producerName = "testProducer"; + int sequenceId = 1; MessageMetadata messageMetadata2 = new MessageMetadata() - .setSequenceId(1) - .setProducerName("testProducer") + .setSequenceId(sequenceId) + .setProducerName(producerName) .setPartitionKey(partitionedKey) .setPartitionKeyB64Encoded(false) .setPublishTime(System.currentTimeMillis()); @@ -113,16 +115,28 @@ public void testPeekStickyKey() { // test 64 encoded String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8)); MessageMetadata messageMetadata = new MessageMetadata() - .setSequenceId(1) - .setProducerName("testProducer") + .setSequenceId(sequenceId) + .setProducerName(producerName) .setPartitionKey(partitionedKey2) .setPartitionKeyB64Encoded(true) .setPublishTime(System.currentTimeMillis()); ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8))); byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2"); - String key2 = Base64.getEncoder().encodeToString(bytes2);; + String key2 = Base64.getEncoder().encodeToString(bytes2); Assert.assertEquals(partitionedKey2, key2); ReferenceCountUtil.safeRelease(byteBuf2); + // test fallback key if no key given in message metadata + String fallbackPartitionedKey = producerName + "-" + sequenceId; + MessageMetadata messageMetadataWithoutKey = new MessageMetadata() + .setSequenceId(sequenceId) + .setProducerName(producerName) + .setPublishTime(System.currentTimeMillis()); + ByteBuf byteBuf3 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadataWithoutKey, + Unpooled.copiedBuffer(message.getBytes(UTF_8))); + byte[] bytes3 = Commands.peekStickyKey(byteBuf3, "topic-3", "sub-3"); + String key3 = new String(bytes3); + Assert.assertEquals(fallbackPartitionedKey, key3); + ReferenceCountUtil.safeRelease(byteBuf3); } } diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index d0b25c6971697..519bed6cdb5ae 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -98,26 +98,20 @@ public void testMsgKey() throws Exception { thread.start(); - int count1 = 0; - int count2 = 0; - for (int i = 0; i < 10; i++) { - Message message = consumer1.receive(1, TimeUnit.SECONDS); - if (message == null) { - break; - } - count1++; - consumer1.acknowledge(message); - } - for (int i = 0; i < 10; i++) { - Message message = consumer2.receive(1, TimeUnit.SECONDS); - if (message == null) { - break; - } - count2++; - consumer2.acknowledge(message); - } - //in key_share mode, only one consumer can get msg - Assert.assertTrue(count1 == 0 || count2 == 0); + // in key_shared mode if no message key is set, both consumers should receive messages + Awaitility.await() + .untilAsserted(() -> { + Message message = consumer1.receive(1, TimeUnit.SECONDS); + assertNotNull(message); + consumer1.acknowledge(message); + }); + + Awaitility.await() + .untilAsserted(() -> { + Message message = consumer2.receive(1, TimeUnit.SECONDS); + assertNotNull(message); + consumer2.acknowledge(message); + }); consumer1.close(); consumer2.close(); @@ -149,19 +143,15 @@ public void testMsgKey() throws Exception { Awaitility.await() .untilAsserted(() -> { Message message = newConsumer1.receive(1, TimeUnit.SECONDS); - if (message != null) { - newConsumer1.acknowledge(message); - } assertNotNull(message); + newConsumer1.acknowledge(message); }); Awaitility.await() .untilAsserted(() -> { Message message = newConsumer2.receive(1, TimeUnit.SECONDS); - if (message != null) { - newConsumer2.acknowledge(message); - } assertNotNull(message); + newConsumer2.acknowledge(message); }); thread2.interrupt(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java index 0e7106ef65ea1..ddedacc531a7c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java @@ -36,6 +36,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -150,11 +151,11 @@ protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, i } } } - // Make sure key will not be distributed to multiple consumers + // Make sure key will not be distributed to multiple consumers (except null key) Set allKeys = Sets.newHashSet(); - consumerKeys.forEach((k, v) -> v.forEach(key -> { + consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> { assertTrue(allKeys.add(key), - "Key "+ key + "is distributed to multiple consumers" ); + "Key " + key + " is distributed to multiple consumers" ); })); assertEquals(messagesReceived.size(), messagesToReceive); }