Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(amqp): allow an incomplete URL as parameter #34

Merged
merged 6 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions src/main/java/io/kestra/plugin/amqp/AbstractAmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -32,11 +33,11 @@ public ConnectionFactory connectionFactory(RunContext runContext) throws Excepti
}

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(runContext.render(host));
factory.setPort(Integer.parseInt(runContext.render(port)));
factory.setUsername(runContext.render(username));
factory.setPassword(runContext.render(password));
factory.setVirtualHost(runContext.render(virtualHost));
Optional.ofNullable(runContext.render(host)).ifPresent(factory::setHost);
Optional.ofNullable(runContext.render(port)).map(Integer::parseInt).ifPresent(factory::setPort);
Optional.ofNullable(runContext.render(username)).ifPresent(factory::setUsername);
Optional.ofNullable(runContext.render(password)).ifPresent(factory::setPassword);
Optional.ofNullable(runContext.render(virtualHost)).ifPresent(factory::setVirtualHost);

factory.setExceptionHandler(new AmqpExceptionHandler(runContext.logger()));

Expand All @@ -47,14 +48,18 @@ void parseFromUrl(RunContext runContext, String url) throws IllegalVariableEvalu
URI amqpUri = new URI(runContext.render(url));

host = amqpUri.getHost();
yvrng marked this conversation as resolved.
Show resolved Hide resolved
port = String.valueOf(amqpUri.getPort());
if (amqpUri.getPort() != -1) {
port = String.valueOf(amqpUri.getPort());
}

String auth = amqpUri.getUserInfo();
int pos = auth.indexOf(':');
username = pos > 0 ? auth.substring(0, pos) : auth;
password = pos > 0 ? auth.substring(pos + 1) : "";
if (auth != null) {
int pos = auth.indexOf(':');
username = pos > 0 ? auth.substring(0, pos) : auth;
password = pos > 0 ? auth.substring(pos + 1) : "";
}

if (!amqpUri.getPath().equals("")) {
if (!amqpUri.getPath().isEmpty()) {
virtualHost = amqpUri.getPath();
}
}
Expand Down
66 changes: 64 additions & 2 deletions src/test/java/io/kestra/plugin/amqp/AMQPTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.ConnectionFactory;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.FileSerde;
Expand All @@ -21,8 +22,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
Expand All @@ -37,6 +36,69 @@ class AMQPTest {
@Inject
protected StorageInterface storageInterface;

@Test
void createConnectionFactoryWithDefaultValues() throws Exception {
Publish push = Publish.builder().build();

ConnectionFactory connectionFactory = push.connectionFactory(runContextFactory.of());
assertThat(connectionFactory.getHost(), is(ConnectionFactory.DEFAULT_HOST));
assertThat(connectionFactory.getPort(), is(ConnectionFactory.DEFAULT_AMQP_PORT));
assertThat(connectionFactory.getUsername(), is(ConnectionFactory.DEFAULT_USER));
assertThat(connectionFactory.getPassword(), is(ConnectionFactory.DEFAULT_PASS));
assertThat(connectionFactory.getVirtualHost(), is(ConnectionFactory.DEFAULT_VHOST));
}

@Test
void createConnectionFactoryWithUriOnly() throws Exception {
Publish push = Publish.builder()
.url("amqp://kestra:[email protected]:12345/my_vhost")
.build();

ConnectionFactory connectionFactory = push.connectionFactory(runContextFactory.of());
assertThat(connectionFactory.getHost(), is("example.org"));
assertThat(connectionFactory.getPort(), is(12345));
assertThat(connectionFactory.getUsername(), is("kestra"));
assertThat(connectionFactory.getPassword(), is("K3str4"));
assertThat(connectionFactory.getVirtualHost(), is("/my_vhost"));
}

@Test
void createConnectionFactoryWithFieldsOnly() throws Exception {
Publish push = Publish.builder()
.host("example.org")
.port("12345")
.username("kestra")
.password("K3str4")
.virtualHost("/my_vhost")
.build();

ConnectionFactory connectionFactory = push.connectionFactory(runContextFactory.of());
assertThat(connectionFactory.getHost(), is("example.org"));
assertThat(connectionFactory.getPort(), is(12345));
assertThat(connectionFactory.getUsername(), is("kestra"));
assertThat(connectionFactory.getPassword(), is("K3str4"));
assertThat(connectionFactory.getVirtualHost(), is("/my_vhost"));
}

@Test
void createConnectionFactoryWithUriAndFields() throws Exception {
Publish push = Publish.builder()
.url("amqp://example.org") // values extracted from the URI are considered first
.host("ignore.it")
.port("12345")
.username("kestra")
.password("K3str4")
.virtualHost("/my_vhost")
.build();

ConnectionFactory connectionFactory = push.connectionFactory(runContextFactory.of());
assertThat(connectionFactory.getHost(), is("example.org"));
assertThat(connectionFactory.getPort(), is(12345));
assertThat(connectionFactory.getUsername(), is("kestra"));
assertThat(connectionFactory.getPassword(), is("K3str4"));
assertThat(connectionFactory.getVirtualHost(), is("/my_vhost"));
}
yvrng marked this conversation as resolved.
Show resolved Hide resolved

@Test
void pushAsList() throws Exception {
Publish push = Publish.builder()
Expand Down