Skip to content

Commit

Permalink
fix(sqs): fail fast when queue does not exist (#3415) (#3420)
Browse files Browse the repository at this point in the history
* fix(sqs): fail fast when queue does not exist (#3415)

* fix(sqs): fail fast when queue does not exist

* fix tests

(cherry picked from commit 949a742)

* fix syntax
  • Loading branch information
chillleader authored Oct 1, 2024
1 parent 6cb426c commit 144c075
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package io.camunda.connector.inbound;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
Expand All @@ -15,6 +17,7 @@
import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier;
import io.camunda.connector.common.suppliers.DefaultAmazonSQSClientSupplier;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -23,6 +26,7 @@

@InboundConnector(name = "AWS SQS Inbound", type = "io.camunda:aws-sqs-inbound:1")
public class SqsExecutable implements InboundConnectorExecutable {

private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class);
private final AmazonSQSClientSupplier sqsClientSupplier;
private final ExecutorService executorService;
Expand Down Expand Up @@ -54,6 +58,16 @@ public void activate(final InboundConnectorContext context) {
amazonSQS =
sqsClientSupplier.sqsClient(
CredentialsProviderSupport.credentialsProvider(properties), region);

try {
amazonSQS.getQueueAttributes(
properties.getQueue().getUrl(),
List.of(QueueAttributeName.ApproximateNumberOfMessages.toString()));
} catch (QueueDoesNotExistException e) {
LOGGER.error("Queue does not exist, failing subscription activation");
throw new RuntimeException("Queue does not exist: " + properties.getQueue().getUrl());
}

LOGGER.debug("SQS client created successfully");
if (sqsQueueConsumer == null) {
sqsQueueConsumer = new SqsQueueConsumer(amazonSQS, properties, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public void run() {
do {
try {
receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
} catch (Exception e) {
LOGGER.error("Failed to receive messages from SQS queue", e);
continue;
}
try {
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.readString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
Expand All @@ -19,6 +20,7 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -117,6 +119,30 @@ public void deactivateTest() {
assertThat(executorService.isShutdown()).isTrue();
}

@Test
public void nonExistingQueueTest() {
// Given
when(sqsClient.getQueueAttributes(any(), any())).thenThrow(new QueueDoesNotExistException(""));
when(supplier.sqsClient(any(AWSCredentialsProvider.class), eq(ACTUAL_QUEUE_REGION)))
.thenReturn(sqsClient);
Map<String, Object> properties =
Map.of(
"authentication",
Map.of(
"secretKey", ACTUAL_SECRET_KEY,
"accessKey", ACTUAL_ACCESS_KEY),
"configuration",
Map.of("region", "us-east-1"),
"queue",
Map.of("url", ACTUAL_QUEUE_URL, "pollingWaitTime", "1"));
var context = createConnectorContext(properties, createDefinition());
consumer = new SqsQueueConsumer(sqsClient, new SqsInboundProperties(), context);
consumer.setQueueConsumerActive(true);
SqsExecutable sqsExecutable = new SqsExecutable(supplier, executorService, consumer);
// When & then
assertThrows(RuntimeException.class, () -> sqsExecutable.activate(context));
}

private InboundConnectorDefinition createDefinition() {
return InboundConnectorDefinitionBuilder.create()
.bpmnProcessId("proc-id")
Expand Down

0 comments on commit 144c075

Please sign in to comment.