Skip to content

Commit

Permalink
refactor(core): remove Worker constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 23, 2024
1 parent 99e7cdd commit 90c1363
Showing 1 changed file with 47 additions and 44 deletions.
91 changes: 47 additions & 44 deletions src/test/java/io/kestra/plugin/pulsar/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.Worker;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.runner.JdbcScheduler;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
Expand Down Expand Up @@ -50,50 +51,52 @@ void flow() throws Exception {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
Worker worker = new Worker(applicationContext, 8, null);
try (
AbstractScheduler scheduler = new JdbcScheduler(
this.applicationContext,
this.flowListenersService
);
) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution -> {
queueCount.countDown();
assertThat(execution.getLeft().getFlowId(), is("trigger"));
});

Produce task = Produce.builder()
.id(TriggerTest.class.getSimpleName())
.type(Produce.class.getName())
.uri("pulsar://localhost:26650")
.serializer(SerdeType.JSON)
.topic("tu_trigger")
.from(List.of(
ImmutableMap.builder()
.put("key", "key1")
.put("value", "value1")
.build(),
ImmutableMap.builder()
.put("key", "key2")
.put("value", "value2")
.build()
))
.build();

worker.run();
scheduler.run();

repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/trigger.yaml")));

task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

boolean await = queueCount.await(1, TimeUnit.MINUTES);
assertThat(await, is(true));

Integer trigger = (Integer) receive.blockLast().getTrigger().getVariables().get("messagesCount");

assertThat(trigger, greaterThanOrEqualTo(2));
try (Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null)) {
try (
AbstractScheduler scheduler = new JdbcScheduler(
this.applicationContext,
this.flowListenersService
);
) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution -> {
queueCount.countDown();
assertThat(execution.getLeft().getFlowId(), is("trigger"));
});

Produce task = Produce.builder()
.id(TriggerTest.class.getSimpleName())
.type(Produce.class.getName())
.uri("pulsar://localhost:26650")
.serializer(SerdeType.JSON)
.topic("tu_trigger")
.from(List.of(
ImmutableMap.builder()
.put("key", "key1")
.put("value", "value1")
.build(),
ImmutableMap.builder()
.put("key", "key2")
.put("value", "value2")
.build()
))
.build();

worker.run();
scheduler.run();

repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader()
.getResource("flows/trigger.yaml")));

task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

boolean await = queueCount.await(1, TimeUnit.MINUTES);
assertThat(await, is(true));

Integer trigger = (Integer) Objects.requireNonNull(receive.blockLast()).getTrigger().getVariables().get("messagesCount");

assertThat(trigger, greaterThanOrEqualTo(2));
}
}
}
}

0 comments on commit 90c1363

Please sign in to comment.