Skip to content

Commit

Permalink
feat: use FileSerde.writeAll and buferring for improved performances
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 13, 2024
1 parent d07b766 commit ba063d3
Showing 1 changed file with 1 addition and 3 deletions.
4 changes: 1 addition & 3 deletions src/main/java/io/kestra/plugin/amqp/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@

import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -106,7 +104,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
}

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER);
flowable = FileSerde.readAll(inputStream, Message.class);
resultFlowable = this.buildFlowable(flowable, channel, runContext);

count = resultFlowable
Expand Down

0 comments on commit ba063d3

Please sign in to comment.