Skip to content

Commit

Permalink
feat(*): Migrate from RxJava2 to Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 30, 2024
1 parent 37fd0bf commit 414f12c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"
compileOnly "io.micronaut.rxjava2:micronaut-rxjava2"
compileOnly "io.micronaut.reactor:micronaut-reactor"
compileOnly "io.micronaut:micronaut-jackson-databind"

// kestra
Expand Down Expand Up @@ -85,6 +85,7 @@ dependencies {

testImplementation platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
testImplementation "io.micronaut.test:micronaut-test-junit5"
testImplementation "io.micronaut.reactor:micronaut-reactor"

// test deps needed only for to have a runner
testImplementation group: "io.kestra", name: "core", version: kestraVersion
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/kestra/plugin/pulsar/Produce.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.pulsar.client.api.*;
Expand All @@ -27,6 +25,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import static io.kestra.core.utils.Rethrow.throwFunction;

Expand Down Expand Up @@ -186,25 +186,25 @@ public Output run(RunContext runContext) throws Exception {
Integer count = 1;

if (this.from instanceof String || this.from instanceof List) {
Flowable<Object> flowable;
Flowable<Integer> resultFlowable;
Flux<Object> flowable;
Flux<Integer> resultFlowable;
if (this.from instanceof String) {
URI from = new URI(runContext.render((String) this.from));
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
flowable = Flowable.create(FileSerde.reader(inputStream), BackpressureStrategy.BUFFER);
flowable = Flux.create(FileSerde.reader(inputStream), FluxSink.OverflowStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, runContext, producer);

count = resultFlowable
.reduce(Integer::sum)
.blockingGet();
.block();
}
} else {
flowable = Flowable.fromArray(((List<Object>) this.from).toArray());
flowable = Flux.fromArray(((List<Object>) this.from).toArray());
resultFlowable = this.buildFlowable(flowable, runContext, producer);

count = resultFlowable
.reduce(Integer::sum)
.blockingGet();
.block();
}
} else {
this.produceMessage(producer, runContext, (Map<String, Object>) this.from);
Expand All @@ -222,12 +222,12 @@ public Output run(RunContext runContext) throws Exception {
}

@SuppressWarnings("unchecked")
private Flowable<Integer> buildFlowable(Flowable<Object> flowable, RunContext runContext, Producer<byte[]> producer) {
private Flux<Integer> buildFlowable(Flux<Object> flowable, RunContext runContext, Producer<byte[]> producer) throws Exception {
return flowable
.map(row -> {
.map(throwFunction(row -> {
this.produceMessage(producer, runContext, (Map<String, Object>) row);
return 1;
});
}));
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 414f12c

Please sign in to comment.