Skip to content

Commit

Permalink
Autoconfigure QueueMessagingTemplate (#230)
Browse files Browse the repository at this point in the history
Fixes #228.
  • Loading branch information
maciejwalkowiak authored Feb 8, 2022
1 parent a5dd79c commit 77dff95
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.awspring.cloud.core.region.StaticRegionProvider;
import io.awspring.cloud.messaging.config.QueueMessageHandlerFactory;
import io.awspring.cloud.messaging.config.SimpleMessageListenerContainerFactory;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import io.awspring.cloud.messaging.listener.QueueMessageHandler;
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;

Expand Down Expand Up @@ -175,6 +176,17 @@ public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAs
return simpleMessageListenerContainer;
}

@Bean
@ConditionalOnMissingBean(QueueMessagingTemplate.class)
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSqs) {
if (objectMapper != null) {
return new QueueMessagingTemplate(amazonSqs, resourceIdResolver, objectMapper);
}
else {
return new QueueMessagingTemplate(amazonSqs, resourceIdResolver);
}
}

@Bean
public QueueMessageHandler queueMessageHandler(AmazonSQSAsync amazonSqs) {
if (this.simpleMessageListenerContainerFactory.getQueueMessageHandler() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.awspring.cloud.core.region.StaticRegionProvider;
import io.awspring.cloud.messaging.config.QueueMessageHandlerFactory;
import io.awspring.cloud.messaging.config.SimpleMessageListenerContainerFactory;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import io.awspring.cloud.messaging.listener.QueueMessageHandler;
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy;
Expand All @@ -46,6 +47,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.DestinationResolver;
Expand Down Expand Up @@ -250,6 +252,22 @@ void configuration_withObjectMapper_shouldSetObjectMapperOnQueueMessageHandler()
});
}

@Test
void configuration_withObjectMapper_shouldSetObjectMapperOnQueueMessagingTemplate() throws Exception {
// Arrange & Act
this.contextRunner.withUserConfiguration(ConfigurationWithObjectMapper.class).run((context) -> {
QueueMessagingTemplate queueMessageHandler = context.getBean(QueueMessagingTemplate.class);
ObjectMapper objectMapper = context.getBean(ObjectMapper.class);
CompositeMessageConverter converter = (CompositeMessageConverter) ReflectionTestUtils
.getField(queueMessageHandler, "converter");
MappingJackson2MessageConverter mappingJackson2MessageConverter = (MappingJackson2MessageConverter) converter
.getConverters().get(1);

// Assert
assertThat(mappingJackson2MessageConverter.getObjectMapper()).isEqualTo(objectMapper);
});
}

@Test
void configuration_withoutAwsCredentials_shouldCreateAClientWithDefaultCredentialsProvider() throws Exception {
// Arrange & Act
Expand Down Expand Up @@ -359,6 +377,24 @@ void enableSqsWithCustomEndpoint() {
});
}

@Test
void createsQueueMessagingTemplate() {
this.contextRunner.run(context -> {
assertThat(context).hasSingleBean(QueueMessagingTemplate.class);
});
}

@Test
void doesNotCreateQueueMessagingTemplateWhenOneIsAlreadyDefined() {
this.contextRunner.withUserConfiguration(ConfigurationWithCustomQueueMessagingTemplate.class).run(context -> {
assertThat(context).hasSingleBean(QueueMessagingTemplate.class);
// has bean defined in custom config
assertThat(context).hasBean("customQueueMessagingTemplate");
// does not have been defined in autoconfiguration
assertThat(context).doesNotHaveBean("queueMessagingTemplate");
});
}

