From 1ab604e6cbcb5d8eaf79c550ce5a85ef8ff7ea1b Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sun, 10 Sep 2023 20:31:51 -0500 Subject: [PATCH] Update Pulsar binder to use new Spring Pulsar starter. The PulsarBinder relies on the Spring Pulsar Spring Boot starter. The starter moved out of the spring-pulsar core repo and into Spring Boot proper. This commit updates the Pulsar binder to use the new coordinates for the Spring Boot based starter. * Additionally, the PulsarProperties were greatly reduced in the move to Spring Boot. As such, the binder exposes an extended set of config properties for producer/consumer (the initial set supported before the property reduction). --- .../spring-cloud-stream-binder-pulsar/pom.xml | 8 +- .../pulsar/PulsarBinderHeaderMapper.java | 1 - .../binder/pulsar/PulsarBinderUtils.java | 172 +++---- .../pulsar/PulsarMessageChannelBinder.java | 24 +- .../config/PulsarBinderConfiguration.java | 10 +- .../properties/ConsumerConfigProperties.java | 447 ++++++++++++++++++ .../properties/ProducerConfigProperties.java | 333 +++++++++++++ .../PulsarBinderConfigurationProperties.java | 2 - .../properties/PulsarConsumerProperties.java | 1 - .../PulsarExtendedBindingProperties.java | 1 - .../properties/PulsarProducerProperties.java | 1 - .../pulsar/ConsumerConfigPropertiesTests.java | 180 +++++++ .../pulsar/ProducerConfigPropertiesTests.java | 163 +++++++ ...sarBinderConfigurationPropertiesTests.java | 81 ++-- .../pulsar/PulsarBinderIntegrationTests.java | 198 +++++--- .../binder/pulsar/PulsarBinderTests.java | 7 +- .../binder/pulsar/PulsarBinderUtilsTests.java | 323 ++++++------- .../PulsarExtendedBindingPropertiesTests.java | 95 ++-- .../pulsar/PulsarTestContainerSupport.java | 19 +- 19 files changed, 1540 insertions(+), 526 deletions(-) create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ConsumerConfigProperties.java create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ProducerConfigProperties.java create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ConsumerConfigPropertiesTests.java create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ProducerConfigPropertiesTests.java diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/pom.xml b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/pom.xml index 5f473cca72..bfe1c43cd8 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/pom.xml +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/pom.xml @@ -5,7 +5,7 @@ spring-cloud-stream-binder-pulsar jar spring-cloud-stream-binder-pulsar - PUlsar binder implementation + Pulsar binder implementation org.springframework.cloud @@ -15,9 +15,9 @@ - org.springframework.pulsar - spring-pulsar-spring-boot-starter - 0.2.1-SNAPSHOT + org.springframework.boot + spring-boot-starter-pulsar + 3.2.0-SNAPSHOT org.springframework.cloud diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java index 32b1bc02e1..c209863dec 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java @@ -26,7 +26,6 @@ import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.pulsar.support.header.PulsarHeaderMapper; - /** * A delegating {@code PulsarHeaderMapper} that ensures the delegate mapper never includes * internal binder specific headers during outbound mapping. diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtils.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtils.java index 229372d7c2..96cf8102de 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtils.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtils.java @@ -16,20 +16,17 @@ package org.springframework.cloud.stream.binder.pulsar; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; -import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties; +import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties; import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.core.log.LogAccessor; -import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties; -import org.springframework.pulsar.autoconfigure.ProducerConfigProperties; import org.springframework.util.StringUtils; -import org.springframework.util.unit.DataSize; /** * Binder utility methods. @@ -55,12 +52,74 @@ private PulsarBinderUtils() { * consumer properties */ static String subscriptionName(PulsarConsumerProperties consumerProps, ConsumerDestination consumerDestination) { - if (StringUtils.hasText(consumerProps.getSubscriptionName())) { - return consumerProps.getSubscriptionName(); + if (StringUtils.hasText(consumerProps.getSubscription().getName())) { + return consumerProps.getSubscription().getName(); } return SUBSCRIPTION_NAME_FORMAT_STR.formatted(consumerDestination.getName(), UUID.randomUUID()); } + /** + * Merges base and extended producer properties defined at the binder and binding + * level. Only properties whose value has changed from the default are considered. If + * a property is defined at both the binder and binding level, the binding level + * property value is given precedence. + * @param binderProducerProps the binder level producer config properties (eg. + * 'spring.cloud.stream.pulsar.binder.producer.*') + * @param bindingProducerProps the binding level config properties (eg. + * 'spring.cloud.stream.pulsar.bindings.myBinding-out-0.producer.*') + * @return map of modified merged binder and binding producer properties + */ + static Map mergeModifiedProducerProperties(ProducerConfigProperties binderProducerProps, + ProducerConfigProperties bindingProducerProps) { + // Layer the base props for common -> binder -> bindings + var baseProducerProps = new ProducerConfigProperties().toBaseProducerPropertiesMap(); + var binderBaseProducerProps = binderProducerProps.toBaseProducerPropertiesMap(); + var bindingBaseProducerProps = bindingProducerProps.toBaseProducerPropertiesMap(); + var layeredBaseProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseProducerProps, + binderBaseProducerProps, bindingBaseProducerProps); + // Layer the extended props for binder -> bindings + var extProducerProps = new ProducerConfigProperties().toExtendedProducerPropertiesMap(); + var binderExtProducerProps = binderProducerProps.toExtendedProducerPropertiesMap(); + var bindingExtProducerProps = bindingProducerProps.toExtendedProducerPropertiesMap(); + var layeredExtProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(extProducerProps, + binderExtProducerProps, bindingExtProducerProps); + // Combine both base and extended layers + var layeredProducerProps = new HashMap<>(layeredBaseProducerProps); + layeredProducerProps.putAll(layeredExtProducerProps); + return layeredProducerProps; + } + + /** + * Merges base and extended consumer properties defined at the binder and binding + * level. Only properties whose value has changed from the default are considered. If + * a property is defined at both the binder and binding level, the binding level + * property value is given precedence. + * @param binderConsumerProps the binder level consumer config properties (eg. + * 'spring.cloud.stream.pulsar.binder.consumer.*') + * @param bindingConsumerProps the binding level config properties (eg. + * 'spring.cloud.stream.pulsar.bindings.myBinding-in-0.consumer.*') + * @return map of modified merged binder and binding consumer properties + */ + static Map mergeModifiedConsumerProperties(ConsumerConfigProperties binderConsumerProps, + ConsumerConfigProperties bindingConsumerProps) { + // Layer the base props for common -> binder -> bindings + var baseConsumerProps = new ConsumerConfigProperties().toBaseConsumerPropertiesMap(); + var binderBaseConsumerProps = binderConsumerProps.toBaseConsumerPropertiesMap(); + var bindingBaseConsumerProps = bindingConsumerProps.toBaseConsumerPropertiesMap(); + var layeredBaseConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseConsumerProps, + binderBaseConsumerProps, bindingBaseConsumerProps); + // Layer the extended props for binder -> bindings + var extConsumerProps = new ConsumerConfigProperties().toExtendedConsumerPropertiesMap(); + var binderExtConsumerProps = binderConsumerProps.toExtendedConsumerPropertiesMap(); + var bindingExtConsumerProps = bindingConsumerProps.toExtendedConsumerPropertiesMap(); + var layeredExtConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(extConsumerProps, + binderExtConsumerProps, bindingExtConsumerProps); + // Combine both base and extended layers + var layeredConsumerProps = new HashMap<>(layeredBaseConsumerProps); + layeredConsumerProps.putAll(layeredExtConsumerProps); + return layeredConsumerProps; + } + /** * Merges properties defined at the binder and binding level (binding properties * override binder properties). @@ -105,103 +164,4 @@ private static Map extractNewOrModifiedProperties(Map convertProducerPropertiesToMap(ProducerConfigProperties producerProps) { - var properties = new PulsarBinderUtils.Properties(); - var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(producerProps::getTopicName).to(properties.in("topicName")); - map.from(producerProps::getProducerName).to(properties.in("producerName")); - map.from(producerProps::getSendTimeout).asInt(Duration::toMillis).to(properties.in("sendTimeoutMs")); - map.from(producerProps::getBlockIfQueueFull).to(properties.in("blockIfQueueFull")); - map.from(producerProps::getMaxPendingMessages).to(properties.in("maxPendingMessages")); - map.from(producerProps::getMaxPendingMessagesAcrossPartitions) - .to(properties.in("maxPendingMessagesAcrossPartitions")); - map.from(producerProps::getMessageRoutingMode).to(properties.in("messageRoutingMode")); - map.from(producerProps::getHashingScheme).to(properties.in("hashingScheme")); - map.from(producerProps::getCryptoFailureAction).to(properties.in("cryptoFailureAction")); - map.from(producerProps::getBatchingMaxPublishDelay).as(it -> it.toNanos() / 1000) - .to(properties.in("batchingMaxPublishDelayMicros")); - map.from(producerProps::getBatchingPartitionSwitchFrequencyByPublishDelay) - .to(properties.in("batchingPartitionSwitchFrequencyByPublishDelay")); - map.from(producerProps::getBatchingMaxMessages).to(properties.in("batchingMaxMessages")); - map.from(producerProps::getBatchingMaxBytes).asInt(DataSize::toBytes).to(properties.in("batchingMaxBytes")); - map.from(producerProps::getBatchingEnabled).to(properties.in("batchingEnabled")); - map.from(producerProps::getChunkingEnabled).to(properties.in("chunkingEnabled")); - map.from(producerProps::getEncryptionKeys).to(properties.in("encryptionKeys")); - map.from(producerProps::getCompressionType).to(properties.in("compressionType")); - map.from(producerProps::getInitialSequenceId).to(properties.in("initialSequenceId")); - map.from(producerProps::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions")); - map.from(producerProps::getAutoUpdatePartitionsInterval).as(Duration::toSeconds) - .to(properties.in("autoUpdatePartitionsIntervalSeconds")); - map.from(producerProps::getMultiSchema).to(properties.in("multiSchema")); - map.from(producerProps::getProducerAccessMode).to(properties.in("accessMode")); - map.from(producerProps::getLazyStartPartitionedProducers).to(properties.in("lazyStartPartitionedProducers")); - map.from(producerProps::getProperties).to(properties.in("properties")); - return properties; - } - - /** - * Gets a map representation of a {@link ConsumerConfigProperties}. - * @param consumerProps the consumer props - * @return map representation of consumer props where each entry is a field and its - * associated value - */ - static Map convertConsumerPropertiesToMap(ConsumerConfigProperties consumerProps) { - var properties = new PulsarBinderUtils.Properties(); - var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(consumerProps::getTopics).to(properties.in("topicNames")); - map.from(consumerProps::getTopicsPattern).to(properties.in("topicsPattern")); - map.from(consumerProps::getSubscriptionName).to(properties.in("subscriptionName")); - map.from(consumerProps::getSubscriptionType).to(properties.in("subscriptionType")); - map.from(consumerProps::getSubscriptionProperties).to(properties.in("subscriptionProperties")); - map.from(consumerProps::getSubscriptionMode).to(properties.in("subscriptionMode")); - map.from(consumerProps::getReceiverQueueSize).to(properties.in("receiverQueueSize")); - map.from(consumerProps::getAcknowledgementsGroupTime).as(it -> it.toNanos() / 1000) - .to(properties.in("acknowledgementsGroupTimeMicros")); - map.from(consumerProps::getNegativeAckRedeliveryDelay).as(it -> it.toNanos() / 1000) - .to(properties.in("negativeAckRedeliveryDelayMicros")); - map.from(consumerProps::getMaxTotalReceiverQueueSizeAcrossPartitions) - .to(properties.in("maxTotalReceiverQueueSizeAcrossPartitions")); - map.from(consumerProps::getConsumerName).to(properties.in("consumerName")); - map.from(consumerProps::getAckTimeout).as(Duration::toMillis).to(properties.in("ackTimeoutMillis")); - map.from(consumerProps::getTickDuration).as(Duration::toMillis).to(properties.in("tickDurationMillis")); - map.from(consumerProps::getPriorityLevel).to(properties.in("priorityLevel")); - map.from(consumerProps::getCryptoFailureAction).to(properties.in("cryptoFailureAction")); - map.from(consumerProps::getProperties).to(properties.in("properties")); - map.from(consumerProps::getReadCompacted).to(properties.in("readCompacted")); - map.from(consumerProps::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition")); - map.from(consumerProps::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod")); - map.from(consumerProps::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode")); - map.from(consumerProps::getDeadLetterPolicy).to(properties.in("deadLetterPolicy")); - map.from(consumerProps::getRetryEnable).to(properties.in("retryEnable")); - map.from(consumerProps::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions")); - map.from(consumerProps::getAutoUpdatePartitionsInterval).as(Duration::toSeconds) - .to(properties.in("autoUpdatePartitionsIntervalSeconds")); - map.from(consumerProps::getReplicateSubscriptionState).to(properties.in("replicateSubscriptionState")); - map.from(consumerProps::getResetIncludeHead).to(properties.in("resetIncludeHead")); - map.from(consumerProps::getBatchIndexAckEnabled).to(properties.in("batchIndexAckEnabled")); - map.from(consumerProps::getAckReceiptEnabled).to(properties.in("ackReceiptEnabled")); - map.from(consumerProps::getPoolMessages).to(properties.in("poolMessages")); - map.from(consumerProps::getStartPaused).to(properties.in("startPaused")); - map.from(consumerProps::getAutoAckOldestChunkedMessageOnQueueFull) - .to(properties.in("autoAckOldestChunkedMessageOnQueueFull")); - map.from(consumerProps::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage")); - map.from(consumerProps::getExpireTimeOfIncompleteChunkedMessage).as(Duration::toMillis) - .to(properties.in("expireTimeOfIncompleteChunkedMessageMillis")); - return properties; - } - - static class Properties extends HashMap { - - java.util.function.Consumer in(String key) { - return (value) -> put(key, value); - } - - } - } diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java index d6eafe2ebd..342b443f56 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java @@ -47,8 +47,6 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties; -import org.springframework.pulsar.autoconfigure.ProducerConfigProperties; import org.springframework.pulsar.core.ProducerBuilderConfigurationUtil; import org.springframework.pulsar.core.ProducerBuilderCustomizer; import org.springframework.pulsar.core.PulsarConsumerFactory; @@ -61,7 +59,6 @@ import org.springframework.pulsar.listener.PulsarRecordMessageListener; import org.springframework.pulsar.support.header.PulsarHeaderMapper; - /** * {@link Binder} implementation for Apache Pulsar. * @@ -112,15 +109,10 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin schema = null; } - var baseProducerProps = PulsarBinderUtils.convertProducerPropertiesToMap(new ProducerConfigProperties()); - var binderProducerProps = PulsarBinderUtils - .convertProducerPropertiesToMap(this.binderConfigProps.getProducer()); - var bindingProducerProps = PulsarBinderUtils.convertProducerPropertiesToMap(producerProperties.getExtension()); - var mergedProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseProducerProps, - binderProducerProps, bindingProducerProps); - + var layeredProducerProps = PulsarBinderUtils.mergeModifiedProducerProperties( + this.binderConfigProps.getProducer(), producerProperties.getExtension()); var handler = new PulsarProducerConfigurationMessageHandler(this.pulsarTemplate, schema, destination.getName(), - (builder) -> ProducerBuilderConfigurationUtil.loadConf(builder, mergedProducerProps), + (builder) -> ProducerBuilderConfigurationUtil.loadConf(builder, layeredProducerProps), determineOutboundHeaderMapper(producerProperties)); handler.setApplicationContext(getApplicationContext()); handler.setBeanFactory(getBeanFactory()); @@ -167,13 +159,9 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination var subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination); containerProperties.setSubscriptionName(subscriptionName); - var baseConsumerProps = PulsarBinderUtils.convertConsumerPropertiesToMap(new ConsumerConfigProperties()); - var binderConsumerProps = PulsarBinderUtils - .convertConsumerPropertiesToMap(this.binderConfigProps.getConsumer()); - var bindingConsumerProps = PulsarBinderUtils.convertConsumerPropertiesToMap(properties.getExtension()); - var mergedConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseConsumerProps, - binderConsumerProps, bindingConsumerProps); - containerProperties.getPulsarConsumerProperties().putAll(mergedConsumerProps); + var layeredConsumerProps = PulsarBinderUtils + .mergeModifiedConsumerProperties(this.binderConfigProps.getConsumer(), properties.getExtension()); + containerProperties.getPulsarConsumerProperties().putAll(layeredConsumerProps); containerProperties.updateContainerProperties(); var container = new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties); diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java index 6e77e06a6e..be135e29ae 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java @@ -17,6 +17,7 @@ package org.springframework.cloud.stream.binder.pulsar.config; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.pulsar.PulsarMessageChannelBinder; @@ -25,7 +26,6 @@ import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.pulsar.autoconfigure.PulsarProperties; import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarTemplate; @@ -48,7 +48,7 @@ public class PulsarBinderConfiguration { @Bean public PulsarTopicProvisioner pulsarTopicProvisioner(PulsarAdministration pulsarAdministration, - PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) { + PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) { return new PulsarTopicProvisioner(pulsarAdministration, pulsarBinderConfigurationProperties); } @@ -63,9 +63,9 @@ public PulsarHeaderMapper pulsarHeaderMapper() { @Bean public PulsarMessageChannelBinder pulsarMessageChannelBinder(PulsarTopicProvisioner pulsarTopicProvisioner, - PulsarTemplate pulsarTemplate, PulsarConsumerFactory pulsarConsumerFactory, - PulsarBinderConfigurationProperties binderConfigProps, PulsarExtendedBindingProperties bindingConfigProps, - SchemaResolver schemaResolver, PulsarHeaderMapper headerMapper) { + PulsarTemplate pulsarTemplate, PulsarConsumerFactory pulsarConsumerFactory, + PulsarBinderConfigurationProperties binderConfigProps, PulsarExtendedBindingProperties bindingConfigProps, + SchemaResolver schemaResolver, PulsarHeaderMapper headerMapper) { PulsarMessageChannelBinder pulsarMessageChannelBinder = new PulsarMessageChannelBinder(pulsarTopicProvisioner, pulsarTemplate, pulsarConsumerFactory, binderConfigProps, schemaResolver, headerMapper); pulsarMessageChannelBinder.setExtendedBindingProperties(bindingConfigProps); diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ConsumerConfigProperties.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ConsumerConfigProperties.java new file mode 100644 index 0000000000..7ba2db0c7a --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ConsumerConfigProperties.java @@ -0,0 +1,447 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.pulsar.properties; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.MessageId; + +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.util.Assert; + +/** + * Configuration properties used to specify Pulsar consumers. + * + * @author Chris Bono + */ +public class ConsumerConfigProperties extends PulsarProperties.Consumer { + + private final Acknowledgement ack = new Acknowledgement(); + + private final Chunking chunk = new Chunking(); + + private final Subscription subscription = new Subscription(); + + /** + * Number of messages that can be accumulated before the consumer calls "receive". + */ + private Integer receiverQueueSize = 1000; + + /** + * Maximum number of messages that a consumer can be pushed at once from a broker + * across all partitions. + */ + private Integer maxTotalReceiverQueueSizeAcrossPartitions = 50000; + + /** + * Action the consumer will take in case of decryption failure. + */ + private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + + /** + * Map of properties to add to the consumer. + */ + private SortedMap properties = new TreeMap<>(); + + /** + * Auto-discovery period for topics when topic pattern is used in minutes. + */ + private Integer patternAutoDiscoveryPeriod = 1; + + /** + * Whether the consumer auto-subscribes for partition increase. This is only for + * partitioned consumers. + */ + private Boolean autoUpdatePartitions = true; + + /** + * Interval of partitions discovery updates. + */ + private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1); + + /** + * Whether to include the given position of any reset operation like + * {@link org.apache.pulsar.client.api.Consumer#seek(long) or + * {@link ConsumerConfigProperties#seek(MessageId)}}. + */ + private Boolean resetIncludeHead = false; + + /** + * Whether pooling of messages and the underlying data buffers is enabled. + */ + private Boolean poolMessages = false; + + /** + * Whether to start the consumer in a paused state. + */ + private Boolean startPaused = false; + + public Acknowledgement getAck() { + return this.ack; + } + + public Chunking getChunk() { + return this.chunk; + } + + public Subscription getSubscription() { + return this.subscription; + } + + public Integer getReceiverQueueSize() { + return this.receiverQueueSize; + } + + public void setReceiverQueueSize(Integer receiverQueueSize) { + this.receiverQueueSize = receiverQueueSize; + } + + public Integer getMaxTotalReceiverQueueSizeAcrossPartitions() { + return this.maxTotalReceiverQueueSizeAcrossPartitions; + } + + public void setMaxTotalReceiverQueueSizeAcrossPartitions(Integer maxTotalReceiverQueueSizeAcrossPartitions) { + this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions; + } + + public ConsumerCryptoFailureAction getCryptoFailureAction() { + return this.cryptoFailureAction; + } + + public void setCryptoFailureAction(ConsumerCryptoFailureAction cryptoFailureAction) { + this.cryptoFailureAction = cryptoFailureAction; + } + + public SortedMap getProperties() { + return this.properties; + } + + public void setProperties(SortedMap properties) { + this.properties = properties; + } + + public Integer getPatternAutoDiscoveryPeriod() { + return this.patternAutoDiscoveryPeriod; + } + + public void setPatternAutoDiscoveryPeriod(Integer patternAutoDiscoveryPeriod) { + this.patternAutoDiscoveryPeriod = patternAutoDiscoveryPeriod; + } + + public Boolean getAutoUpdatePartitions() { + return this.autoUpdatePartitions; + } + + public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) { + this.autoUpdatePartitions = autoUpdatePartitions; + } + + public Duration getAutoUpdatePartitionsInterval() { + return this.autoUpdatePartitionsInterval; + } + + public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) { + this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval; + } + + public Boolean getResetIncludeHead() { + return this.resetIncludeHead; + } + + public void setResetIncludeHead(Boolean resetIncludeHead) { + this.resetIncludeHead = resetIncludeHead; + } + + public Boolean getPoolMessages() { + return this.poolMessages; + } + + public void setPoolMessages(Boolean poolMessages) { + this.poolMessages = poolMessages; + } + + public Boolean getStartPaused() { + return this.startPaused; + } + + public void setStartPaused(Boolean startPaused) { + this.startPaused = startPaused; + } + + /** + * Gets a map representation of the base consumer properties (those defined in parent + * class). + * @return map of base consumer properties and associated values. + */ + public Map toBaseConsumerPropertiesMap() { + var consumerProps = new ConsumerConfigProperties.Properties(); + var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getDeadLetterPolicy).as(this::toPulsarDeadLetterPolicy).to(consumerProps.in("deadLetterPolicy")); + map.from(this::getName).to(consumerProps.in("consumerName")); + map.from(this::getPriorityLevel).to(consumerProps.in("priorityLevel")); + map.from(this::isReadCompacted).to(consumerProps.in("readCompacted")); + map.from(this::isRetryEnable).to(consumerProps.in("retryEnable")); + map.from(this::getTopics).to(consumerProps.in("topicNames")); + map.from(this::getTopicsPattern).to(consumerProps.in("topicsPattern")); + return consumerProps; + } + + /** + * Gets a map representation of the extended consumer properties (those defined in + * this class). + * @return map of extended consumer properties and associated values. + */ + public Map toExtendedConsumerPropertiesMap() { + var consumerProps = new ConsumerConfigProperties.Properties(); + var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getAutoUpdatePartitions).to(consumerProps.in("autoUpdatePartitions")); + map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds) + .to(consumerProps.in("autoUpdatePartitionsIntervalSeconds")); + map.from(this::getCryptoFailureAction).to(consumerProps.in("cryptoFailureAction")); + map.from(this::getMaxTotalReceiverQueueSizeAcrossPartitions) + .to(consumerProps.in("maxTotalReceiverQueueSizeAcrossPartitions")); + map.from(this::getPatternAutoDiscoveryPeriod).to(consumerProps.in("patternAutoDiscoveryPeriod")); + map.from(this::getPoolMessages).to(consumerProps.in("poolMessages")); + map.from(this::getProperties).to(consumerProps.in("properties")); + map.from(this::getReceiverQueueSize).to(consumerProps.in("receiverQueueSize")); + map.from(this::getResetIncludeHead).to(consumerProps.in("resetIncludeHead")); + map.from(this::getStartPaused).to(consumerProps.in("startPaused")); + // Acknowledgement properties + map.from(this::getAck).as(Acknowledgement::getGroupTime).as(it -> it.toNanos() / 1000) + .to(consumerProps.in("acknowledgementsGroupTimeMicros")); + map.from(this::getAck).as(Acknowledgement::getRedeliveryDelay).as(it -> it.toNanos() / 1000) + .to(consumerProps.in("negativeAckRedeliveryDelayMicros")); + map.from(this::getAck).as(Acknowledgement::getTimeout).as(Duration::toMillis) + .to(consumerProps.in("ackTimeoutMillis")); + map.from(this::getAck).as(Acknowledgement::getTimeoutTickDuration).as(Duration::toMillis) + .to(consumerProps.in("tickDurationMillis")); + map.from(this::getAck).as(Acknowledgement::getBatchIndexEnabled).to(consumerProps.in("batchIndexAckEnabled")); + map.from(this::getAck).as(Acknowledgement::getReceiptEnabled).to(consumerProps.in("ackReceiptEnabled")); + // Chunking properties + map.from(this::getChunk).as(Chunking::getExpireTimeIncomplete).as(Duration::toMillis) + .to(consumerProps.in("expireTimeOfIncompleteChunkedMessageMillis")); + map.from(this::getChunk).as(Chunking::getAutoAckOldestOnQueueFull) + .to(consumerProps.in("autoAckOldestChunkedMessageOnQueueFull")); + map.from(this::getChunk).as(Chunking::getMaxPendingMessages).to(consumerProps.in("maxPendingChunkedMessage")); + // Subscription properties + map.from(this::getSubscription).as(Subscription::getName).to(consumerProps.in("subscriptionName")); + map.from(this::getSubscription).as(Subscription::getType).to(consumerProps.in("subscriptionType")); + map.from(this::getSubscription).as(Subscription::getProperties).to(consumerProps.in("subscriptionProperties")); + map.from(this::getSubscription).as(Subscription::getMode).to(consumerProps.in("subscriptionMode")); + map.from(this::getSubscription).as(Subscription::getInitialPosition) + .to(consumerProps.in("subscriptionInitialPosition")); + map.from(this::getSubscription).as(Subscription::getTopicsMode).to(consumerProps.in("regexSubscriptionMode")); + map.from(this::getSubscription).as(Subscription::getReplicateState) + .to(consumerProps.in("replicateSubscriptionState")); + return consumerProps; + } + + /** + * Gets a map representation of base and extended consumer properties. + * @return map of base and extended consumer properties and associated values. + */ + public Map toAllConsumerPropertiesMap() { + var consumerProps = this.toBaseConsumerPropertiesMap(); + consumerProps.putAll(this.toExtendedConsumerPropertiesMap()); + return consumerProps; + } + + private org.apache.pulsar.client.api.DeadLetterPolicy toPulsarDeadLetterPolicy(DeadLetterPolicy policy) { + Assert.state(policy.getMaxRedeliverCount() > 0, + "Pulsar DeadLetterPolicy must have a positive 'max-redelivery-count' property value"); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder builder = org.apache.pulsar.client.api.DeadLetterPolicy + .builder(); + map.from(policy::getMaxRedeliverCount).to(builder::maxRedeliverCount); + map.from(policy::getRetryLetterTopic).to(builder::retryLetterTopic); + map.from(policy::getDeadLetterTopic).to(builder::deadLetterTopic); + map.from(policy::getInitialSubscriptionName).to(builder::initialSubscriptionName); + return builder.build(); + } + + public static class Acknowledgement { + + /** + * Whether the batching index acknowledgment is enabled. + */ + private Boolean batchIndexEnabled = false; + + /** + * Time to group acknowledgements before sending them to the broker. + */ + private Duration groupTime = Duration.ofMillis(100); + + /** + * Whether an acknowledgement receipt is enabled. + */ + private Boolean receiptEnabled = false; + + /** + * Delay before re-delivering messages that have failed to be processed. + */ + private Duration redeliveryDelay = Duration.ofMinutes(1); + + /** + * Timeout for unacked messages to be redelivered. + */ + private Duration timeout = Duration.ZERO; + + /** + * Precision for the ack timeout messages tracker. + */ + private Duration timeoutTickDuration = Duration.ofSeconds(1); + + public Boolean getBatchIndexEnabled() { + return this.batchIndexEnabled; + } + + public void setBatchIndexEnabled(Boolean batchIndexEnabled) { + this.batchIndexEnabled = batchIndexEnabled; + } + + public Duration getGroupTime() { + return this.groupTime; + } + + public void setGroupTime(Duration groupTime) { + this.groupTime = groupTime; + } + + public Boolean getReceiptEnabled() { + return this.receiptEnabled; + } + + public void setReceiptEnabled(Boolean receiptEnabled) { + this.receiptEnabled = receiptEnabled; + } + + public Duration getRedeliveryDelay() { + return this.redeliveryDelay; + } + + public void setRedeliveryDelay(Duration redeliveryDelay) { + this.redeliveryDelay = redeliveryDelay; + } + + public Duration getTimeout() { + return this.timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + public Duration getTimeoutTickDuration() { + return this.timeoutTickDuration; + } + + public void setTimeoutTickDuration(Duration timeoutTickDuration) { + this.timeoutTickDuration = timeoutTickDuration; + } + + } + + public static class Chunking { + + /** + * Whether to automatically drop outstanding uncompleted chunked messages once the + * consumer queue reaches the threshold set by the 'maxPendingMessages' property. + */ + private Boolean autoAckOldestOnQueueFull = true; + + /** + * The maximum time period for a consumer to receive all chunks of a message - if + * this threshold is exceeded the consumer will expire the incomplete chunks. + */ + private Duration expireTimeIncomplete = Duration.ofMinutes(1); + + /** + * Maximum number of chunked messages to be kept in memory. + */ + private Integer maxPendingMessages = 10; + + public Boolean getAutoAckOldestOnQueueFull() { + return this.autoAckOldestOnQueueFull; + } + + public void setAutoAckOldestOnQueueFull(Boolean autoAckOldestOnQueueFull) { + this.autoAckOldestOnQueueFull = autoAckOldestOnQueueFull; + } + + public Duration getExpireTimeIncomplete() { + return this.expireTimeIncomplete; + } + + public void setExpireTimeIncomplete(Duration expireTimeIncomplete) { + this.expireTimeIncomplete = expireTimeIncomplete; + } + + public Integer getMaxPendingMessages() { + return this.maxPendingMessages; + } + + public void setMaxPendingMessages(Integer maxPendingMessages) { + this.maxPendingMessages = maxPendingMessages; + } + + } + + public static class Subscription extends PulsarProperties.Consumer.Subscription { + + /** + * Map of properties to add to the subscription. + */ + private Map properties = new HashMap<>(); + + /** + * Whether to replicate subscription state. + */ + private Boolean replicateState = false; + + public Map getProperties() { + return this.properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public Boolean getReplicateState() { + return this.replicateState; + } + + public void setReplicateState(Boolean replicateState) { + this.replicateState = replicateState; + } + + } + + static class Properties extends HashMap { + + java.util.function.Consumer in(String key) { + return (value) -> put(key, value); + } + + } + +} diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ProducerConfigProperties.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ProducerConfigProperties.java new file mode 100644 index 0000000000..64188f08ac --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/ProducerConfigProperties.java @@ -0,0 +1,333 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.pulsar.properties; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.pulsar.client.api.ProducerCryptoFailureAction; + +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.lang.Nullable; +import org.springframework.util.unit.DataSize; + +/** + * Configuration properties used to specify Pulsar producers. + * + * @author Chris Bono + */ +public class ProducerConfigProperties extends PulsarProperties.Producer { + + /** + * Whether the "send" and "sendAsync" methods should block if the outgoing message + * queue is full. + */ + private Boolean blockIfQueueFull = false; + + /** + * Maximum number of pending messages for the producer. + */ + private Integer maxPendingMessages = 1000; + + /** + * Maximum number of pending messages across all the partitions. + */ + private Integer maxPendingMessagesAcrossPartitions = 50000; + + /** + * Action the producer will take in case of encryption failure. + */ + private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL; + + /** + * Names of the public encryption keys to use when encrypting data. + */ + private Set encryptionKeys = new HashSet<>(); + + /** + * Baseline for the sequence ids for messages published by the producer. + */ + @Nullable + private Long initialSequenceId; + + /** + * Whether partitioned producer automatically discover new partitions at runtime. + */ + private Boolean autoUpdatePartitions = true; + + /** + * Interval of partitions discovery updates. + */ + private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1); + + /** + * Whether the multiple schema mode is enabled. + */ + private Boolean multiSchema = true; + + /** + * Whether producers in Shared mode register and connect immediately to the owner + * broker of each partition or start lazily on demand. + */ + private Boolean lazyStartPartitionedProducers = false; + + private final Batching batch = new Batching(); + + public Batching getBatch() { + return this.batch; + } + + /** + * Map of properties to add to the producer. + */ + private Map properties = new HashMap<>(); + + public Boolean getBlockIfQueueFull() { + return this.blockIfQueueFull; + } + + public void setBlockIfQueueFull(Boolean blockIfQueueFull) { + this.blockIfQueueFull = blockIfQueueFull; + } + + public Integer getMaxPendingMessages() { + return this.maxPendingMessages; + } + + public void setMaxPendingMessages(Integer maxPendingMessages) { + this.maxPendingMessages = maxPendingMessages; + } + + public Integer getMaxPendingMessagesAcrossPartitions() { + return this.maxPendingMessagesAcrossPartitions; + } + + public void setMaxPendingMessagesAcrossPartitions(Integer maxPendingMessagesAcrossPartitions) { + this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions; + } + + public ProducerCryptoFailureAction getCryptoFailureAction() { + return this.cryptoFailureAction; + } + + public void setCryptoFailureAction(ProducerCryptoFailureAction cryptoFailureAction) { + this.cryptoFailureAction = cryptoFailureAction; + } + + public Set getEncryptionKeys() { + return this.encryptionKeys; + } + + public void setEncryptionKeys(Set encryptionKeys) { + this.encryptionKeys = encryptionKeys; + } + + @Nullable + public Long getInitialSequenceId() { + return this.initialSequenceId; + } + + public void setInitialSequenceId(@Nullable Long initialSequenceId) { + this.initialSequenceId = initialSequenceId; + } + + public Boolean getAutoUpdatePartitions() { + return this.autoUpdatePartitions; + } + + public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) { + this.autoUpdatePartitions = autoUpdatePartitions; + } + + public Duration getAutoUpdatePartitionsInterval() { + return this.autoUpdatePartitionsInterval; + } + + public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) { + this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval; + } + + public Boolean getMultiSchema() { + return this.multiSchema; + } + + public void setMultiSchema(Boolean multiSchema) { + this.multiSchema = multiSchema; + } + + public Boolean getLazyStartPartitionedProducers() { + return this.lazyStartPartitionedProducers; + } + + public void setLazyStartPartitionedProducers(Boolean lazyStartPartitionedProducers) { + this.lazyStartPartitionedProducers = lazyStartPartitionedProducers; + } + + public Map getProperties() { + return this.properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + /** + * Gets a map representation of the base producer properties (those defined in parent + * class). + * @return map of base producer properties and associated values. + */ + public Map toBaseProducerPropertiesMap() { + var producerProps = new ProducerConfigProperties.Properties(); + var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getAccessMode).to(producerProps.in("accessMode")); + map.from(this::isBatchingEnabled).to(producerProps.in("batchingEnabled")); + map.from(this::isChunkingEnabled).to(producerProps.in("chunkingEnabled")); + map.from(this::getCompressionType).to(producerProps.in("compressionType")); + map.from(this::getHashingScheme).to(producerProps.in("hashingScheme")); + map.from(this::getMessageRoutingMode).to(producerProps.in("messageRoutingMode")); + map.from(this::getName).to(producerProps.in("producerName")); + map.from(this::getSendTimeout).asInt(Duration::toMillis).to(producerProps.in("sendTimeoutMs")); + map.from(this::getTopicName).to(producerProps.in("topicName")); + return producerProps; + } + + /** + * Gets a map representation of the extended producer properties (those defined in + * this class). + * @return map of extended producer properties and associated values. + */ + public Map toExtendedProducerPropertiesMap() { + var producerProps = new ProducerConfigProperties.Properties(); + var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getAutoUpdatePartitions).to(producerProps.in("autoUpdatePartitions")); + map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds) + .to(producerProps.in("autoUpdatePartitionsIntervalSeconds")); + map.from(this::getBlockIfQueueFull).to(producerProps.in("blockIfQueueFull")); + map.from(this::getCryptoFailureAction).to(producerProps.in("cryptoFailureAction")); + map.from(this::getEncryptionKeys).to(producerProps.in("encryptionKeys")); + map.from(this::getInitialSequenceId).to(producerProps.in("initialSequenceId")); + map.from(this::getLazyStartPartitionedProducers).to(producerProps.in("lazyStartPartitionedProducers")); + map.from(this::getMaxPendingMessages).to(producerProps.in("maxPendingMessages")); + map.from(this::getMaxPendingMessagesAcrossPartitions) + .to(producerProps.in("maxPendingMessagesAcrossPartitions")); + map.from(this::getMultiSchema).to(producerProps.in("multiSchema")); + map.from(this::getProperties).to(producerProps.in("properties")); + if (this.isBatchingEnabled()) { + map.from(this::getBatch).as(Batching::getMaxPublishDelay).as(it -> it.toNanos() / 1000) + .to(producerProps.in("batchingMaxPublishDelayMicros")); + map.from(this::getBatch).as(Batching::getPartitionSwitchFrequencyByPublishDelay) + .to(producerProps.in("batchingPartitionSwitchFrequencyByPublishDelay")); + map.from(this::getBatch).as(Batching::getMaxMessages).to(producerProps.in("batchingMaxMessages")); + map.from(this::getBatch).as(Batching::getMaxBytes).asInt(DataSize::toBytes) + .to(producerProps.in("batchingMaxBytes")); + } + return producerProps; + } + + /** + * Gets a map representation of base and extended producer properties. + * @return map of base and extended producer properties and associated values. + */ + public Map toAllProducerPropertiesMap() { + var producerProps = this.toBaseProducerPropertiesMap(); + producerProps.putAll(this.toExtendedProducerPropertiesMap()); + return producerProps; + } + + public static class Batching { + + /** + * Time period within which the messages sent will be batched. + */ + private Duration maxPublishDelay = Duration.ofMillis(1); + + /** + * Partition switch frequency while batching of messages is enabled and using + * round-robin routing mode for non-keyed message. + */ + private Integer partitionSwitchFrequencyByPublishDelay = 10; + + /** + * Maximum number of messages to be batched. + */ + private Integer maxMessages = 1000; + + /** + * Maximum number of bytes permitted in a batch. + */ + private DataSize maxBytes = DataSize.ofKilobytes(128); + + /** + * Whether to automatically batch messages. + */ + private Boolean enabled = true; + + public Duration getMaxPublishDelay() { + return this.maxPublishDelay; + } + + public void setMaxPublishDelay(Duration maxPublishDelay) { + this.maxPublishDelay = maxPublishDelay; + } + + public Integer getPartitionSwitchFrequencyByPublishDelay() { + return this.partitionSwitchFrequencyByPublishDelay; + } + + public void setPartitionSwitchFrequencyByPublishDelay(Integer partitionSwitchFrequencyByPublishDelay) { + this.partitionSwitchFrequencyByPublishDelay = partitionSwitchFrequencyByPublishDelay; + } + + public Integer getMaxMessages() { + return this.maxMessages; + } + + public void setMaxMessages(Integer maxMessages) { + this.maxMessages = maxMessages; + } + + public DataSize getMaxBytes() { + return this.maxBytes; + } + + public void setMaxBytes(DataSize maxBytes) { + this.maxBytes = maxBytes; + } + + public Boolean getEnabled() { + return this.enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + } + + static class Properties extends HashMap { + + java.util.function.Consumer in(String key) { + return (value) -> put(key, value); + } + + } + +} diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarBinderConfigurationProperties.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarBinderConfigurationProperties.java index 7722b7c6e4..c1cb7dbc06 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarBinderConfigurationProperties.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarBinderConfigurationProperties.java @@ -19,8 +19,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.NestedConfigurationProperty; import org.springframework.lang.Nullable; -import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties; -import org.springframework.pulsar.autoconfigure.ProducerConfigProperties; /** * {@link ConfigurationProperties @ConfigurationProperties} for the Pulsar binder. diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarConsumerProperties.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarConsumerProperties.java index ea4684aac3..d3c66e7ef9 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarConsumerProperties.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarConsumerProperties.java @@ -19,7 +19,6 @@ import org.apache.pulsar.common.schema.SchemaType; import org.springframework.lang.Nullable; -import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties; /** * Pulsar consumer properties used by the binder. diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarExtendedBindingProperties.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarExtendedBindingProperties.java index 4e487a3c0c..38e3a6d2f0 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarExtendedBindingProperties.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarExtendedBindingProperties.java @@ -22,7 +22,6 @@ import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; - /** * {@link ConfigurationProperties @ConfigurationProperties} for Pulsar binder specific * extensions to the common binding properties. diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarProducerProperties.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarProducerProperties.java index db73378e23..b8e2a2cf16 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarProducerProperties.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/properties/PulsarProducerProperties.java @@ -19,7 +19,6 @@ import org.apache.pulsar.common.schema.SchemaType; import org.springframework.lang.Nullable; -import org.springframework.pulsar.autoconfigure.ProducerConfigProperties; /** * Pulsar producer properties used by the binder. diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ConsumerConfigPropertiesTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ConsumerConfigPropertiesTests.java new file mode 100644 index 0000000000..35f1c458f6 --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ConsumerConfigPropertiesTests.java @@ -0,0 +1,180 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.pulsar; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; +import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; + +/** + * Unit tests for {@link ConsumerConfigProperties}. + * + * @author Soby Chacko + * @author Chris Bono + */ +public class ConsumerConfigPropertiesTests { + + @Test + void basePropsCanBeExtractedToMap() { + var inputProps = basePropsInputMap(); + var consumerConfigProps = bindInputPropsToConsumerConfigProps(inputProps); + var outputProps = consumerConfigProps.toBaseConsumerPropertiesMap(); + verifyOutputPropsCanBeLoadedInConsumerBuilder(outputProps); + verifyBasePropsInOutputMap(outputProps); + } + + private Map basePropsInputMap() { + Map inputProps = new HashMap<>(); + inputProps.put("spring.pulsar.consumer.dead-letter-policy.max-redeliver-count", "4"); + inputProps.put("spring.pulsar.consumer.dead-letter-policy.retry-letter-topic", "my-retry-topic"); + inputProps.put("spring.pulsar.consumer.dead-letter-policy.dead-letter-topic", "my-dlt-topic"); + inputProps.put("spring.pulsar.consumer.dead-letter-policy.initial-subscription-name", + "my-initial-subscription"); + inputProps.put("spring.pulsar.consumer.name", "my-consumer"); + inputProps.put("spring.pulsar.consumer.priority-level", "8"); + inputProps.put("spring.pulsar.consumer.read-compacted", "true"); + inputProps.put("spring.pulsar.consumer.retry-enable", "true"); + inputProps.put("spring.pulsar.consumer.topics[0]", "my-topic"); + inputProps.put("spring.pulsar.consumer.topics-pattern", "my-pattern"); + return inputProps; + } + + private void verifyBasePropsInOutputMap(Map outputProps) { + assertThat(outputProps).hasEntrySatisfying("deadLetterPolicy", dlp -> { + DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) dlp; + assertThat(deadLetterPolicy.getMaxRedeliverCount()).isEqualTo(4); + assertThat(deadLetterPolicy.getRetryLetterTopic()).isEqualTo("my-retry-topic"); + assertThat(deadLetterPolicy.getDeadLetterTopic()).isEqualTo("my-dlt-topic"); + assertThat(deadLetterPolicy.getInitialSubscriptionName()).isEqualTo("my-initial-subscription"); + }).containsEntry("consumerName", "my-consumer").containsEntry("priorityLevel", 8) + .containsEntry("readCompacted", true).containsEntry("retryEnable", true) + .hasEntrySatisfying("topicNames", + topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.collection(String.class)) + .containsExactly("my-topic")) + .hasEntrySatisfying("topicsPattern", p -> assertThat(p.toString()).isEqualTo("my-pattern")); + } + + @Test + void extendedPropsCanBeExtractedToMap() { + var inputProps = basePropsInputMap(); + inputProps.putAll(extendedPropsInputMap()); + var consumerConfigProps = bindInputPropsToConsumerConfigProps(inputProps); + var outputProps = consumerConfigProps.toExtendedConsumerPropertiesMap(); + verifyOutputPropsCanBeLoadedInConsumerBuilder(outputProps); + verifyExtendedPropsInOutputMap(outputProps); + } + + @Test + void allPropsCanBeExtractedToMap() { + var inputProps = basePropsInputMap(); + inputProps.putAll(extendedPropsInputMap()); + var consumerConfigProps = bindInputPropsToConsumerConfigProps(inputProps); + var outputProps = consumerConfigProps.toAllConsumerPropertiesMap(); + verifyOutputPropsCanBeLoadedInConsumerBuilder(outputProps); + verifyBasePropsInOutputMap(outputProps); + verifyExtendedPropsInOutputMap(outputProps); + } + + private Map extendedPropsInputMap() { + Map inputProps = new HashMap<>(); + inputProps.put("spring.pulsar.consumer.auto-update-partitions", "true"); + inputProps.put("spring.pulsar.consumer.auto-update-partitions-interval", "10s"); + inputProps.put("spring.pulsar.consumer.crypto-failure-action", "discard"); + inputProps.put("spring.pulsar.consumer.max-total-receiver-queue-size-across-partitions", "5"); + inputProps.put("spring.pulsar.consumer.pattern-auto-discovery-period", "9"); + inputProps.put("spring.pulsar.consumer.pool-messages", "true"); + inputProps.put("spring.pulsar.consumer.properties[my-prop]", "my-prop-value"); + inputProps.put("spring.pulsar.consumer.receiver-queue-size", "1"); + inputProps.put("spring.pulsar.consumer.reset-include-head", "true"); + inputProps.put("spring.pulsar.consumer.start-paused", "true"); + inputProps.put("spring.pulsar.consumer.ack.group-time", "2s"); + inputProps.put("spring.pulsar.consumer.ack.redelivery-delay", "3s"); + inputProps.put("spring.pulsar.consumer.ack.timeout", "6s"); + inputProps.put("spring.pulsar.consumer.ack.timeout-tick-duration", "7s"); + inputProps.put("spring.pulsar.consumer.ack.batch-index-enabled", "true"); + inputProps.put("spring.pulsar.consumer.ack.receipt-enabled", "true"); + inputProps.put("spring.pulsar.consumer.chunk.expire-time-incomplete", "12s"); + inputProps.put("spring.pulsar.consumer.chunk.auto-ack-oldest-on-queue-full", "false"); + inputProps.put("spring.pulsar.consumer.chunk.max-pending-messages", "11"); + inputProps.put("spring.pulsar.consumer.subscription.name", "my-subscription"); + inputProps.put("spring.pulsar.consumer.subscription.type", "shared"); + inputProps.put("spring.pulsar.consumer.subscription.properties[my-sub-prop]", "my-sub-prop-value"); + inputProps.put("spring.pulsar.consumer.subscription.mode", "nondurable"); + inputProps.put("spring.pulsar.consumer.subscription.initial-position", "earliest"); + inputProps.put("spring.pulsar.consumer.subscription.topics-mode", "all-topics"); + inputProps.put("spring.pulsar.consumer.subscription.replicate-state", "true"); + return inputProps; + } + + private void verifyExtendedPropsInOutputMap(Map outputProps) { + assertThat(outputProps).containsEntry("autoUpdatePartitions", true) + .containsEntry("autoUpdatePartitionsIntervalSeconds", 10L) + .containsEntry("cryptoFailureAction", ConsumerCryptoFailureAction.DISCARD) + .containsEntry("maxTotalReceiverQueueSizeAcrossPartitions", 5) + .containsEntry("patternAutoDiscoveryPeriod", 9).containsEntry("poolMessages", true) + .hasEntrySatisfying("properties", + properties -> assertThat(properties) + .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) + .containsEntry("my-prop", "my-prop-value")) + .containsEntry("receiverQueueSize", 1).containsEntry("resetIncludeHead", true) + .containsEntry("startPaused", true).containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L) + .containsEntry("negativeAckRedeliveryDelayMicros", 3_000_000L).containsEntry("ackTimeoutMillis", 6_000L) + .containsEntry("tickDurationMillis", 7_000L).containsEntry("batchIndexAckEnabled", true) + .containsEntry("ackReceiptEnabled", true) + .containsEntry("expireTimeOfIncompleteChunkedMessageMillis", 12_000L) + .containsEntry("autoAckOldestChunkedMessageOnQueueFull", false) + .containsEntry("maxPendingChunkedMessage", 11).containsEntry("subscriptionName", "my-subscription") + .containsEntry("subscriptionType", SubscriptionType.Shared) + .hasEntrySatisfying("subscriptionProperties", + properties -> assertThat(properties) + .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) + .containsEntry("my-sub-prop", "my-sub-prop-value")) + .containsEntry("subscriptionMode", SubscriptionMode.NonDurable) + .containsEntry("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest) + .containsEntry("regexSubscriptionMode", RegexSubscriptionMode.AllTopics) + .containsEntry("replicateSubscriptionState", true); + } + + private void verifyOutputPropsCanBeLoadedInConsumerBuilder(Map outputProps) { + assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(outputProps, + new ConsumerConfigurationData<>(), ConsumerConfigurationData.class)); + } + + private ConsumerConfigProperties bindInputPropsToConsumerConfigProps(Map inputProps) { + return new Binder(new MapConfigurationPropertySource(inputProps)) + .bind("spring.pulsar.consumer", Bindable.ofInstance(new ConsumerConfigProperties())).get(); + } + +} diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ProducerConfigPropertiesTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ProducerConfigPropertiesTests.java new file mode 100644 index 0000000000..87b9623576 --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/ProducerConfigPropertiesTests.java @@ -0,0 +1,163 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.pulsar; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.ProducerAccessMode; +import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; +import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; + +/** + * Unit tests for {@link ProducerConfigProperties}. + * + * @author Soby Chacko + * @author Chris Bono + */ +class ProducerConfigPropertiesTests { + + @Test + void basePropsCanBeExtractedToMap() { + var inputProps = basePropsInputMap(); + var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps); + var outputProps = producerConfigProps.toBaseProducerPropertiesMap(); + verifyOutputPropsCanBeLoadedInProducerBuilder(outputProps); + verifyBasePropsInOutputMap(outputProps); + } + + private Map basePropsInputMap() { + Map inputProps = new HashMap<>(); + inputProps.put("spring.pulsar.producer.access-mode", "Exclusive"); + inputProps.put("spring.pulsar.producer.batching-enabled", "true"); + inputProps.put("spring.pulsar.producer.chunking-enabled", "true"); + inputProps.put("spring.pulsar.producer.compression-type", "lz4"); + inputProps.put("spring.pulsar.producer.hashing-scheme", "murmur3_32hash"); + inputProps.put("spring.pulsar.producer.message-routing-mode", "custompartition"); + inputProps.put("spring.pulsar.producer.name", "my-producer"); + inputProps.put("spring.pulsar.producer.send-timeout", "2s"); + inputProps.put("spring.pulsar.producer.topic-name", "my-topic"); + return inputProps; + } + + private void verifyBasePropsInOutputMap(Map outputProps) { + assertThat(outputProps).containsEntry("accessMode", ProducerAccessMode.Exclusive) + .containsEntry("batchingEnabled", true).containsEntry("chunkingEnabled", true) + .containsEntry("compressionType", CompressionType.LZ4) + .containsEntry("hashingScheme", HashingScheme.Murmur3_32Hash) + .containsEntry("messageRoutingMode", MessageRoutingMode.CustomPartition) + .containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000) + .containsEntry("topicName", "my-topic"); + } + + @Test + void extendedPropsCanBeExtractedToMap() { + var inputProps = basePropsInputMap(); + inputProps.putAll(extendedPropsInputMap()); + var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps); + var outputProps = producerConfigProps.toExtendedProducerPropertiesMap(); + verifyOutputPropsCanBeLoadedInProducerBuilder(outputProps); + verifyExtendedPropsInOutputMap(outputProps); + } + + @Test + void allPropsCanBeExtractedToMap() { + var inputProps = basePropsInputMap(); + inputProps.putAll(extendedPropsInputMap()); + var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps); + var outputProps = producerConfigProps.toAllProducerPropertiesMap(); + verifyOutputPropsCanBeLoadedInProducerBuilder(outputProps); + verifyBasePropsInOutputMap(outputProps); + verifyExtendedPropsInOutputMap(outputProps); + } + + @Test + void batchPropsSkippedWhenBatchDisabled() { + var inputProps = basePropsInputMap(); + inputProps.putAll(extendedPropsInputMap()); + inputProps.put("spring.pulsar.producer.batching-enabled", "false"); + var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps); + var outputProps = producerConfigProps.toAllProducerPropertiesMap(); + assertThat(outputProps).doesNotContainKey("batchingMaxPublishDelayMicros") + .doesNotContainKey("batchingPartitionSwitchFrequencyByPublishDelay") + .doesNotContainKey("batchingMaxMessages").doesNotContainKey("batchingMaxBytes"); + } + + private Map extendedPropsInputMap() { + Map inputProps = new HashMap<>(); + inputProps.put("spring.pulsar.producer.auto-update-partitions", "true"); + inputProps.put("spring.pulsar.producer.auto-update-partitions-interval", "15s"); + inputProps.put("spring.pulsar.producer.block-if-queue-full", "true"); + inputProps.put("spring.pulsar.producer.crypto-failure-action", "send"); + inputProps.put("spring.pulsar.producer.encryption-keys[0]", "my-key"); + inputProps.put("spring.pulsar.producer.initial-sequence-id", "9"); + inputProps.put("spring.pulsar.producer.lazy-start=partitioned-producers", "true"); + inputProps.put("spring.pulsar.producer.max-pending-messages", "3"); + inputProps.put("spring.pulsar.producer.max-pending-messages-across-partitions", "4"); + inputProps.put("spring.pulsar.producer.multi-schema", "true"); + inputProps.put("spring.pulsar.producer.properties[my-prop]", "my-prop-value"); + inputProps.put("spring.pulsar.producer.batch.max-publish-delay", "5s"); + inputProps.put("spring.pulsar.producer.batch.partition-switch-frequency-by-publish-delay", "6"); + inputProps.put("spring.pulsar.producer.batch.max-messages", "7"); + inputProps.put("spring.pulsar.producer.batch.max-bytes", "8"); + return inputProps; + } + + private void verifyExtendedPropsInOutputMap(Map outputProps) { + assertThat(outputProps).containsEntry("autoUpdatePartitions", true) + .containsEntry("autoUpdatePartitionsIntervalSeconds", 15L).containsEntry("blockIfQueueFull", true) + .containsEntry("cryptoFailureAction", ProducerCryptoFailureAction.SEND) + .hasEntrySatisfying("encryptionKeys", + keys -> assertThat(keys).asInstanceOf(InstanceOfAssertFactories.collection(String.class)) + .containsExactly("my-key")) + .containsEntry("initialSequenceId", 9L).containsEntry("lazyStartPartitionedProducers", true) + .containsEntry("maxPendingMessages", 3).containsEntry("maxPendingMessagesAcrossPartitions", 4) + .containsEntry("multiSchema", true) + .hasEntrySatisfying("properties", + properties -> assertThat(properties) + .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) + .containsEntry("my-prop", "my-prop-value")) + .containsEntry("batchingMaxPublishDelayMicros", 5_000_000L) + .containsEntry("batchingPartitionSwitchFrequencyByPublishDelay", 6) + .containsEntry("batchingMaxMessages", 7).containsEntry("batchingMaxBytes", 8); + } + + private void verifyOutputPropsCanBeLoadedInProducerBuilder(Map outputProps) { + assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(outputProps, + new ProducerConfigurationData(), ProducerConfigurationData.class)); + } + + private ProducerConfigProperties bindInputPropsToProducerConfigProps(Map inputProps) { + return new Binder(new MapConfigurationPropertySource(inputProps)) + .bind("spring.pulsar.producer", Bindable.ofInstance(new ProducerConfigProperties())).get(); + } + +} diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderConfigurationPropertiesTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderConfigurationPropertiesTests.java index d21b403352..06aa6c27b1 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderConfigurationPropertiesTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderConfigurationPropertiesTests.java @@ -29,7 +29,6 @@ import org.springframework.boot.context.properties.bind.Bindable; import org.springframework.boot.context.properties.bind.Binder; -import org.springframework.boot.context.properties.source.ConfigurationPropertySource; import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; import org.springframework.cloud.stream.binder.pulsar.properties.PulsarBinderConfigurationProperties; @@ -43,79 +42,57 @@ */ public class PulsarBinderConfigurationPropertiesTests { - private final PulsarBinderConfigurationProperties properties = new PulsarBinderConfigurationProperties(); - - private void bind(Map map) { - ConfigurationPropertySource source = new MapConfigurationPropertySource(map); - new Binder(source).bind("spring.cloud.stream.pulsar.binder", Bindable.ofInstance(this.properties)); - } - @Test void partitionCountProperty() { - assertThat(properties.getPartitionCount()).isNull(); - bind(Map.of("spring.cloud.stream.pulsar.binder.partition-count", "5150")); - assertThat(properties.getPartitionCount()).isEqualTo(5150); + assertThat(new PulsarBinderConfigurationProperties().getPartitionCount()).isNull(); + var binderConfigProps = bindInputPropsToBinderConfigProps( + Map.of("spring.cloud.stream.pulsar.binder.partition-count", "5150")); + assertThat(binderConfigProps.getPartitionCount()).isEqualTo(5150); } @Test void producerProperties() { - // Only spot check a few values (PulsarPropertiesTests does the heavy lifting) - Map props = new HashMap<>(); - props.put("spring.cloud.stream.pulsar.binder.producer.topic-name", "my-topic"); - props.put("spring.cloud.stream.pulsar.binder.producer.send-timeout", "2s"); - props.put("spring.cloud.stream.pulsar.binder.producer.max-pending-messages", "3"); - props.put("spring.cloud.stream.pulsar.binder.producer.producer-access-mode", "exclusive"); - props.put("spring.cloud.stream.pulsar.binder.producer.properties[my-prop]", "my-prop-value"); - - bind(props); - Map producerProps = PulsarBinderUtils.convertProducerPropertiesToMap(properties.getProducer()); + // Only spot check a few values (ProducerConfigPropertiesTests does the heavy + // lifting) + Map inputProps = new HashMap<>(); + inputProps.put("spring.cloud.stream.pulsar.binder.producer.topic-name", "my-topic"); + inputProps.put("spring.cloud.stream.pulsar.binder.producer.send-timeout", "2s"); + inputProps.put("spring.cloud.stream.pulsar.binder.producer.max-pending-messages", "3"); + inputProps.put("spring.cloud.stream.pulsar.binder.producer.access-mode", "exclusive"); + var binderConfigProps = bindInputPropsToBinderConfigProps(inputProps); + var producerProps = binderConfigProps.getProducer().toAllProducerPropertiesMap(); // Verify that the props can be loaded in a ProducerBuilder assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps, new ProducerConfigurationData(), ProducerConfigurationData.class)); - - // @formatter:off - assertThat(producerProps) - .containsEntry("topicName", "my-topic") - .containsEntry("sendTimeoutMs", 2_000) - .containsEntry("maxPendingMessages", 3) - .containsEntry("accessMode", ProducerAccessMode.Exclusive) - .hasEntrySatisfying("properties", properties -> - assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-prop", "my-prop-value")); - // @formatter:on + assertThat(producerProps).containsEntry("topicName", "my-topic").containsEntry("sendTimeoutMs", 2_000) + .containsEntry("maxPendingMessages", 3).containsEntry("accessMode", ProducerAccessMode.Exclusive); } @Test void consumerProperties() { - // Only spot check a few values (PulsarPropertiesTests does the heavy lifting) - Map props = new HashMap<>(); - props.put("spring.cloud.stream.pulsar.binder.consumer.topics[0]", "my-topic"); - props.put("spring.cloud.stream.pulsar.binder.consumer.subscription-properties[my-sub-prop]", - "my-sub-prop-value"); - props.put("spring.cloud.stream.pulsar.binder.consumer.subscription-mode", "nondurable"); - props.put("spring.cloud.stream.pulsar.binder.consumer.receiver-queue-size", "1"); - - bind(props); - Map consumerProps = PulsarBinderUtils.convertConsumerPropertiesToMap(properties.getConsumer()); + // Only spot check a few values (ConsumerConfigPropertiesTests does the heavy + // lifting) + Map inputProps = new HashMap<>(); + inputProps.put("spring.cloud.stream.pulsar.binder.consumer.topics[0]", "my-topic"); + inputProps.put("spring.cloud.stream.pulsar.binder.consumer.subscription.mode", "nondurable"); + inputProps.put("spring.cloud.stream.pulsar.binder.consumer.receiver-queue-size", "1"); + var binderConfigProps = bindInputPropsToBinderConfigProps(inputProps); + var consumerProps = binderConfigProps.getConsumer().toAllConsumerPropertiesMap(); // Verify that the props can be loaded in a ConsumerBuilder assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps, new ConsumerConfigurationData<>(), ConsumerConfigurationData.class)); - - // @formatter:off assertThat(consumerProps) .hasEntrySatisfying("topicNames", topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.collection(String.class)) .containsExactly("my-topic")) - .hasEntrySatisfying("subscriptionProperties", - properties -> assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-sub-prop", "my-sub-prop-value")) - .containsEntry("subscriptionMode", SubscriptionMode.NonDurable) - .containsEntry("receiverQueueSize", 1); - // @formatter:on + .containsEntry("subscriptionMode", SubscriptionMode.NonDurable).containsEntry("receiverQueueSize", 1); + } + + private PulsarBinderConfigurationProperties bindInputPropsToBinderConfigProps(Map inputProps) { + return new Binder(new MapConfigurationPropertySource(inputProps)).bind("spring.cloud.stream.pulsar.binder", + Bindable.ofInstance(new PulsarBinderConfigurationProperties())).get(); } } diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java index 08ec2731b2..b8b7e73171 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java @@ -26,7 +26,6 @@ import java.util.function.Supplier; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -38,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.WebApplicationType; @@ -47,11 +48,9 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.pulsar.autoconfigure.PulsarProperties; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; @@ -60,7 +59,6 @@ import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; -import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.support.header.PulsarHeaderMapper; import org.springframework.pulsar.support.header.ToStringPulsarHeaderMapper; @@ -84,17 +82,18 @@ void binderAndBindingPropsAreAppliedAndRespected(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext context = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.producer.cache.enabled=false", "--spring.cloud.function.definition=textSupplier;textLogger", "--spring.cloud.stream.bindings.textLogger-in-0.destination=textSupplier-out-0", - "--spring.pulsar.producer.producer-name=textSupplierProducer-fromBase", - "--spring.cloud.stream.pulsar.binder.producer.producer-name=textSupplierProducer-fromBinder", - "--spring.cloud.stream.pulsar.bindings.textSupplier-out-0.producer.producer-name=textSupplierProducer-fromBinding", + "--spring.pulsar.producer.name=textSupplierProducer-fromBase", + "--spring.cloud.stream.pulsar.binder.producer.name=textSupplierProducer-fromBinder", + "--spring.cloud.stream.pulsar.bindings.textSupplier-out-0.producer.name=textSupplierProducer-fromBinding", "--spring.cloud.stream.pulsar.binder.producer.max-pending-messages=1100", - "--spring.pulsar.producer.block-if-queue-full=true", - "--spring.cloud.stream.pulsar.binder.consumer.subscription-name=textLoggerSub-fromBinder", - "--spring.cloud.stream.pulsar.binder.consumer.consumer-name=textLogger-fromBinder", - "--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.consumer-name=textLogger-fromBinding")) { + "--spring.cloud.stream.pulsar.binder.producer.block-if-queue-full=true", + "--spring.cloud.stream.pulsar.binder.consumer.subscription.name=textLoggerSub-fromBinder", + "--spring.cloud.stream.pulsar.binder.consumer.name=textLogger-fromBinder", + "--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.name=textLogger-fromBinding")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: test-basic-scenario")); @@ -122,10 +121,10 @@ void primitiveTypeString(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=textSupplier;textLogger", "--spring.cloud.stream.bindings.textLogger-in-0.destination=textSupplier-out-0", - "--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.subscription-name=pbit-text-sub1")) { + "--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.subscription.name=pbit-text-sub1")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: test-basic-scenario")); } @@ -137,11 +136,11 @@ void primitiveTypeFloat(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=piSupplier;piLogger", "--spring.cloud.stream.bindings.piSupplier-out-0.destination=pi-stream", "--spring.cloud.stream.bindings.piLogger-in-0.destination=pi-stream", - "--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.subscription-name=pbit-float-sub1")) { + "--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.subscription.name=pbit-float-sub1")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: 3.14")); } @@ -158,14 +157,14 @@ void primitiveTypeFloat(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=piSupplier;piLogger", "--spring.cloud.stream.bindings.piLogger-in-0.destination=piSupplier-out-0", "--spring.cloud.stream.bindings.piSupplier-out-0.producer.use-native-encoding=true", "--spring.cloud.stream.pulsar.bindings.piSupplier-out-0.producer.schema-type=FLOAT", "--spring.cloud.stream.bindings.piLogger-in-0.consumer.use-native-decoding=true", "--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.schema-type=FLOAT", - "--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.subscription-name=pbit-float-sub2")) { + "--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.subscription.name=pbit-float-sub2")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: 3.14")); } @@ -177,7 +176,7 @@ void jsonTypeFooWithSchemaType(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=fooSupplier;fooLogger", "--spring.cloud.stream.bindings.fooSupplier-out-0.destination=foo-stream-1", "--spring.cloud.stream.bindings.fooLogger-in-0.destination=foo-stream-1", @@ -188,7 +187,7 @@ void jsonTypeFooWithSchemaType(CapturedOutput output) { "--spring.cloud.stream.bindings.fooLogger-in-0.consumer.use-native-decoding=true", "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.schema-type=JSON", "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-type=" + Foo.class.getName(), - "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription-name=pbit-foo-sub1")) { + "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription.name=pbit-foo-sub1")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: Foo[value=5150]")); } @@ -200,7 +199,7 @@ void jsonTypeFooWithoutSchemaTypeDefaultsToJsonSchema(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=fooSupplier;fooLogger", "--spring.cloud.stream.bindings.fooSupplier-out-0.destination=foo-stream-2", "--spring.cloud.stream.bindings.fooLogger-in-0.destination=foo-stream-2", @@ -209,7 +208,7 @@ void jsonTypeFooWithoutSchemaTypeDefaultsToJsonSchema(CapturedOutput output) { + Foo.class.getName(), "--spring.cloud.stream.bindings.fooLogger-in-0.consumer.use-native-decoding=true", "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-type=" + Foo.class.getName(), - "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription-name=pbit-foo-sub2")) { + "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription.name=pbit-foo-sub2")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: Foo[value=5150]")); } @@ -221,7 +220,7 @@ void avroTypeUserWithSchemaType(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=userSupplier;userLogger", "--spring.cloud.stream.bindings.userSupplier-out-0.destination=user-stream-1", "--spring.cloud.stream.bindings.userLogger-in-0.destination=user-stream-1", @@ -233,7 +232,7 @@ void avroTypeUserWithSchemaType(CapturedOutput output) { "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.schema-type=AVRO", "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type=" + User.class.getName(), - "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-user-sub1")) { + "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-user-sub1")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}")); } @@ -245,7 +244,7 @@ void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaProps(CapturedOutput outp app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=userSupplier;userLogger", "--spring.cloud.stream.bindings.userSupplier-out-0.destination=user-stream-2", "--spring.cloud.stream.bindings.userLogger-in-0.destination=user-stream-2", @@ -255,7 +254,7 @@ void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaProps(CapturedOutput outp "--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true", "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type=" + User.class.getName(), - "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-user-sub2", + "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-user-sub2", "--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()), "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) @@ -269,7 +268,7 @@ void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaCustomizer(CapturedOutput app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=userSupplier;userLogger", "--spring.cloud.stream.bindings.userSupplier-out-0.destination=user-stream-3", "--spring.cloud.stream.bindings.userLogger-in-0.destination=user-stream-3", @@ -279,7 +278,7 @@ void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaCustomizer(CapturedOutput "--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true", "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type=" + User.class.getName(), - "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-user-sub3")) { + "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-user-sub3")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}")); } @@ -291,7 +290,7 @@ void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutput app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=userSupplier;userLogger", "--spring.cloud.stream.bindings.userSupplier-out-0.destination=kv-stream-1", "--spring.cloud.stream.bindings.userLogger-in-0.destination=kv-stream-1", @@ -307,7 +306,7 @@ void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutput + User.class.getName(), "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-key-type=" + String.class.getName(), - "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub1", + "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-kv-sub1", "--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()), "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) @@ -321,7 +320,7 @@ void keyValueAvroTypeWithoutSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutp app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=userSupplier;userLogger", "--spring.cloud.stream.bindings.userSupplier-out-0.destination=kv-stream-2", "--spring.cloud.stream.bindings.userLogger-in-0.destination=kv-stream-2", @@ -335,7 +334,7 @@ void keyValueAvroTypeWithoutSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutp + User.class.getName(), "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-key-type=" + String.class.getName(), - "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub2", + "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-kv-sub2", "--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()), "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) @@ -349,7 +348,7 @@ void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaCustomizer(CapturedOu app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=userSupplier;userLogger", "--spring.cloud.stream.bindings.userSupplier-out-0.destination=kv-stream-3", "--spring.cloud.stream.bindings.userLogger-in-0.destination=kv-stream-3", @@ -365,7 +364,7 @@ void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaCustomizer(CapturedOu + User.class.getName(), "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-key-type=" + String.class.getName(), - "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub3")) { + "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-kv-sub3")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}")); } @@ -377,7 +376,7 @@ void keyValueJsonTypeWithoutSchemaTypeAndWithoutCustomTypeMappings(CapturedOutpu app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=fooSupplier;fooLogger", "--spring.cloud.stream.bindings.fooSupplier-out-0.destination=kv-stream-4", "--spring.cloud.stream.bindings.fooLogger-in-0.destination=kv-stream-4", @@ -390,7 +389,7 @@ void keyValueJsonTypeWithoutSchemaTypeAndWithoutCustomTypeMappings(CapturedOutpu "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-type=" + Foo.class.getName(), "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-key-type=" + String.class.getName(), - "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription-name=pbit-kv-sub4")) { + "--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription.name=pbit-kv-sub4")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: 5150->Foo[value=5150]")); } @@ -407,11 +406,11 @@ void headersPropagatedSendAndReceive(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=springMessageSupplier;springMessageLogger", "--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-1", "--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-1", - "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription-name=pbit-cmh1-sub1")) { + "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh1-sub1")) { // Wait for a few of the messages to flow through (check for index = 5) Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)).until( () -> output.toString().contains("Hello binder: test-headers-msg-5 w/ custom-id: 5150-5")); @@ -424,11 +423,11 @@ void complexHeadersAreEncodedAndPropagated(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=springMessageSupplier;springMessageLogger", "--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-2", "--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-2", - "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription-name=pbit-cmh2-sub1")) { + "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh2-sub1")) { // Wait for a few of the messages to flow through (check for index = 5) Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)).until(() -> output.toString() .contains("Hello binder: test-headers-msg-5 w/ custom-id: FooHeader[value=5150-5]")); @@ -441,12 +440,12 @@ void producerHeaderModeNone(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=springMessageSupplier;springMessageLogger", "--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-3", "--spring.cloud.stream.bindings.springMessageSupplier-out-0.producer.header-mode=none", "--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-3", - "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription-name=pbit-cmh3-sub1")) { + "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh3-sub1")) { // Wait for a few of the messages to flow through (check for index = 5) Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: test-headers-msg-5 w/ custom-id: null")); @@ -459,12 +458,12 @@ void consumerHeaderModeNone(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=springMessageSupplier;springMessageLogger", "--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-4", "--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-4", "--spring.cloud.stream.bindings.springMessageLogger-in-0.consumer.header-mode=none", - "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription-name=pbit-cmh4-sub1")) { + "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh4-sub1")) { // Wait for a few of the messages to flow through (check for index = 5) Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: test-headers-msg-5 w/ custom-id: null")); @@ -477,11 +476,11 @@ void customHeaderMapperRespected(CapturedOutput output) { app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext ignored = app.run( "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), "--spring.cloud.function.definition=springMessageSupplier;springMessageLogger", "--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-5", "--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-5", - "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription-name=pbit-cmh5-sub1")) { + "--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh5-sub1")) { // Wait for a few of the messages to flow through (check for index = 5) Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)).until(() -> output.toString() .contains("Hello binder: test-headers-msg-5 w/ custom-id: tsh->tph->FooHeader[value=5150-5]")); @@ -638,66 +637,109 @@ public Consumer textLogger() { @Import(PrimitiveTextConfig.class) static class BinderAndBindingPropsTestConfig { - @SuppressWarnings("unchecked") @Bean - public PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient, - PulsarProperties pulsarProperties, TopicResolver topicResolver) { - var customizer = (ProducerBuilderCustomizer) pulsarProperties.getProducer() - .toProducerBuilderCustomizer(); - return new TrackingProducerFactory(pulsarClient, pulsarProperties.getProducer().getTopicName(), customizer, - topicResolver); + TrackingProducerFactoryBeanPostProcessor trackingProducerFactory() { + return new TrackingProducerFactoryBeanPostProcessor(); } - @SuppressWarnings("unchecked") @Bean - public PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient, - PulsarProperties pulsarProperties) { - var customizer = (ConsumerBuilderCustomizer) pulsarProperties.getConsumer() - .toConsumerBuilderCustomizer(); - return new TrackingConsumerFactory(pulsarClient, customizer); + TrackingConsumerFactoryBeanPostProcessor trackingConsumerFactory() { + return new TrackingConsumerFactoryBeanPostProcessor(); } } - static class TrackingProducerFactory extends DefaultPulsarProducerFactory { + static class TrackingProducerFactoryBeanPostProcessor implements BeanPostProcessor { + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof DefaultPulsarProducerFactory defaultFactory) { + return new TrackingProducerFactory(defaultFactory); + } + return bean; + } + + } + + static class TrackingProducerFactory implements PulsarProducerFactory { + + private final DefaultPulsarProducerFactory trackedProducerFactory; List> producersCreated = new ArrayList<>(); - TrackingProducerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic, - ProducerBuilderCustomizer defaultConfigCustomizer, TopicResolver topicResolver) { - super(pulsarClient, defaultTopic, defaultConfigCustomizer, topicResolver); + TrackingProducerFactory(DefaultPulsarProducerFactory trackedProducerFactory) { + this.trackedProducerFactory = trackedProducerFactory; } @Override - protected Producer doCreateProducer(Schema schema, @Nullable String topic, - @Nullable Collection encryptionKeys, - @Nullable List> producerBuilderCustomizers) - throws PulsarClientException { - Producer producer = super.doCreateProducer(schema, topic, encryptionKeys, + public Producer createProducer(Schema schema, String topic) throws PulsarClientException { + var producer = this.trackedProducerFactory.createProducer(schema, topic); + this.producersCreated.add(producer); + return producer; + } + + @Override + public Producer createProducer(Schema schema, String topic, + ProducerBuilderCustomizer customizer) throws PulsarClientException { + var producer = this.trackedProducerFactory.createProducer(schema, topic, customizer); + this.producersCreated.add(producer); + return producer; + } + + @Override + public Producer createProducer(Schema schema, String topic, Collection encryptionKeys, + List> producerBuilderCustomizers) throws PulsarClientException { + var producer = this.trackedProducerFactory.createProducer(schema, topic, encryptionKeys, producerBuilderCustomizers); - producersCreated.add(producer); + this.producersCreated.add(producer); return producer; } + @Override + public String getDefaultTopic() { + return this.trackedProducerFactory.getDefaultTopic(); + } + + } + + static class TrackingConsumerFactoryBeanPostProcessor implements BeanPostProcessor { + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof DefaultPulsarConsumerFactory defaultFactory) { + return new TrackingConsumerFactory(defaultFactory); + } + return bean; + } + } - static class TrackingConsumerFactory extends DefaultPulsarConsumerFactory { + static class TrackingConsumerFactory implements PulsarConsumerFactory { + + private final DefaultPulsarConsumerFactory trackedConsumerFactory; List> consumersCreated = new ArrayList<>(); - TrackingConsumerFactory(PulsarClient pulsarClient, ConsumerBuilderCustomizer defaultConsumerConfig) { - super(pulsarClient, defaultConsumerConfig); + TrackingConsumerFactory(DefaultPulsarConsumerFactory trackedConsumerFactory) { + this.trackedConsumerFactory = trackedConsumerFactory; } @Override public org.apache.pulsar.client.api.Consumer createConsumer(Schema schema, - @Nullable Collection topics, @Nullable String subscriptionName, - @Nullable Map metadataProperties, - @Nullable List> consumerBuilderCustomizers) + Collection topics, String subscriptionName, ConsumerBuilderCustomizer customizer) throws PulsarClientException { - org.apache.pulsar.client.api.Consumer consumer = super.createConsumer(schema, topics, - subscriptionName, metadataProperties, consumerBuilderCustomizers); - consumersCreated.add(consumer); + var consumer = this.trackedConsumerFactory.createConsumer(schema, topics, subscriptionName, customizer); + this.consumersCreated.add(consumer); + return consumer; + } + + @Override + public org.apache.pulsar.client.api.Consumer createConsumer(Schema schema, + Collection topics, String subscriptionName, Map metadataProperties, + List> consumerBuilderCustomizers) throws PulsarClientException { + var consumer = this.trackedConsumerFactory.createConsumer(schema, topics, subscriptionName, + metadataProperties, consumerBuilderCustomizers); + this.consumersCreated.add(consumer); return consumer; } diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java index a412d38900..229959cd45 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java @@ -17,6 +17,7 @@ package org.springframework.cloud.stream.binder.pulsar; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -104,7 +105,8 @@ protected PulsarTestBinder getBinder() { var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient); var pulsarTemplate = new PulsarTemplate<>(producerFactory); var consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, - (consumerBuilder -> consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest))); + List.of((consumerBuilder) -> consumerBuilder + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest))); if (this.binder == null) { this.binder = new PulsarTestBinder(provisioner, pulsarTemplate, consumerFactory, configProps, new DefaultSchemaResolver(), JsonPulsarHeaderMapper.builder().build()); @@ -155,8 +157,7 @@ public void testSendAndReceive(TestInfo testInfo) throws Exception { Binding consumerBinding = binder.bindConsumer("foo.bar", null, moduleInputChannel, consumerProperties); - Message message = MessageBuilder - .withPayload("foo".getBytes(StandardCharsets.UTF_8)).build(); + Message message = MessageBuilder.withPayload("foo".getBytes(StandardCharsets.UTF_8)).build(); // Let the consumer actually bind to the producer before sending a msg binderBindUnbindLatency(); diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtilsTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtilsTests.java index 17805d2251..87b06afe7a 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtilsTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderUtilsTests.java @@ -17,42 +17,24 @@ package org.springframework.cloud.stream.binder.pulsar; import java.util.Collections; -import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Stream; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; -import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.apache.pulsar.client.api.HashingScheme; -import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerAccessMode; -import org.apache.pulsar.client.api.ProducerCryptoFailureAction; -import org.apache.pulsar.client.api.RegexSubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; -import org.springframework.boot.context.properties.bind.Bindable; -import org.springframework.boot.context.properties.bind.Binder; -import org.springframework.boot.context.properties.source.ConfigurationPropertySource; -import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; +import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties; +import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties; import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties; import org.springframework.cloud.stream.provisioning.ConsumerDestination; -import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties; -import org.springframework.pulsar.autoconfigure.ProducerConfigProperties; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNoException; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -71,8 +53,8 @@ class SubscriptionNameTests { @Test void respectsValueWhenSetAsProperty() { var consumerDestination = mock(ConsumerDestination.class); - var pulsarConsumerProperties = mock(PulsarConsumerProperties.class); - when(pulsarConsumerProperties.getSubscriptionName()).thenReturn("my-sub"); + var pulsarConsumerProperties = mock(PulsarConsumerProperties.class, Mockito.RETURNS_DEEP_STUBS); + when(pulsarConsumerProperties.getSubscription().getName()).thenReturn("my-sub"); assertThat(PulsarBinderUtils.subscriptionName(pulsarConsumerProperties, consumerDestination)) .isEqualTo("my-sub"); } @@ -80,8 +62,8 @@ void respectsValueWhenSetAsProperty() { @Test void generatesValueWhenNotSetAsProperty() { var consumerDestination = mock(ConsumerDestination.class); - var pulsarConsumerProperties = mock(PulsarConsumerProperties.class); - when(pulsarConsumerProperties.getSubscriptionName()).thenReturn(null); + var pulsarConsumerProperties = mock(PulsarConsumerProperties.class, Mockito.RETURNS_DEEP_STUBS); + when(pulsarConsumerProperties.getSubscription().getName()).thenReturn(null); when(consumerDestination.getName()).thenReturn("my-topic"); assertThat(PulsarBinderUtils.subscriptionName(pulsarConsumerProperties, consumerDestination)) .startsWith("my-topic-anon-subscription-"); @@ -154,169 +136,162 @@ static Stream mergePropertiesTestProvider() { } @Nested - class ConvertedPropertiesTests { + class MergeProducerPropertiesTests { - private final ProducerConfigProperties properties = new ProducerConfigProperties(); + private static final Consumer SET_NO_PROPS = (__) -> { + }; - private void bind(Map map) { - ConfigurationPropertySource source = new MapConfigurationPropertySource(map); - new Binder(source).bind("spring.pulsar.producer", Bindable.ofInstance(this.properties)); + @Test + void noPropsSpecified() { + doMergeProducerPropertiesTest(SET_NO_PROPS, SET_NO_PROPS, Collections.emptyMap()); + } + + @Test + void basePropSpecifiedAtBinderLevelOnly() { + doMergeProducerPropertiesTest((binderProps) -> binderProps.setAccessMode(ProducerAccessMode.Exclusive), + SET_NO_PROPS, Map.of("accessMode", ProducerAccessMode.Exclusive)); + } + + @Test + void basePropSpecifiedAtBindingLevelOnly() { + doMergeProducerPropertiesTest(SET_NO_PROPS, + (bindingProps) -> bindingProps.setAccessMode(ProducerAccessMode.Exclusive), + Map.of("accessMode", ProducerAccessMode.Exclusive)); } @Test - void producerPropertiesToMap() { - Map props = new HashMap<>(); - props.put("spring.pulsar.producer.topic-name", "my-topic"); - props.put("spring.pulsar.producer.producer-name", "my-producer"); - props.put("spring.pulsar.producer.send-timeout", "2s"); - props.put("spring.pulsar.producer.block-if-queue-full", "true"); - props.put("spring.pulsar.producer.max-pending-messages", "3"); - props.put("spring.pulsar.producer.max-pending-messages-across-partitions", "4"); - props.put("spring.pulsar.producer.message-routing-mode", "custompartition"); - props.put("spring.pulsar.producer.hashing-scheme", "murmur3_32hash"); - props.put("spring.pulsar.producer.crypto-failure-action", "send"); - props.put("spring.pulsar.producer.batching-max-publish-delay", "5s"); - props.put("spring.pulsar.producer.batching-partition-switch-frequency-by-publish-delay", "6"); - props.put("spring.pulsar.producer.batching-max-messages", "7"); - props.put("spring.pulsar.producer.batching-max-bytes", "8"); - props.put("spring.pulsar.producer.batching-enabled", "false"); - props.put("spring.pulsar.producer.chunking-enabled", "true"); - props.put("spring.pulsar.producer.encryption-keys[0]", "my-key"); - props.put("spring.pulsar.producer.compression-type", "lz4"); - props.put("spring.pulsar.producer.initial-sequence-id", "9"); - props.put("spring.pulsar.producer.producer-access-mode", "exclusive"); - props.put("spring.pulsar.producer.lazy-start=partitioned-producers", "true"); - props.put("spring.pulsar.producer.properties[my-prop]", "my-prop-value"); - - bind(props); - Map producerProps = PulsarBinderUtils.convertProducerPropertiesToMap(properties); - - // Verify that the props can be loaded in a ProducerBuilder - assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps, - new ProducerConfigurationData(), ProducerConfigurationData.class)); - - assertThat(producerProps).containsEntry("topicName", "my-topic") - .containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000) - .containsEntry("blockIfQueueFull", true).containsEntry("maxPendingMessages", 3) - .containsEntry("maxPendingMessagesAcrossPartitions", 4) - .containsEntry("messageRoutingMode", MessageRoutingMode.CustomPartition) - .containsEntry("hashingScheme", HashingScheme.Murmur3_32Hash) - .containsEntry("cryptoFailureAction", ProducerCryptoFailureAction.SEND) - .containsEntry("batchingMaxPublishDelayMicros", 5_000_000L) - .containsEntry("batchingPartitionSwitchFrequencyByPublishDelay", 6) - .containsEntry("batchingMaxMessages", 7).containsEntry("batchingMaxBytes", 8) - .containsEntry("batchingEnabled", false).containsEntry("chunkingEnabled", true) - .hasEntrySatisfying("encryptionKeys", - keys -> assertThat(keys).asInstanceOf(InstanceOfAssertFactories.collection(String.class)) - .containsExactly("my-key")) - .containsEntry("compressionType", CompressionType.LZ4).containsEntry("initialSequenceId", 9L) - .containsEntry("accessMode", ProducerAccessMode.Exclusive) - .containsEntry("lazyStartPartitionedProducers", true).hasEntrySatisfying("properties", - properties -> assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-prop", "my-prop-value")); + void basePropSpecifiedAtBinderAndBindingLevel() { + doMergeProducerPropertiesTest( + (binderProps) -> binderProps.setAccessMode(ProducerAccessMode.ExclusiveWithFencing), + (bindingProps) -> bindingProps.setAccessMode(ProducerAccessMode.Exclusive), + Map.of("accessMode", ProducerAccessMode.Exclusive)); + } + + @Test + void basePropSpecifiedWithSameValueAsDefault() { + doMergeProducerPropertiesTest((binderProps) -> binderProps.setAccessMode(ProducerAccessMode.Shared), + (bindingProps) -> bindingProps.setAccessMode(ProducerAccessMode.Shared), Collections.emptyMap()); + } + + @Test + void extPropSpecifiedAtBinderLevelOnly() { + doMergeProducerPropertiesTest((binderProps) -> binderProps.setMaxPendingMessages(1200), SET_NO_PROPS, + Map.of("maxPendingMessages", 1200)); + } + + @Test + void extPropSpecifiedAtBindingLevelOnly() { + doMergeProducerPropertiesTest(SET_NO_PROPS, (bindingProps) -> bindingProps.setMaxPendingMessages(1200), + Map.of("maxPendingMessages", 1200)); + } + + @Test + void extPropSpecifiedAtBinderAndBindingLevel() { + doMergeProducerPropertiesTest((binderProps) -> binderProps.setMaxPendingMessages(1100), + (bindingProps) -> bindingProps.setMaxPendingMessages(1200), Map.of("maxPendingMessages", 1200)); + } + + @Test + void extPropSpecifiedWithSameValueAsDefault() { + doMergeProducerPropertiesTest((binderProps) -> binderProps.setMaxPendingMessages(1000), + (bindingProps) -> bindingProps.setMaxPendingMessages(1000), Collections.emptyMap()); + } + + @Test + void baseAndExtPropsAreCombined() { + doMergeProducerPropertiesTest((binderProps) -> binderProps.setAccessMode(ProducerAccessMode.Exclusive), + (bindingProps) -> bindingProps.setMaxPendingMessages(1200), + Map.of("accessMode", ProducerAccessMode.Exclusive, "maxPendingMessages", 1200)); + } + + void doMergeProducerPropertiesTest(Consumer binderPropsCustomizer, + Consumer bindingPropsCustomizer, Map expectedProps) { + var binderProducerProps = new ProducerConfigProperties(); + binderPropsCustomizer.accept(binderProducerProps); + var bindingProducerProps = new ProducerConfigProperties(); + bindingPropsCustomizer.accept(bindingProducerProps); + var mergedProps = PulsarBinderUtils.mergeModifiedProducerProperties(binderProducerProps, + bindingProducerProps); + assertThat(mergedProps).containsExactlyInAnyOrderEntriesOf(expectedProps); } } @Nested - class ConvertedConsumerPropertiesTests { + class MergeConsumerPropertiesTests { - private final ConsumerConfigProperties properties = new ConsumerConfigProperties(); + private static final Consumer SET_NO_PROPS = (__) -> { + }; - private void bind(Map map) { - ConfigurationPropertySource source = new MapConfigurationPropertySource(map); - new Binder(source).bind("spring.pulsar.consumer", Bindable.ofInstance(this.properties)); + @Test + void noPropsSpecified() { + doMergeConsumerPropertiesTest(SET_NO_PROPS, SET_NO_PROPS, Collections.emptyMap()); + } + + @Test + void basePropSpecifiedAtBinderLevelOnly() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setPriorityLevel(1000), SET_NO_PROPS, + Map.of("priorityLevel", 1000)); + } + + @Test + void basePropSpecifiedAtBindingLevelOnly() { + doMergeConsumerPropertiesTest(SET_NO_PROPS, (bindingProps) -> bindingProps.setPriorityLevel(1000), + Map.of("priorityLevel", 1000)); } @Test - void consumerPropertiesToMap() { - Map props = new HashMap<>(); - props.put("spring.pulsar.consumer.topics[0]", "my-topic"); - props.put("spring.pulsar.consumer.topics-pattern", "my-pattern"); - props.put("spring.pulsar.consumer.subscription-name", "my-subscription"); - props.put("spring.pulsar.consumer.subscription-type", "shared"); - props.put("spring.pulsar.consumer.subscription-properties[my-sub-prop]", "my-sub-prop-value"); - props.put("spring.pulsar.consumer.subscription-mode", "nondurable"); - props.put("spring.pulsar.consumer.receiver-queue-size", "1"); - props.put("spring.pulsar.consumer.acknowledgements-group-time", "2s"); - props.put("spring.pulsar.consumer.negative-ack-redelivery-delay", "3s"); - props.put("spring.pulsar.consumer.max-total-receiver-queue-size-across-partitions", "5"); - props.put("spring.pulsar.consumer.consumer-name", "my-consumer"); - props.put("spring.pulsar.consumer.ack-timeout", "6s"); - props.put("spring.pulsar.consumer.tick-duration", "7s"); - props.put("spring.pulsar.consumer.priority-level", "8"); - props.put("spring.pulsar.consumer.crypto-failure-action", "discard"); - props.put("spring.pulsar.consumer.properties[my-prop]", "my-prop-value"); - props.put("spring.pulsar.consumer.read-compacted", "true"); - props.put("spring.pulsar.consumer.subscription-initial-position", "earliest"); - props.put("spring.pulsar.consumer.pattern-auto-discovery-period", "9"); - props.put("spring.pulsar.consumer.regex-subscription-mode", "all-topics"); - props.put("spring.pulsar.consumer.dead-letter-policy.max-redeliver-count", "4"); - props.put("spring.pulsar.consumer.dead-letter-policy.retry-letter-topic", "my-retry-topic"); - props.put("spring.pulsar.consumer.dead-letter-policy.dead-letter-topic", "my-dlt-topic"); - props.put("spring.pulsar.consumer.dead-letter-policy.initial-subscription-name", "my-initial-subscription"); - props.put("spring.pulsar.consumer.retry-enable", "true"); - props.put("spring.pulsar.consumer.auto-update-partitions", "false"); - props.put("spring.pulsar.consumer.auto-update-partitions-interval", "10s"); - props.put("spring.pulsar.consumer.replicate-subscription-state", "true"); - props.put("spring.pulsar.consumer.reset-include-head", "true"); - props.put("spring.pulsar.consumer.batch-index-ack-enabled", "true"); - props.put("spring.pulsar.consumer.ack-receipt-enabled", "true"); - props.put("spring.pulsar.consumer.pool-messages", "true"); - props.put("spring.pulsar.consumer.start-paused", "true"); - props.put("spring.pulsar.consumer.auto-ack-oldest-chunked-message-on-queue-full", "false"); - props.put("spring.pulsar.consumer.max-pending-chunked-message", "11"); - props.put("spring.pulsar.consumer.expire-time-of-incomplete-chunked-message", "12s"); - - bind(props); - Map consumerProps = PulsarBinderUtils.convertConsumerPropertiesToMap(properties); - - // Verify that the props can be loaded in a ConsumerBuilder - assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps, - new ConsumerConfigurationData<>(), ConsumerConfigurationData.class)); - - assertThat(consumerProps) - .hasEntrySatisfying("topicNames", - topics -> assertThat(topics) - .asInstanceOf(InstanceOfAssertFactories.collection(String.class)) - .containsExactly("my-topic")) - .hasEntrySatisfying("topicsPattern", p -> assertThat(p.toString()).isEqualTo("my-pattern")) - .containsEntry("subscriptionName", "my-subscription") - .containsEntry("subscriptionType", SubscriptionType.Shared) - .hasEntrySatisfying("subscriptionProperties", - properties -> assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-sub-prop", "my-sub-prop-value")) - .containsEntry("subscriptionMode", SubscriptionMode.NonDurable) - .containsEntry("receiverQueueSize", 1).containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L) - .containsEntry("negativeAckRedeliveryDelayMicros", 3_000_000L) - .containsEntry("maxTotalReceiverQueueSizeAcrossPartitions", 5) - .containsEntry("consumerName", "my-consumer").containsEntry("ackTimeoutMillis", 6_000L) - .containsEntry("tickDurationMillis", 7_000L).containsEntry("priorityLevel", 8) - .containsEntry("cryptoFailureAction", ConsumerCryptoFailureAction.DISCARD) - .hasEntrySatisfying("properties", - properties -> assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-prop", "my-prop-value")) - .containsEntry("readCompacted", true) - .containsEntry("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest) - .containsEntry("patternAutoDiscoveryPeriod", 9) - .containsEntry("regexSubscriptionMode", RegexSubscriptionMode.AllTopics) - .hasEntrySatisfying("deadLetterPolicy", dlp -> { - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) dlp; - assertThat(deadLetterPolicy.getMaxRedeliverCount()).isEqualTo(4); - assertThat(deadLetterPolicy.getRetryLetterTopic()).isEqualTo("my-retry-topic"); - assertThat(deadLetterPolicy.getDeadLetterTopic()).isEqualTo("my-dlt-topic"); - assertThat(deadLetterPolicy.getInitialSubscriptionName()).isEqualTo("my-initial-subscription"); - }).containsEntry("retryEnable", true).containsEntry("autoUpdatePartitions", false) - .containsEntry("autoUpdatePartitionsIntervalSeconds", 10L) - .containsEntry("replicateSubscriptionState", true).containsEntry("resetIncludeHead", true) - .containsEntry("batchIndexAckEnabled", true).containsEntry("ackReceiptEnabled", true) - .containsEntry("poolMessages", true).containsEntry("startPaused", true) - .containsEntry("autoAckOldestChunkedMessageOnQueueFull", false) - .containsEntry("maxPendingChunkedMessage", 11) - .containsEntry("expireTimeOfIncompleteChunkedMessageMillis", 12_000L); + void basePropSpecifiedAtBinderAndBindingLevel() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setPriorityLevel(2000), + (bindingProps) -> bindingProps.setPriorityLevel(1000), Map.of("priorityLevel", 1000)); + } + + @Test + void basePropSpecifiedWithSameValueAsDefault() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setPriorityLevel(0), + (bindingProps) -> bindingProps.setPriorityLevel(0), Collections.emptyMap()); + } + + @Test + void extPropSpecifiedAtBinderLevelOnly() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setReceiverQueueSize(1200), SET_NO_PROPS, + Map.of("receiverQueueSize", 1200)); + } + + @Test + void extPropSpecifiedAtBindingLevelOnly() { + doMergeConsumerPropertiesTest(SET_NO_PROPS, (bindingProps) -> bindingProps.setReceiverQueueSize(1200), + Map.of("receiverQueueSize", 1200)); + } + + @Test + void extPropSpecifiedAtBinderAndBindingLevel() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setReceiverQueueSize(1100), + (bindingProps) -> bindingProps.setReceiverQueueSize(1200), Map.of("receiverQueueSize", 1200)); + } + + @Test + void extPropSpecifiedWithSameValueAsDefault() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setReceiverQueueSize(1000), + (bindingProps) -> bindingProps.setReceiverQueueSize(1000), Collections.emptyMap()); + } + + @Test + void baseAndExtPropsAreCombined() { + doMergeConsumerPropertiesTest((binderProps) -> binderProps.setPriorityLevel(1000), + (bindingProps) -> bindingProps.setReceiverQueueSize(1200), + Map.of("priorityLevel", 1000, "receiverQueueSize", 1200)); + } + + void doMergeConsumerPropertiesTest(Consumer binderPropsCustomizer, + Consumer bindingPropsCustomizer, Map expectedProps) { + var binderConsumerProps = new ConsumerConfigProperties(); + binderPropsCustomizer.accept(binderConsumerProps); + var bindingConsumerProps = new ConsumerConfigProperties(); + bindingPropsCustomizer.accept(bindingConsumerProps); + var mergedProps = PulsarBinderUtils.mergeModifiedConsumerProperties(binderConsumerProps, + bindingConsumerProps); + assertThat(mergedProps).containsExactlyInAnyOrderEntriesOf(expectedProps); } } diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingPropertiesTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingPropertiesTests.java index c272e064b4..e24392b7f1 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingPropertiesTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingPropertiesTests.java @@ -30,7 +30,6 @@ import org.springframework.boot.context.properties.bind.Bindable; import org.springframework.boot.context.properties.bind.Binder; -import org.springframework.boot.context.properties.source.ConfigurationPropertySource; import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; import org.springframework.cloud.stream.binder.pulsar.properties.PulsarExtendedBindingProperties; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -45,96 +44,68 @@ */ public class PulsarExtendedBindingPropertiesTests { - private final PulsarExtendedBindingProperties properties = new PulsarExtendedBindingProperties(); - - private void bind(Map map) { - ConfigurationPropertySource source = new MapConfigurationPropertySource(map); - new Binder(source).bind("spring.cloud.stream.pulsar", Bindable.ofInstance(this.properties)); - } - @Test void producerProperties() { - // Only spot check a few values (PulsarPropertiesTests does the heavy lifting) - Map props = new HashMap<>(); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.topic-name", "my-topic"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.send-timeout", "2s"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.max-pending-messages", "3"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.producer-access-mode", "exclusive"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.properties[my-prop]", "my-prop-value"); + // Only spot check a few values (ProducerConfigPropertiesTests does the heavy + // lifting) + Map inputProps = new HashMap<>(); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.topic-name", "my-topic"); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.send-timeout", "2s"); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.max-pending-messages", "3"); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.access-mode", "exclusive"); + var bindingConfigProps = bindInputPropsToBindingConfigProps(inputProps); + assertThat(bindingConfigProps.getBindings()).containsOnlyKeys("my-foo"); + var producerProps = bindingConfigProps.getExtendedProducerProperties("my-foo").toAllProducerPropertiesMap(); - bind(props); - - assertThat(properties.getBindings()).containsOnlyKeys("my-foo"); - Map producerProps = PulsarBinderUtils - .convertProducerPropertiesToMap(properties.getExtendedProducerProperties("my-foo")); // Verify that the props can be loaded in a ProducerBuilder assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps, new ProducerConfigurationData(), ProducerConfigurationData.class)); - // @formatter:off - assertThat(producerProps) - .containsEntry("topicName", "my-topic") - .containsEntry("sendTimeoutMs", 2_000) - .containsEntry("maxPendingMessages", 3) - .containsEntry("accessMode", ProducerAccessMode.Exclusive) - .hasEntrySatisfying("properties", properties -> - assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-prop", "my-prop-value")); - // @formatter:on + assertThat(producerProps).containsEntry("topicName", "my-topic").containsEntry("sendTimeoutMs", 2_000) + .containsEntry("maxPendingMessages", 3).containsEntry("accessMode", ProducerAccessMode.Exclusive); } @Test void consumerProperties() { - // Only spot check a few values (PulsarPropertiesTests does the heavy lifting) - Map props = new HashMap<>(); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.topics[0]", "my-topic"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-properties[my-sub-prop]", - "my-sub-prop-value"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-mode", "nondurable"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.receiver-queue-size", "1"); - - bind(props); + // Only spot check a few values (ConsumerConfigPropertiesTests does the heavy + // lifting) + Map inputProps = new HashMap<>(); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.topics[0]", "my-topic"); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription.mode", "nondurable"); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.receiver-queue-size", "1"); + var bindingConfigProps = bindInputPropsToBindingConfigProps(inputProps); + assertThat(bindingConfigProps.getBindings()).containsOnlyKeys("my-foo"); + var consumerProps = bindingConfigProps.getExtendedConsumerProperties("my-foo").toAllConsumerPropertiesMap(); - assertThat(properties.getBindings()).containsOnlyKeys("my-foo"); - Map consumerProps = PulsarBinderUtils - .convertConsumerPropertiesToMap(properties.getExtendedConsumerProperties("my-foo")); // Verify that the props can be loaded in a ConsumerBuilder assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps, new ConsumerConfigurationData<>(), ConsumerConfigurationData.class)); - // @formatter:off assertThat(consumerProps) .hasEntrySatisfying("topicNames", topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.collection(String.class)) .containsExactly("my-topic")) - .hasEntrySatisfying("subscriptionProperties", - properties -> assertThat(properties) - .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class)) - .containsEntry("my-sub-prop", "my-sub-prop-value")) - .containsEntry("subscriptionMode", SubscriptionMode.NonDurable) - .containsEntry("receiverQueueSize", 1); - // @formatter:on + .containsEntry("subscriptionMode", SubscriptionMode.NonDurable).containsEntry("receiverQueueSize", 1); } @Test void extendedBindingsArePropagatedToContainerProperties() { - Map props = new HashMap<>(); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-name", "my-foo-sbscription"); - props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-type", "Shared"); - - bind(props); + Map inputProps = new HashMap<>(); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription.name", "my-foo-sbscription"); + inputProps.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription.type", "Shared"); + var bindingConfigProps = bindInputPropsToBindingConfigProps(inputProps); + var consumerProps = bindingConfigProps.getExtendedConsumerProperties("my-foo").toAllConsumerPropertiesMap(); - var bindingConsumerProps = PulsarBinderUtils - .convertConsumerPropertiesToMap(properties.getExtendedConsumerProperties("my-foo")); PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); - pulsarContainerProperties.getPulsarConsumerProperties().putAll(bindingConsumerProps); - + pulsarContainerProperties.getPulsarConsumerProperties().putAll(consumerProps); assertThat(pulsarContainerProperties.getSubscriptionName()).isNull(); assertThat(pulsarContainerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive); - pulsarContainerProperties.updateContainerProperties(); - assertThat(pulsarContainerProperties.getSubscriptionName()).isEqualTo("my-foo-sbscription"); assertThat(pulsarContainerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); } + private PulsarExtendedBindingProperties bindInputPropsToBindingConfigProps(Map inputProps) { + return new Binder(new MapConfigurationPropertySource(inputProps)) + .bind("spring.cloud.stream.pulsar", Bindable.ofInstance(new PulsarExtendedBindingProperties())).get(); + } + } diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTestContainerSupport.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTestContainerSupport.java index b8bddb8388..00107f3af7 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTestContainerSupport.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTestContainerSupport.java @@ -16,8 +16,6 @@ package org.springframework.cloud.stream.binder.pulsar; -import java.util.Locale; - import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.PulsarContainer; import org.testcontainers.junit.jupiter.Testcontainers; @@ -43,26 +41,11 @@ static String getPulsarBrokerUrl() { } static DockerImageName getPulsarImage() { - return isRunningOnMacM1() ? getMacM1PulsarImage() : getStandardPulsarImage(); + return DockerImageName.parse("apachepulsar/pulsar:3.1.0"); } static String getHttpServiceUrl() { return PULSAR_CONTAINER.getHttpServiceUrl(); } - private static boolean isRunningOnMacM1() { - String osName = System.getProperty("os.name").toLowerCase(Locale.ENGLISH); - String osArchitecture = System.getProperty("os.arch").toLowerCase(Locale.ENGLISH); - return osName.contains("mac") && osArchitecture.equals("aarch64"); - } - - private static DockerImageName getStandardPulsarImage() { - return DockerImageName.parse("apachepulsar/pulsar:2.11.0"); - } - - private static DockerImageName getMacM1PulsarImage() { - return DockerImageName.parse("kezhenxu94/pulsar").asCompatibleSubstituteFor("apachepulsar/pulsar"); - } - } -