Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Pulsar binder to use new Spring Pulsar starter. #2803

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<artifactId>spring-cloud-stream-binder-pulsar</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-pulsar</name>
<description>PUlsar binder implementation</description>
<description>Pulsar binder implementation</description>

<parent>
<groupId>org.springframework.cloud</groupId>
Expand All @@ -15,9 +15,9 @@

<dependencies>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.1-SNAPSHOT</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar</artifactId>
<version>3.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.pulsar.support.header.PulsarHeaderMapper;


/**
* A delegating {@code PulsarHeaderMapper} that ensures the delegate mapper never includes
* internal binder specific headers during outbound mapping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@

package org.springframework.cloud.stream.binder.pulsar;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties;
import org.springframework.pulsar.autoconfigure.ProducerConfigProperties;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;

/**
* Binder utility methods.
Expand All @@ -55,12 +52,74 @@ private PulsarBinderUtils() {
* consumer properties
*/
static String subscriptionName(PulsarConsumerProperties consumerProps, ConsumerDestination consumerDestination) {
if (StringUtils.hasText(consumerProps.getSubscriptionName())) {
return consumerProps.getSubscriptionName();
if (StringUtils.hasText(consumerProps.getSubscription().getName())) {
return consumerProps.getSubscription().getName();
}
return SUBSCRIPTION_NAME_FORMAT_STR.formatted(consumerDestination.getName(), UUID.randomUUID());
}

/**
* Merges base and extended producer properties defined at the binder and binding
* level. Only properties whose value has changed from the default are considered. If
* a property is defined at both the binder and binding level, the binding level
* property value is given precedence.
* @param binderProducerProps the binder level producer config properties (eg.
* 'spring.cloud.stream.pulsar.binder.producer.*')
* @param bindingProducerProps the binding level config properties (eg.
* 'spring.cloud.stream.pulsar.bindings.myBinding-out-0.producer.*')
* @return map of modified merged binder and binding producer properties
*/
static Map<String, Object> mergeModifiedProducerProperties(ProducerConfigProperties binderProducerProps,
ProducerConfigProperties bindingProducerProps) {
// Layer the base props for common -> binder -> bindings
var baseProducerProps = new ProducerConfigProperties().toBaseProducerPropertiesMap();
var binderBaseProducerProps = binderProducerProps.toBaseProducerPropertiesMap();
var bindingBaseProducerProps = bindingProducerProps.toBaseProducerPropertiesMap();
var layeredBaseProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseProducerProps,
binderBaseProducerProps, bindingBaseProducerProps);
// Layer the extended props for binder -> bindings
var extProducerProps = new ProducerConfigProperties().toExtendedProducerPropertiesMap();
var binderExtProducerProps = binderProducerProps.toExtendedProducerPropertiesMap();
var bindingExtProducerProps = bindingProducerProps.toExtendedProducerPropertiesMap();
var layeredExtProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(extProducerProps,
binderExtProducerProps, bindingExtProducerProps);
// Combine both base and extended layers
var layeredProducerProps = new HashMap<>(layeredBaseProducerProps);
layeredProducerProps.putAll(layeredExtProducerProps);
return layeredProducerProps;
}

/**
* Merges base and extended consumer properties defined at the binder and binding
* level. Only properties whose value has changed from the default are considered. If
* a property is defined at both the binder and binding level, the binding level
* property value is given precedence.
* @param binderConsumerProps the binder level consumer config properties (eg.
* 'spring.cloud.stream.pulsar.binder.consumer.*')
* @param bindingConsumerProps the binding level config properties (eg.
* 'spring.cloud.stream.pulsar.bindings.myBinding-in-0.consumer.*')
* @return map of modified merged binder and binding consumer properties
*/
static Map<String, Object> mergeModifiedConsumerProperties(ConsumerConfigProperties binderConsumerProps,
ConsumerConfigProperties bindingConsumerProps) {
// Layer the base props for common -> binder -> bindings
var baseConsumerProps = new ConsumerConfigProperties().toBaseConsumerPropertiesMap();
var binderBaseConsumerProps = binderConsumerProps.toBaseConsumerPropertiesMap();
var bindingBaseConsumerProps = bindingConsumerProps.toBaseConsumerPropertiesMap();
var layeredBaseConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseConsumerProps,
binderBaseConsumerProps, bindingBaseConsumerProps);
// Layer the extended props for binder -> bindings
var extConsumerProps = new ConsumerConfigProperties().toExtendedConsumerPropertiesMap();
var binderExtConsumerProps = binderConsumerProps.toExtendedConsumerPropertiesMap();
var bindingExtConsumerProps = bindingConsumerProps.toExtendedConsumerPropertiesMap();
var layeredExtConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(extConsumerProps,
binderExtConsumerProps, bindingExtConsumerProps);
// Combine both base and extended layers
var layeredConsumerProps = new HashMap<>(layeredBaseConsumerProps);
layeredConsumerProps.putAll(layeredExtConsumerProps);
return layeredConsumerProps;
}

