Skip to content

Commit

Permalink
fix(): changed block() to blockOptional()
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Sep 5, 2024
1 parent c3ac68f commit 0eb6230
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions src/main/java/io/kestra/plugin/amqp/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, channel, runContext);

count = resultFlowable
.reduce(Integer::sum)
.block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
}

} else if (this.from instanceof List) {
Expand All @@ -128,9 +126,7 @@ public Publish.Output run(RunContext runContext) throws Exception {

resultFlowable = this.buildFlowable(flowable, channel, runContext);

count = resultFlowable
.reduce(Integer::sum)
.block();
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
publish(channel, JacksonMapper.toMap(runContext.render((Map<String, Object>) this.from), Message.class), runContext);
}
Expand Down

0 comments on commit 0eb6230

Please sign in to comment.