diff --git a/build.gradle b/build.gradle index 8cfc59e..8535be0 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,7 @@ dependencies { compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion") compileOnly "io.micronaut:micronaut-inject" compileOnly "io.micronaut.validation:micronaut-validation" - compileOnly "io.micronaut.rxjava2:micronaut-rxjava2" + compileOnly "io.micronaut.reactor:micronaut-reactor" compileOnly "io.micronaut:micronaut-jackson-databind" // kestra diff --git a/src/main/java/io/kestra/plugin/pulsar/Produce.java b/src/main/java/io/kestra/plugin/pulsar/Produce.java index c9b38fc..2ec5af6 100644 --- a/src/main/java/io/kestra/plugin/pulsar/Produce.java +++ b/src/main/java/io/kestra/plugin/pulsar/Produce.java @@ -7,8 +7,6 @@ import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.pulsar.client.api.*; @@ -27,6 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import jakarta.validation.constraints.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -186,25 +186,25 @@ public Output run(RunContext runContext) throws Exception { Integer count = 1; if (this.from instanceof String || this.from instanceof List) { - Flowable flowable; - Flowable resultFlowable; + Flux flowable; + Flux resultFlowable; if (this.from instanceof String) { URI from = new URI(runContext.render((String) this.from)); try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) { - flowable = Flowable.create(FileSerde.reader(inputStream), BackpressureStrategy.BUFFER); + flowable = Flux.create(FileSerde.reader(inputStream), FluxSink.OverflowStrategy.BUFFER); resultFlowable = this.buildFlowable(flowable, runContext, producer); count = resultFlowable .reduce(Integer::sum) - .blockingGet(); + .block(); } } else { - flowable = Flowable.fromArray(((List) this.from).toArray()); + flowable = Flux.fromArray(((List) this.from).toArray()); resultFlowable = this.buildFlowable(flowable, runContext, producer); count = resultFlowable .reduce(Integer::sum) - .blockingGet(); + .block(); } } else { this.produceMessage(producer, runContext, (Map) this.from); @@ -222,12 +222,12 @@ public Output run(RunContext runContext) throws Exception { } @SuppressWarnings("unchecked") - private Flowable buildFlowable(Flowable flowable, RunContext runContext, Producer producer) { + private Flux buildFlowable(Flux flowable, RunContext runContext, Producer producer) throws Exception { return flowable - .map(row -> { + .map(throwFunction(row -> { this.produceMessage(producer, runContext, (Map) row); return 1; - }); + })); } @SuppressWarnings("unchecked")