diff --git a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java index beec9af7ad..51f8c01317 100644 --- a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java +++ b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java @@ -7,6 +7,8 @@ package io.camunda.connector.inbound; import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.QueueAttributeName; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import io.camunda.connector.api.annotation.InboundConnector; import io.camunda.connector.api.inbound.Activity; import io.camunda.connector.api.inbound.Health; @@ -22,6 +24,7 @@ import io.camunda.connector.generator.java.annotation.ElementTemplate.ConnectorElementType; import io.camunda.connector.generator.java.annotation.ElementTemplate.PropertyGroup; import io.camunda.connector.inbound.model.SqsInboundProperties; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -63,6 +66,7 @@ templateNameOverride = "Amazon SQS Boundary Event Connector") }) public class SqsExecutable implements InboundConnectorExecutable { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class); private final AmazonSQSClientSupplier sqsClientSupplier; private final ExecutorService executorService; @@ -100,6 +104,16 @@ public void activate(final InboundConnectorContext context) { amazonSQS = sqsClientSupplier.sqsClient( CredentialsProviderSupport.credentialsProvider(properties), region); + + try { + amazonSQS.getQueueAttributes( + properties.getQueue().url(), + List.of(QueueAttributeName.ApproximateNumberOfMessages.toString())); + } catch (QueueDoesNotExistException e) { + LOGGER.error("Queue does not exist, failing subscription activation"); + throw new RuntimeException("Queue does not exist: " + properties.getQueue().url()); + } + LOGGER.debug("SQS client created successfully"); if (sqsQueueConsumer == null) { sqsQueueConsumer = new SqsQueueConsumer(amazonSQS, properties, context); diff --git a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java index f7f2aec09b..dc1922a8c2 100644 --- a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java +++ b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java @@ -54,6 +54,11 @@ public void run() { do { try { receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); + } catch (Exception e) { + LOGGER.error("Failed to receive messages from SQS queue", e); + continue; + } + try { List messages = receiveMessageResult.getMessages(); for (Message message : messages) { context.log( diff --git a/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java b/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java index 7989b4de06..48b0df5b8e 100644 --- a/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java +++ b/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java @@ -9,6 +9,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.file.Files.readString; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -19,6 +20,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import com.fasterxml.jackson.core.type.TypeReference; @@ -110,6 +112,9 @@ public void activateTest(Map properties) throws InterruptedExcep @Test public void deactivateTest() { // Given + when(sqsClient.getQueueAttributes(any(), any())).thenReturn(null); + when(supplier.sqsClient(any(AWSCredentialsProvider.class), eq(ACTUAL_QUEUE_REGION))) + .thenReturn(sqsClient); Map properties = Map.of( "authentication", @@ -133,6 +138,30 @@ public void deactivateTest() { assertThat(executorService.isShutdown()).isTrue(); } + @Test + public void nonExistingQueueTest() { + // Given + when(sqsClient.getQueueAttributes(any(), any())).thenThrow(new QueueDoesNotExistException("")); + when(supplier.sqsClient(any(AWSCredentialsProvider.class), eq(ACTUAL_QUEUE_REGION))) + .thenReturn(sqsClient); + Map properties = + Map.of( + "authentication", + Map.of( + "secretKey", ACTUAL_SECRET_KEY, + "accessKey", ACTUAL_ACCESS_KEY), + "configuration", + Map.of("region", "us-east-1"), + "queue", + Map.of("url", ACTUAL_QUEUE_URL, "pollingWaitTime", "1")); + var context = createConnectorContext(properties, createDefinition()); + consumer = new SqsQueueConsumer(sqsClient, new SqsInboundProperties(), context); + consumer.setQueueConsumerActive(true); + SqsExecutable sqsExecutable = new SqsExecutable(supplier, executorService, consumer); + // When & then + assertThrows(RuntimeException.class, () -> sqsExecutable.activate(context)); + } + private InboundConnectorDefinition createDefinition() { var element = new ProcessElement("proc-id", 1, 2, "element-id", ""); return InboundConnectorDefinitionBuilder.create().elements(element).type("type").build();