From 404c0e62c749d149e1080555e4c78835efca6c5c Mon Sep 17 00:00:00 2001 From: emilb Date: Thu, 2 Jan 2025 11:48:29 +0100 Subject: [PATCH] Implement remaining methods, including reactive lib connections --- flows/pom.xml | 30 +++ .../java/com/softwaremill/jox/flows/Flow.java | 60 +++--- .../com/softwaremill/jox/flows/Flows.java | 193 ++++++++++++++++++ .../jox/flows/FromFlowPublisher.java | 185 +++++++++++++++++ .../jox/flows/FlowPublisherTckTest.java | 76 +++++++ .../jox/flows/FlowsProjectReactorTest.java | 88 ++++++++ .../com/softwaremill/jox/flows/FlowsTest.java | 143 +++++++++++++ 7 files changed, 752 insertions(+), 23 deletions(-) create mode 100644 flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java create mode 100644 flows/src/test/java/com/softwaremill/jox/flows/FlowPublisherTckTest.java create mode 100644 flows/src/test/java/com/softwaremill/jox/flows/FlowsProjectReactorTest.java diff --git a/flows/pom.xml b/flows/pom.xml index b9c4b9c..90090d3 100644 --- a/flows/pom.xml +++ b/flows/pom.xml @@ -24,6 +24,36 @@ awaitility test + + io.projectreactor + reactor-core + 3.7.1 + test + + + org.reactivestreams + reactive-streams + 1.0.4 + test + + + org.reactivestreams + reactive-streams-tck-flow + 1.0.4 + test + + + org.testng + testng + 7.10.2 + test + + + org.slf4j + slf4j-api + + + com.softwaremill.jox channels diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java index ff84b98..1322aac 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -19,6 +19,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +40,7 @@ import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.CancellableFork; import com.softwaremill.jox.structured.Fork; -import com.softwaremill.jox.structured.Scopes; +import com.softwaremill.jox.structured.Scope; import com.softwaremill.jox.structured.UnsupervisedScope; /** @@ -368,7 +369,7 @@ public Flow mapStateful(Supplier initializeState, StatefulMapper Flow mapStatefulConcat(Supplier initializeState, StatefulMapper> f, OnComplete onComplete) { AtomicReference state = new AtomicReference<>(initializeState.get()); - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { last.run(t -> { Map.Entry> result = f.apply(state.get(), t); for (U u : result.getValue()) { @@ -453,7 +454,7 @@ public Flow flatMap(Function> mappingFunction) { * completes as well. */ public Flow take(int n) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { AtomicInteger taken = new AtomicInteger(0); try { last.run(t -> { @@ -550,8 +551,8 @@ public Flow> groupedWeightedWithin(long minWeight, Duration duration, Fu if (minWeight <= 0) throw new IllegalArgumentException("requirement failed: minWeight must be > 0"); if (duration.toMillis() <= 0) throw new IllegalArgumentException("requirement failed: duration must be > 0"); - return Flows.usingEmit(emit -> { - Scopes.unsupervised(scope -> { + return usingEmit(emit -> { + unsupervised(scope -> { Source flowSource = runToChannel(scope); Channel> outputChannel = Channel.withScopedBufferSize(); Channel timerChannel = Channel.withScopedBufferSize(); @@ -620,7 +621,7 @@ case ChannelError(Throwable cause): private CancellableFork forkTimeout(UnsupervisedScope scope, Channel timerChannel, Duration duration) { return scope.forkCancellable(() -> { - Thread.sleep(duration); + sleep(duration); timerChannel.sendOrClosed(GroupingTimeout.INSTANCE); return null; }); @@ -651,7 +652,7 @@ public Flow> groupedWeighted(long minWeight, Function costFn) { throw new IllegalArgumentException("minWeight must be > 0"); } - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { List buffer = new ArrayList<>(); AtomicLong accumulatedCost = new AtomicLong(0L); last.run(t -> { @@ -674,14 +675,14 @@ public Flow> groupedWeighted(long minWeight, Function costFn) { * Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error). */ public Flow drain() { - return Flows.usingEmit(_ -> last.run(_ -> {})); + return usingEmit(_ -> last.run(_ -> {})); } /** * Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error. */ public Flow onComplete(Runnable f) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { try { last.run(emit); } finally { @@ -694,7 +695,7 @@ public Flow onComplete(Runnable f) { * Runs `f` after the flow completes successfully, that is when all elements are emitted. */ public Flow onDone(Runnable f) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { last.run(emit); f.run(); }); @@ -704,7 +705,7 @@ public Flow onDone(Runnable f) { * Runs `f` after the flow completes with an error. The error can't be recovered. */ public Flow onError(Consumer f) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { try { last.run(emit); } catch (Throwable e) { @@ -734,7 +735,7 @@ public Flow intersperse(T start, T inject, T end) { } private Flow intersperse(Optional start, T inject, Optional end) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { if (start.isPresent()) { emit.apply(start.get()); } @@ -790,7 +791,7 @@ public Flow throttle(int elements, Duration per) { * Whether the flow should also emit the first element that failed the predicate (`false` by default). */ public Flow takeWhile(Predicate f, boolean includeFirstFailing) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { try { last.run(t -> { if (f.test(t)) { @@ -826,7 +827,7 @@ public Flow concat(Flow other) { * Number of elements to be dropped. */ public Flow drop(int n) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { AtomicInteger dropped = new AtomicInteger(0); last.run(t -> { if (dropped.get() < n) { @@ -854,7 +855,7 @@ public Flow drop(int n) { * Should the resulting flow complete when the right flow (`outer`) completes, before `this` flow. */ public Flow merge(Flow other, boolean propagateDoneLeft, boolean propagateDoneRight) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { unsupervised(scope -> { Source c1 = this.runToChannel(scope); Source c2 = other.runToChannel(scope); @@ -895,7 +896,7 @@ public Flow prepend(Flow other) { * An alternative flow to be used when this flow is empty. */ public Flow orElse(Flow alternative) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { AtomicBoolean receivedAtLeastOneElement = new AtomicBoolean(false); last.run(t -> { emit.apply(t); @@ -938,7 +939,7 @@ public Flow interleave(Flow other, int segmentSize, boolean eagerCompl * returned flow. If the result of `f` is empty, nothing is emitted by the returned channel. */ public Flow mapConcat(Function> f) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { last.run(t -> { for (U u : f.apply(t)) { emit.apply(u); @@ -961,7 +962,7 @@ public Flow mapConcat(Function> f) { * The mapping function. */ public Flow mapPar(int parallelism, Function f) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { Semaphore semaphore = new Semaphore(parallelism); Channel>> inProgress = new Channel<>(parallelism); Channel results = Channel.withScopedBufferSize(); @@ -969,7 +970,7 @@ public Flow mapPar(int parallelism, Function f) { // creating a nested scope, so that in case of errors, we can clean up any mapping forks in a "local" fashion, // that is without closing the main scope; any error management must be done in the forks, as the scope is // unsupervised - Scopes.unsupervised(scope -> { + unsupervised(scope -> { // a fork which runs the `last` pipeline, and for each emitted element creates a fork // notifying only the `results` channels, as it will cause the scope to end, and any other forks to be // interrupted, including the inProgress-fork, which might be waiting on a join() @@ -1029,7 +1030,7 @@ public Flow mapPar(int parallelism, Function f) { * The mapping function. */ public Flow mapParUnordered(int parallelism, Function f) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { Channel results = Channel.withScopedBufferSize(); Semaphore s = new Semaphore(parallelism); unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline @@ -1075,7 +1076,7 @@ public Flow> sliding(int n, int step) { if (n <= 0) throw new IllegalArgumentException("n must be > 0"); if (step <= 0) throw new IllegalArgumentException("step must be > 0"); - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { final AtomicReference> buf = new AtomicReference<>(new ArrayList<>()); last.run(t -> { var buffer = buf.get(); @@ -1113,7 +1114,7 @@ public Flow> sliding(int n, int step) { * @see #alsoToTap for a version that drops elements when the `other` sink is not available for receive. */ public Flow alsoTo(Sink other) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { try { last.run(t -> { try { @@ -1144,7 +1145,7 @@ public Flow alsoTo(Sink other) { * @see #alsoTo for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink. */ public Flow alsoToTap(Sink other) { - return Flows.usingEmit(emit -> { + return usingEmit(emit -> { try { last.run(t -> { try { @@ -1161,6 +1162,19 @@ public Flow alsoToTap(Sink other) { }); } + /** Converts this {@link Flow} into a {@link Publisher}. The flow is run every time the publisher is subscribed to. + *

+ * Must be run within a concurrency scope, as upon subscribing, a fork is created to run the publishing process. Hence, the scope should + * remain active as long as the publisher is used. + *

+ * Elements emitted by the flow are buffered, using a buffer of capacity given by the {@link Channel#BUFFER_SIZE} in scope or default value {@link Channel#DEFAULT_BUFFER_SIZE} is used. + *

+ * The returned publisher implements the JDK 9+ {@code Flow.Publisher} API. + */ + public Publisher toPublisher(Scope scope) { + return new FromFlowPublisher<>(scope, last); + } + // endregion private void forkPropagate(UnsupervisedScope unsupervisedScope, Sink propagateExceptionsTo, Callable runnable) { diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java index c24a535..cd3752e 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java @@ -2,14 +2,27 @@ import static com.softwaremill.jox.structured.Scopes.unsupervised; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -19,6 +32,7 @@ import com.softwaremill.jox.ChannelError; import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Fork; +import com.softwaremill.jox.structured.Scopes; public final class Flows { @@ -226,6 +240,117 @@ public static Flow failed(Exception t) { }); } + /** + * Creates a flow which emits the first element ({@link Map.Entry#getKey()}) of entries returned by repeated applications of `f`. + * The `initial` state is used for the first application, and then the state is updated with the second element of the entry. Emission stops when `f` returns {@link Optional#empty()}, + * otherwise it continues indefinitely. + */ + public static Flow unfold(S initial, Function>> f) { + return usingEmit(emit -> { + S s = initial; + while (true) { + var result = f.apply(s); + if (result.isEmpty()) { + break; + } + Map.Entry entry = result.get(); + emit.apply(entry.getKey()); + s = entry.getValue(); + } + }); + } + + /** Creates a Flow from a Publisher, that is, which emits the elements received by subscribing to the publisher. A new + * subscription is created every time this flow is run. + *

+ * The data is passed from a subscription to the flow using a Channel, with a capacity given by the {@link Channel#BUFFER_SIZE} in + * scope or {@link Channel#DEFAULT_BUFFER_SIZE} is used. That's also how many elements will be at most requested from the publisher at a time. + *

+ * The publisher parameter should implement the JDK 9+ Flow.Publisher API + */ + public static Flow fromPublisher(Publisher p) { + return usingEmit(emit -> { + // using an unsafe scope for efficiency + Scopes.unsupervised(scope -> { + Channel channel = Channel.withScopedBufferSize(); + int capacity = Channel.BUFFER_SIZE.orElse(Channel.DEFAULT_BUFFER_SIZE); + int demandThreshold = (int) Math.ceil(capacity / 2.0); + + // used to "extract" the subscription that is set in the subscription running in a fork + AtomicReference subscriptionRef = new AtomicReference<>(); + Subscription subscription = null; + + int toDemand = 0; + + try { + // unsafe, but we are sure that this won't throw any exceptions (unless there's a bug in the publisher) + scope.forkUnsupervised(() -> { + p.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionRef.set(s); + s.request(capacity); + } + + @Override + public void onNext(T t) { + try { + channel.send(t); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onError(Throwable t) { + channel.error(t); + } + + @Override + public void onComplete() { + channel.done(); + } + }); + return null; + }); + + while (true) { + Object t = channel.receiveOrClosed(); + if (t instanceof ChannelDone) { + break; + } else if (t instanceof ChannelError error) { + throw error.toException(); + } else { + //noinspection unchecked + emit.apply((T) t); + + // if we have an element, onSubscribe must have already happened; we can read the subscription and cache it for later + if (subscription == null) { + subscription = subscriptionRef.get(); + } + + // now that we've received an element from the channel, we can request more + toDemand += 1; + // we request in batches, to avoid too many requests + if (toDemand >= demandThreshold) { + subscription.request(toDemand); + toDemand = 0; + } + } + } + // exceptions might be propagated from the channel, but they might also originate from an interruption + return null; + } catch (Exception e) { + Subscription s = subscriptionRef.get(); + if (s != null) { + s.cancel(); + } + throw e; + } + }); + }); + } + /** * Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order * of elements in all flows is preserved. @@ -303,4 +428,72 @@ public static Flow interleaveAll(List> flows, int segmentSize, bo }); } } + + /** + * Converts a {@link java.io.InputStream} into a `Flow`. + * + * @param is + * an `InputStream` to read bytes from. + * @param chunkSize + * maximum number of bytes to read from the underlying `InputStream` before emitting a new chunk. + */ + public static Flow fromInputStream(InputStream is, int chunkSize) { + return usingEmit(emit -> { + try (is) { + while (true) { + byte[] buf = new byte[chunkSize]; + int readBytes = is.read(buf); + if (readBytes == -1) { + break; + } else { + if (readBytes > 0) { + emit.apply(readBytes == chunkSize ? buf : Arrays.copyOf(buf, readBytes)); + } + } + } + } + }); + } + + /** + * Creates a flow that emits byte chunks read from a file. + * + * @param path + * path the file to read from. + * @param chunkSize + * maximum number of bytes to read from the file before emitting a new chunk. + */ + public static Flow fromFile(Path path, int chunkSize) { + return usingEmit(emit -> { + if (Files.isDirectory(path)) { + throw new IOException("Path %s is a directory".formatted(path)); + } + SeekableByteChannel fileChannel; + try { + fileChannel = FileChannel.open(path, StandardOpenOption.READ); + } catch (UnsupportedOperationException e) { + // Some file systems don't support file channels + fileChannel = Files.newByteChannel(path, StandardOpenOption.READ); + } + + try { + while (true) { + ByteBuffer buf = ByteBuffer.allocate(chunkSize); + int readBytes = fileChannel.read(buf); + if (readBytes < 0) { + break; + } else { + if (readBytes > 0) { + byte[] byteArray = new byte[readBytes]; + buf.flip(); + buf.get(byteArray, 0, readBytes); + emit.apply(byteArray); + } + } + } + } finally { + fileChannel.close(); + } + }); + } } diff --git a/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java b/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java new file mode 100644 index 0000000..33e8cf8 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/FromFlowPublisher.java @@ -0,0 +1,185 @@ +package com.softwaremill.jox.flows; + +import static com.softwaremill.jox.Select.selectOrClosed; +import static com.softwaremill.jox.structured.Scopes.unsupervised; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.ChannelDone; +import com.softwaremill.jox.ChannelError; +import com.softwaremill.jox.Sink; +import com.softwaremill.jox.structured.Scope; +import com.softwaremill.jox.structured.UnsupervisedScope; + +class FromFlowPublisher implements Flow.Publisher { + + private final Scope scope; + private final FlowStage last; + + FromFlowPublisher(Scope scope, FlowStage last) { + this.scope = scope; + this.last = last; + } + + // 1.10: subscribe can be called multiple times; each time, the flow is started from scratch + // 1.11: subscriptions are unicast + @Override + public void subscribe(Flow.Subscriber subscriber) { + if (subscriber == null) throw new NullPointerException("1.9: subscriber is null"); + // 3.13: the reference to the subscriber is held only as long as the main loop below runs + // 3.14: not in this implementation + + // `runToSubscriber` blocks as long as data is produced by the flow or until the subscription is cancelled + // we cannot block `subscribe` (see https://github.com/reactive-streams/reactive-streams-jvm/issues/393), + // hence running in a fork; however, the reactive library might run .subscribe on a different thread, that's + // why we need to use the external runner functionality + scope.fork(() -> { + runToSubscriber(subscriber); + return null; + }); + } + + private void runToSubscriber(Flow.Subscriber subscriber) throws ExecutionException, InterruptedException { + // starting a new scope so that cancelling (== completing the main body) cleans up (interrupts) any background forks + // using an unsafe scope for efficiency, we only ever start a single fork where all errors are propagated + unsupervised(scope -> { + // processing state: cancelled flag, error sent flag, demand + final AtomicBoolean cancelled = new AtomicBoolean(false); + final AtomicBoolean errorSent = new AtomicBoolean(false); + final AtomicLong demand = new AtomicLong(0L); + + try { + Channel signals = Channel.newUnlimitedChannel(); + // 1.9: onSubscribe must be called first + subscriber.onSubscribe(new FlowSubscription(signals)); + + // we need separate error & data channels so that we can select from error & signals only, without receiving data + // 1.4 any errors from running the flow end up here + Channel errors = Channel.newUnlimitedChannel(); + Channel data = Channel.withScopedBufferSize(); + + // running the flow in the background; all errors end up as an error of the `errors` channel + forkPropagate(scope, errors, () -> { + last.run(data::send); + data.done(); + return null; + }); + + Runnable cancel = () -> cancelled.set(true); + Consumer signalErrorAndCancel = e -> { + if (!cancelled.get()) { + cancel.run(); + errorSent.set(true); + subscriber.onError(e); + } + }; + + Consumer increaseDemand = d -> { + if (d <= 0) signalErrorAndCancel.accept(new IllegalArgumentException("3.9: demand must be positive")); + else { + demand.addAndGet(d); + // 3.17: when demand overflows `Long.MaxValue`, this is treated as the signalled demand to be "effectively unbounded" + if (demand.get() < 0) demand.set(Long.MAX_VALUE); + } + }; + + // main processing loop: running as long as flow is not completed or error was received + while (!cancelled.get()) { // 1.7, 3.12 - ending the main loop after onComplete/onError + if (demand.get() == 0) { + switch (selectOrClosed(errors.receiveClause(), signals.receiveClause())) { + case Request r -> increaseDemand.accept(r.n()); + case Cancel _ -> cancel.run(); + case DummyError _, ChannelDone _ -> {} // impossible as channel done should be received only from `data`, and error from `errors` is handled in the next branch + case ChannelError e -> { // only `errors` can be closed due to an error + cancel.run(); + errorSent.set(true); + subscriber.onError(e.toException()); + } + default -> + throw new IllegalStateException("unexpected clause result"); + } + } else { + switch (selectOrClosed(errors.receiveClause(), data.receiveClause(), signals.receiveClause())) { + case Request r -> increaseDemand.accept(r.n()); + case Cancel _ -> cancel.run(); + case DummyError _ -> {} // impossible + case ChannelDone _ -> { // only `data` can be done + cancel.run(); // 1.6: when signalling onComplete/onError, the subscription is considered cancelled + subscriber.onComplete(); // 1.5 + } + case ChannelError e -> { // only `errors` can be closed due to an error + cancel.run(); + errorSent.set(true); + subscriber.onError(e.toException()); + } + case Object o -> { + //noinspection unchecked + subscriber.onNext((T) o); + demand.decrementAndGet(); + } + } + } + } + } catch (Throwable e) { + // e might be an interrupted exception (the scope ends), or a bug; either way, letting downstream know + if (!errorSent.get()) subscriber.onError(e); + } + return null; + }); + } + + private void forkPropagate(UnsupervisedScope unsupervisedScope, Sink propagateExceptionsTo, Callable runnable) { + unsupervisedScope.forkUnsupervised(() -> { + try { + runnable.call(); + } catch (Exception e) { + propagateExceptionsTo.errorOrClosed(e); + } + return null; + }); + } + + private interface DummyError {} + + /** Signals sent from a {@link FlowSubscription} to a running {@link Flow.Publisher}. */ + private interface Signal {} + private record Request(long n) implements Signal {} + private record Cancel() implements Signal {} + + + private record FlowSubscription(Sink signals) implements Flow.Subscription { + + // 3.2, 3.4: request/cancel can be called anytime, in a thread-safe way + // 3.3: there's no recursion between request & onNext + // 3.6: after a cancel, more requests can be sent to the channel, but they won't be processed (the cancel will be processed first) + // 3.15: the signals channel is never closed + @Override + public void request(long n) { + try { + signals.send(new Request(n)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // 3.5: as above for 3.2 + // 3.7: as above for 3.6 + // 3.16: as above for 3.15 + @Override + public void cancel() { + try { + signals.send(new Cancel()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // 3.10, 3.11: no synchronous calls in this implementation + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowPublisherTckTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowPublisherTckTest.java new file mode 100644 index 0000000..a0865e9 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowPublisherTckTest.java @@ -0,0 +1,76 @@ +package com.softwaremill.jox.flows; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.atomic.AtomicReference; + +import com.softwaremill.jox.structured.Scope; +import com.softwaremill.jox.structured.Scopes; +import org.junit.jupiter.api.Test; +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.FlowPublisherVerification; + +public class FlowPublisherTckTest { + + private final AtomicReference scope = new AtomicReference<>(); + private final FlowPublisherVerification verification = new FlowPublisherVerification<>(new TestEnvironment()) { + @Override + public Publisher createFlowPublisher(long l) { + Flow flow = Flows.range(1, (int) l, 1); + return flow.toPublisher(scope.get()); + } + + @Override + public Publisher createFailedFlowPublisher() { + return Flows.failed(new RuntimeException("boom")).toPublisher(scope.get()); + } + }; + + @Test + void verifyTckScenarios() throws ExecutionException, InterruptedException { + List errors = new ArrayList<>(); + // We are invoking tests manually as we need to set separate supervised scope for each test + for (Method method : verification.getClass().getMethods()) { + if (method.getAnnotation(org.testng.annotations.Test.class) != null) { + if (method.getName().startsWith("untested_")) { + continue; + } + Scopes.supervised(s -> { + scope.set(s); + try { + method.invoke(verification); + } catch (InvocationTargetException e) { + handleInvocationTargetException(method, e); + errors.add(e.getCause()); + } + return null; + }); + scope.set(null); + } + } + assertTrue(errors.isEmpty(), "Test suite returned errors"); + } + + private static void handleInvocationTargetException(Method method, Throwable e) { + Throwable cause = e.getCause(); + String errorMessage = String.format("Error in method %s:%n%s%n%s", + method.getName(), + cause.getMessage(), + getStackTrace(cause)); + System.err.println(errorMessage); + } + + private static String getStackTrace(Throwable t) { + StringBuilder sb = new StringBuilder(); + for (StackTraceElement element : t.getStackTrace()) { + sb.append(element.toString()).append("\n"); + } + return sb.toString(); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowsProjectReactorTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowsProjectReactorTest.java new file mode 100644 index 0000000..85e7b93 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowsProjectReactorTest.java @@ -0,0 +1,88 @@ +package com.softwaremill.jox.flows; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; + +import com.softwaremill.jox.structured.Scopes; +import org.junit.jupiter.api.Test; +import org.reactivestreams.FlowAdapters; +import reactor.core.publisher.Flux; + +public class FlowsProjectReactorTest { + + @Test + void simpleFlowShouldEmitElementsToBeProcessedByFlux() throws ExecutionException, InterruptedException { + Scopes.supervised(scope -> { + // given + var flow = Flows.range(1, 4, 1); + + // when + var result = Flux.from(FlowAdapters.toPublisher(flow.toPublisher(scope))) + .map(i -> i * 2) + .collectList() + .block(); + + // then + assertEquals(List.of(2, 4, 6, 8), result); + return null; + }); + } + + @Test + void concurrentFlowShouldEmitElementsToBeProcessedByFlux() throws ExecutionException, InterruptedException { + Scopes.supervised(scope -> { + // given + var flow = Flows.tick(Duration.ofMillis(100), "x") + .merge(Flows.tick(Duration.ofMillis(200), "y"), false, false) + .take(5); + + // when + var result = Flux.from(FlowAdapters.toPublisher(flow.toPublisher(scope))) + .map(s -> s + s) + .collectList() + .block(); + + // then + result.sort(String::compareTo); + assertEquals(List.of("xx", "xx", "xx", "yy", "yy"), result); + return null; + }); + } + + @Test + void shouldCreateFlowFromASimplePublisher() throws Exception { + // given + Flux map = Flux.fromStream(IntStream.rangeClosed(1, 4).boxed()) + .map(i -> i * 2); + + // when + List result = Flows.fromPublisher(FlowAdapters.toFlowPublisher(map)) + .runToList(); + + // then + assertEquals(List.of(2, 4, 6, 8), result); + } + + @Test + void shouldCreateFlowFromAConcurrentPublisher() throws Exception { + // given + Flux flux = Flux.interval(Duration.ofMillis(100)) + .map(_ -> "x") + .mergeWith(Flux.interval(Duration.ofMillis(150)) + .map(_ -> "y")) + .take(5); + + // when + List result = Flows.fromPublisher(FlowAdapters.toFlowPublisher(flux)) + .map(s -> s + s) + .runToList(); + + // then + result.sort(String::compareTo); + assertEquals(List.of("xx", "xx", "xx", "yy", "yy"), result); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java index 2eb00df..5fada6c 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java @@ -1,19 +1,30 @@ package com.softwaremill.jox.flows; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.softwaremill.jox.ChannelError; @@ -307,4 +318,136 @@ void shouldInterleaveMultipleFlowsUsingSegmentSizeBiggerThan1AndCompleteEagerly( // then assertEquals(List.of(1, 2, 10, 20, 100, 200, 3, 4, 30), result); } + + @Test + void handleEmptyInputStream() throws Exception { + assertEquals(List.of(), Flows.fromInputStream(emptyInputStream(), 1024).runToList()); + } + + @Test + void handleInputStreamShorterThanBufferSize() throws Exception { + assertEquals(List.of("abc"), toStrings(Flows.fromInputStream(inputStream("abc", false), 1024))); + } + + @Test + void handleInputStreamLongerThanBufferSize() throws Exception { + assertEquals(List.of("som", "e t", "ext"), toStrings(Flows.fromInputStream(inputStream("some text", false), 3))); + } + + @Test + void closeInputStreamAfterReadingIt() throws Exception { + TestInputStream is = inputStream("abc", false); + assertFalse(is.isClosed()); + Flows.fromInputStream(is, 1024).runToList(); + assertTrue(is.isClosed()); + } + + @Test + void closeInputStreamAfterFailingWithException() { + TestInputStream is = inputStream("abc", true); + assertFalse(is.isClosed()); + assertThrows(Exception.class, () -> Flows.fromInputStream(is, 1024).runToList()); + assertTrue(is.isClosed()); + } + + @Test + void readContentFromFileSmallerThanChunkSize() throws Exception { + Path path = Files.createTempFile("ox", "test-readfile1"); + Files.write(path, "Test1 file content".getBytes()); + try { + List result = toStrings(Flows.fromFile(path, 1024)); + assertEquals(List.of("Test1 file content"), result); + } finally { + Files.deleteIfExists(path); + } + } + + @Test + void readContentFromFileLargerThanChunkSize() throws Exception { + Path path = Files.createTempFile("ox", "test-readfile1"); + Files.write(path, "Test2 file content".getBytes()); + try { + List result = toStrings(Flows.fromFile(path, 3)); + assertEquals(List.of("Tes", "t2 ", "fil", "e c", "ont", "ent"), result); + } finally { + Files.deleteIfExists(path); + } + } + + @Test + void handleEmptyFile() throws Exception { + Path path = Files.createTempFile("ox", "test-readfile1"); + try { + List result = toStrings(Flows.fromFile(path, 1024)); + assertEquals(List.of(), result); + } finally { + Files.deleteIfExists(path); + } + } + + @Test + void throwExceptionForMissingFile() { + Path path = Paths.get("/no/such/file.txt"); + assertThrows(NoSuchFileException.class, () -> Flows.fromFile(path, 1024).runToList()); + } + + @Test + void throwExceptionIfPathIsDirectory() throws URISyntaxException { + Path path = Paths.get(getClass().getResource("/").toURI()); + IOException exception = assertThrows(IOException.class, () -> Flows.fromFile(path, 1024).runToList()); + assertTrue(exception.getMessage().endsWith("is a directory")); + } + + @Test + void shouldUnfoldFunction() throws Exception { + Flow c = Flows.unfold(0, i -> i < 3 ? Optional.of(Map.entry(i, i + 1)) : Optional.empty()); + assertEquals(List.of(0, 1, 2), c.runToList()); + } + + private List toStrings(Flow source) throws Exception { + return source.runToList().stream() + .map(chunk -> new String(chunk, StandardCharsets.UTF_8)) + .toList(); + } + + private TestInputStream emptyInputStream() { + return new TestInputStream(""); + } + + private TestInputStream inputStream(String text, boolean failing) { + return new TestInputStream(text, failing); + } + + private static class TestInputStream extends ByteArrayInputStream { + private final AtomicBoolean closed = new AtomicBoolean(false); + private final boolean throwOnRead; + + public TestInputStream(String text) { + this(text, false); + } + + public TestInputStream(String text, boolean throwOnRead) { + super(text.getBytes()); + this.throwOnRead = throwOnRead; + } + + @Override + public void close() throws IOException { + closed.set(true); + super.close(); + } + + @Override + public int read(byte[] a) throws IOException { + if (throwOnRead) { + throw new IOException("expected failed read"); + } else { + return super.read(a); + } + } + + public boolean isClosed() { + return closed.get(); + } + } }