Skip to content

Commit

Permalink
Merge pull request #61 from kestra-io/fix/fix-npe-with-result-flowabl…
Browse files Browse the repository at this point in the history
…e-block

fix(): changed block() to blockOptional()
  • Loading branch information
mgabelle authored Sep 5, 2024
2 parents c3ac68f + 0eb6230 commit 09ae9ca
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions src/main/java/io/kestra/plugin/amqp/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, channel, runContext);

count = resultFlowable
.reduce(Integer::sum)
.block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
}

} else if (this.from instanceof List) {
Expand All @@ -128,9 +126,7 @@ public Publish.Output run(RunContext runContext) throws Exception {

resultFlowable = this.buildFlowable(flowable, channel, runContext);

count = resultFlowable
.reduce(Integer::sum)
.block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
publish(channel, JacksonMapper.toMap(runContext.render((Map<String, Object>) this.from), Message.class), runContext);
}
Expand Down

0 comments on commit 09ae9ca

Please sign in to comment.