/**
* Merges properties defined at the binder and binding level (binding properties
* override binder properties).
Expand Down Expand Up @@ -105,103 +164,4 @@ private static Map<String, Object> extractNewOrModifiedProperties(Map<String, Ob
return newOrModifiedProps;
}

/**
* Gets a map representation of a {@link ProducerConfigProperties}.
* @param producerProps the producer props
* @return map representation of producer props where each entry is a field and its
* associated value
*/
static Map<String, Object> convertProducerPropertiesToMap(ProducerConfigProperties producerProps) {
var properties = new PulsarBinderUtils.Properties();
var map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(producerProps::getTopicName).to(properties.in("topicName"));
map.from(producerProps::getProducerName).to(properties.in("producerName"));
map.from(producerProps::getSendTimeout).asInt(Duration::toMillis).to(properties.in("sendTimeoutMs"));
map.from(producerProps::getBlockIfQueueFull).to(properties.in("blockIfQueueFull"));
map.from(producerProps::getMaxPendingMessages).to(properties.in("maxPendingMessages"));
map.from(producerProps::getMaxPendingMessagesAcrossPartitions)
.to(properties.in("maxPendingMessagesAcrossPartitions"));
map.from(producerProps::getMessageRoutingMode).to(properties.in("messageRoutingMode"));
map.from(producerProps::getHashingScheme).to(properties.in("hashingScheme"));
map.from(producerProps::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(producerProps::getBatchingMaxPublishDelay).as(it -> it.toNanos() / 1000)
.to(properties.in("batchingMaxPublishDelayMicros"));
map.from(producerProps::getBatchingPartitionSwitchFrequencyByPublishDelay)
.to(properties.in("batchingPartitionSwitchFrequencyByPublishDelay"));
map.from(producerProps::getBatchingMaxMessages).to(properties.in("batchingMaxMessages"));
map.from(producerProps::getBatchingMaxBytes).asInt(DataSize::toBytes).to(properties.in("batchingMaxBytes"));
map.from(producerProps::getBatchingEnabled).to(properties.in("batchingEnabled"));
map.from(producerProps::getChunkingEnabled).to(properties.in("chunkingEnabled"));
map.from(producerProps::getEncryptionKeys).to(properties.in("encryptionKeys"));
map.from(producerProps::getCompressionType).to(properties.in("compressionType"));
map.from(producerProps::getInitialSequenceId).to(properties.in("initialSequenceId"));
map.from(producerProps::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(producerProps::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(producerProps::getMultiSchema).to(properties.in("multiSchema"));
map.from(producerProps::getProducerAccessMode).to(properties.in("accessMode"));
map.from(producerProps::getLazyStartPartitionedProducers).to(properties.in("lazyStartPartitionedProducers"));
map.from(producerProps::getProperties).to(properties.in("properties"));
return properties;
}

/**
* Gets a map representation of a {@link ConsumerConfigProperties}.
* @param consumerProps the consumer props
* @return map representation of consumer props where each entry is a field and its
* associated value
*/
static Map<String, Object> convertConsumerPropertiesToMap(ConsumerConfigProperties consumerProps) {
var properties = new PulsarBinderUtils.Properties();
var map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(consumerProps::getTopics).to(properties.in("topicNames"));
map.from(consumerProps::getTopicsPattern).to(properties.in("topicsPattern"));
map.from(consumerProps::getSubscriptionName).to(properties.in("subscriptionName"));
map.from(consumerProps::getSubscriptionType).to(properties.in("subscriptionType"));
map.from(consumerProps::getSubscriptionProperties).to(properties.in("subscriptionProperties"));
map.from(consumerProps::getSubscriptionMode).to(properties.in("subscriptionMode"));
map.from(consumerProps::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
map.from(consumerProps::getAcknowledgementsGroupTime).as(it -> it.toNanos() / 1000)
.to(properties.in("acknowledgementsGroupTimeMicros"));
map.from(consumerProps::getNegativeAckRedeliveryDelay).as(it -> it.toNanos() / 1000)
.to(properties.in("negativeAckRedeliveryDelayMicros"));
map.from(consumerProps::getMaxTotalReceiverQueueSizeAcrossPartitions)
.to(properties.in("maxTotalReceiverQueueSizeAcrossPartitions"));
map.from(consumerProps::getConsumerName).to(properties.in("consumerName"));
map.from(consumerProps::getAckTimeout).as(Duration::toMillis).to(properties.in("ackTimeoutMillis"));
map.from(consumerProps::getTickDuration).as(Duration::toMillis).to(properties.in("tickDurationMillis"));
map.from(consumerProps::getPriorityLevel).to(properties.in("priorityLevel"));
map.from(consumerProps::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(consumerProps::getProperties).to(properties.in("properties"));
map.from(consumerProps::getReadCompacted).to(properties.in("readCompacted"));
map.from(consumerProps::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition"));
map.from(consumerProps::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod"));
map.from(consumerProps::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode"));
map.from(consumerProps::getDeadLetterPolicy).to(properties.in("deadLetterPolicy"));
map.from(consumerProps::getRetryEnable).to(properties.in("retryEnable"));
map.from(consumerProps::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(consumerProps::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(consumerProps::getReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(consumerProps::getResetIncludeHead).to(properties.in("resetIncludeHead"));
map.from(consumerProps::getBatchIndexAckEnabled).to(properties.in("batchIndexAckEnabled"));
map.from(consumerProps::getAckReceiptEnabled).to(properties.in("ackReceiptEnabled"));
map.from(consumerProps::getPoolMessages).to(properties.in("poolMessages"));
map.from(consumerProps::getStartPaused).to(properties.in("startPaused"));
map.from(consumerProps::getAutoAckOldestChunkedMessageOnQueueFull)
.to(properties.in("autoAckOldestChunkedMessageOnQueueFull"));
map.from(consumerProps::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage"));
map.from(consumerProps::getExpireTimeOfIncompleteChunkedMessage).as(Duration::toMillis)
.to(properties.in("expireTimeOfIncompleteChunkedMessageMillis"));
return properties;
}

static class Properties extends HashMap<String, Object> {

<V> java.util.function.Consumer<V> in(String key) {
return (value) -> put(key, value);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties;
import org.springframework.pulsar.autoconfigure.ProducerConfigProperties;
import org.springframework.pulsar.core.ProducerBuilderConfigurationUtil;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
Expand All @@ -61,7 +59,6 @@
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.support.header.PulsarHeaderMapper;


/**
* {@link Binder} implementation for Apache Pulsar.
*
Expand Down Expand Up @@ -112,15 +109,10 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
schema = null;
}

var baseProducerProps = PulsarBinderUtils.convertProducerPropertiesToMap(new ProducerConfigProperties());
var binderProducerProps = PulsarBinderUtils
.convertProducerPropertiesToMap(this.binderConfigProps.getProducer());
var bindingProducerProps = PulsarBinderUtils.convertProducerPropertiesToMap(producerProperties.getExtension());
var mergedProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseProducerProps,
binderProducerProps, bindingProducerProps);

var layeredProducerProps = PulsarBinderUtils.mergeModifiedProducerProperties(
this.binderConfigProps.getProducer(), producerProperties.getExtension());
var handler = new PulsarProducerConfigurationMessageHandler(this.pulsarTemplate, schema, destination.getName(),
(builder) -> ProducerBuilderConfigurationUtil.loadConf(builder, mergedProducerProps),
(builder) -> ProducerBuilderConfigurationUtil.loadConf(builder, layeredProducerProps),
determineOutboundHeaderMapper(producerProperties));
handler.setApplicationContext(getApplicationContext());
handler.setBeanFactory(getBeanFactory());
Expand Down Expand Up @@ -167,13 +159,9 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
var subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination);
containerProperties.setSubscriptionName(subscriptionName);

var baseConsumerProps = PulsarBinderUtils.convertConsumerPropertiesToMap(new ConsumerConfigProperties());
var binderConsumerProps = PulsarBinderUtils
.convertConsumerPropertiesToMap(this.binderConfigProps.getConsumer());
var bindingConsumerProps = PulsarBinderUtils.convertConsumerPropertiesToMap(properties.getExtension());
var mergedConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseConsumerProps,
binderConsumerProps, bindingConsumerProps);
containerProperties.getPulsarConsumerProperties().putAll(mergedConsumerProps);
var layeredConsumerProps = PulsarBinderUtils
.mergeModifiedConsumerProperties(this.binderConfigProps.getConsumer(), properties.getExtension());
containerProperties.getPulsarConsumerProperties().putAll(layeredConsumerProps);
containerProperties.updateContainerProperties();

var container = new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.pulsar.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.pulsar.PulsarMessageChannelBinder;
Expand All @@ -25,7 +26,6 @@
import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.autoconfigure.PulsarProperties;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
Expand All @@ -48,7 +48,7 @@ public class PulsarBinderConfiguration {

@Bean
public PulsarTopicProvisioner pulsarTopicProvisioner(PulsarAdministration pulsarAdministration,
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) {
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) {
return new PulsarTopicProvisioner(pulsarAdministration, pulsarBinderConfigurationProperties);
}

Expand All @@ -63,9 +63,9 @@ public PulsarHeaderMapper pulsarHeaderMapper() {

@Bean
public PulsarMessageChannelBinder pulsarMessageChannelBinder(PulsarTopicProvisioner pulsarTopicProvisioner,
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<byte[]> pulsarConsumerFactory,
PulsarBinderConfigurationProperties binderConfigProps, PulsarExtendedBindingProperties bindingConfigProps,
SchemaResolver schemaResolver, PulsarHeaderMapper headerMapper) {
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<?> pulsarConsumerFactory,
PulsarBinderConfigurationProperties binderConfigProps, PulsarExtendedBindingProperties bindingConfigProps,
SchemaResolver schemaResolver, PulsarHeaderMapper headerMapper) {
PulsarMessageChannelBinder pulsarMessageChannelBinder = new PulsarMessageChannelBinder(pulsarTopicProvisioner,
pulsarTemplate, pulsarConsumerFactory, binderConfigProps, schemaResolver, headerMapper);
pulsarMessageChannelBinder.setExtendedBindingProperties(bindingConfigProps);
Expand Down
Loading
Loading