@Configuration(proxyBeanMethods = false)
static class MinimalConfiguration {

Expand Down Expand Up @@ -494,6 +530,16 @@ QueueMessageHandlerFactory queueMessageHandlerFactory() {

}

@Configuration(proxyBeanMethods = false)
static class ConfigurationWithCustomQueueMessagingTemplate {

@Bean
QueueMessagingTemplate customQueueMessagingTemplate() {
return mock(QueueMessagingTemplate.class);
}

}

@Configuration(proxyBeanMethods = false)
static class ConfigurationWithMissingAwsCredentials {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.core.env.ResourceIdResolver;
import io.awspring.cloud.messaging.core.support.AbstractMessageChannelMessagingSendingTemplate;
import io.awspring.cloud.messaging.support.destination.DynamicQueueUrlDestinationResolver;
Expand Down Expand Up @@ -50,11 +51,11 @@ public class QueueMessagingTemplate extends AbstractMessageChannelMessagingSendi
private final AmazonSQSAsync amazonSqs;

public QueueMessagingTemplate(AmazonSQSAsync amazonSqs) {
this(amazonSqs, (ResourceIdResolver) null, null);
this(amazonSqs, null, (ObjectMapper) null);
}

public QueueMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) {
this(amazonSqs, resourceIdResolver, null);
this(amazonSqs, resourceIdResolver, (ObjectMapper) null);
}

/**
Expand All @@ -72,6 +73,11 @@ public QueueMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resou
this(amazonSqs, new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver), messageConverter);
}

public QueueMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver,
ObjectMapper objectMapper) {
this(amazonSqs, new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver), null, objectMapper);
}

/**
* Initializes the messaging template by configuring the destination resolver as well
* as the message converter. Uses the {@link DynamicQueueUrlDestinationResolver} with
Expand All @@ -86,9 +92,14 @@ public QueueMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resou
*/
public QueueMessagingTemplate(AmazonSQSAsync amazonSqs, DestinationResolver<String> destinationResolver,
MessageConverter messageConverter) {
this(amazonSqs, destinationResolver, messageConverter, null);
}

public QueueMessagingTemplate(AmazonSQSAsync amazonSqs, DestinationResolver<String> destinationResolver,
MessageConverter messageConverter, ObjectMapper objectMapper) {
super(destinationResolver);
this.amazonSqs = amazonSqs;
initMessageConverter(messageConverter);
initMessageConverter(messageConverter, objectMapper);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -96,7 +98,10 @@ protected D resolveMessageChannelByLogicalName(String destination) {
}

protected void initMessageConverter(MessageConverter messageConverter) {
this.initMessageConverter(messageConverter, null);
}

protected void initMessageConverter(MessageConverter messageConverter, ObjectMapper objectMapper) {
StringMessageConverter stringMessageConverter = new StringMessageConverter();
stringMessageConverter.setSerializedPayloadClass(String.class);

Expand All @@ -109,6 +114,9 @@ protected void initMessageConverter(MessageConverter messageConverter) {
else {
MappingJackson2MessageConverter mappingJackson2MessageConverter = new MappingJackson2MessageConverter();
mappingJackson2MessageConverter.setSerializedPayloadClass(String.class);
if (objectMapper != null) {
mappingJackson2MessageConverter.setObjectMapper(objectMapper);
}
messageConverters.add(mappingJackson2MessageConverter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.core.env.ResourceIdResolver;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -186,8 +187,8 @@ void instantiation_withConverter_shouldAddItToTheCompositeConverter() {
@Test
void instantiation_withoutConverter_shouldAddDefaultJacksonConverterToTheCompositeConverter() {
// Act
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(createAmazonSqs(),
(ResourceIdResolver) null, null);
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(createAmazonSqs(), null,
(ObjectMapper) null);

// Assert
assertThat(((CompositeMessageConverter) queueMessagingTemplate.getMessageConverter()).getConverters())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
"cdk": "cdk"
},
"devDependencies": {
"@aws-cdk/assert": "1.83.0",
"@aws-cdk/assert": "1.143.0",
"@types/jest": "^26.0.10",
"@types/node": "10.17.27",
"jest": "^26.4.2",
"ts-jest": "^26.2.0",
"aws-cdk": "1.83.0",
"aws-cdk": "1.143.0",
"ts-node": "^9.0.0",
"typescript": "~3.9.7"
},
"dependencies": {
"@aws-cdk/aws-sqs": "^1.86.0",
"@aws-cdk/core": "^1.83.0",
"@aws-cdk/aws-sqs": "^1.143.0",
"@aws-cdk/core": "^1.143.0",
"source-map-support": "^0.5.16"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,28 @@

package io.awspring.cloud.sqs.sample;

import java.time.LocalDate;

public class Person {

private String firstName;

private String lastName;

public Person(String firstName, String lastName) {
private LocalDate birthDate;

public Person(String firstName, String lastName, LocalDate birthDate) {
this.firstName = firstName;
this.lastName = lastName;
this.birthDate = birthDate;
}

public LocalDate getBirthDate() {
return birthDate;
}

public void setBirthDate(LocalDate birthDate) {
this.birthDate = birthDate;
}

public String getFirstName() {
Expand All @@ -45,7 +58,8 @@ public void setLastName(String lastName) {

@Override
public String toString() {
return "Person{" + "firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + '}';
return "Person{" + "firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + ", birthDate="
+ birthDate + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package io.awspring.cloud.sqs.sample;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import java.time.LocalDate;

import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import io.awspring.cloud.messaging.listener.annotation.SqsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
Expand All @@ -34,9 +34,8 @@ public class SqsSampleApplication {

private final QueueMessagingTemplate queueMessagingTemplate;

@Autowired
public SqsSampleApplication(AmazonSQSAsync amazonSqs) {
this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);
SqsSampleApplication(QueueMessagingTemplate queueMessagingTemplate) {
this.queueMessagingTemplate = queueMessagingTemplate;
}

private static final Logger LOGGER = LoggerFactory.getLogger(SqsSampleApplication.class);
Expand All @@ -49,7 +48,8 @@ public static void main(String[] args) {
public void sendMessage() {
this.queueMessagingTemplate.send("InfrastructureStack-spring-aws",
MessageBuilder.withPayload("Spring cloud Aws SQS sample!").build());
this.queueMessagingTemplate.convertAndSend("InfrastructureStack-aws-pojo", new Person("Joe", "Doe"));
this.queueMessagingTemplate.convertAndSend("InfrastructureStack-aws-pojo",
new Person("Joe", "Doe", LocalDate.of(2000, 1, 12)));
}

@SqsListener("InfrastructureStack-spring-aws")
Expand Down

0 comments on commit 77dff95

Please sign in to comment.