Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement remaining methods, including reactive lib connections #79

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions flows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,36 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this dependency as sample framework for tests. (Just like Pekko in Ox)

<version>3.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.10.2</version>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was required to avoid CVE in version shiped with reactive-streams-tck-flow

<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channels</artifactId>
Expand Down
60 changes: 37 additions & 23 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -39,7 +40,7 @@
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.CancellableFork;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.Scope;
import com.softwaremill.jox.structured.UnsupervisedScope;

/**
Expand Down Expand Up @@ -368,7 +369,7 @@ public <S, U> Flow<U> mapStateful(Supplier<S> initializeState, StatefulMapper<T,
*/
public <S, U> Flow<U> mapStatefulConcat(Supplier<S> initializeState, StatefulMapper<T, S, Iterable<U>> f, OnComplete<S, U> onComplete) {
AtomicReference<S> state = new AtomicReference<>(initializeState.get());
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
last.run(t -> {
Map.Entry<S, Iterable<U>> result = f.apply(state.get(), t);
for (U u : result.getValue()) {
Expand Down Expand Up @@ -453,7 +454,7 @@ public <U> Flow<U> flatMap(Function<T, Flow<U>> mappingFunction) {
* completes as well.
*/
public Flow<T> take(int n) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
AtomicInteger taken = new AtomicInteger(0);
try {
last.run(t -> {
Expand Down Expand Up @@ -550,8 +551,8 @@ public Flow<List<T>> groupedWeightedWithin(long minWeight, Duration duration, Fu
if (minWeight <= 0) throw new IllegalArgumentException("requirement failed: minWeight must be > 0");
if (duration.toMillis() <= 0) throw new IllegalArgumentException("requirement failed: duration must be > 0");

return Flows.usingEmit(emit -> {
Scopes.unsupervised(scope -> {
return usingEmit(emit -> {
unsupervised(scope -> {
Source<T> flowSource = runToChannel(scope);
Channel<List<T>> outputChannel = Channel.withScopedBufferSize();
Channel<GroupingTimeout> timerChannel = Channel.withScopedBufferSize();
Expand Down Expand Up @@ -620,7 +621,7 @@ case ChannelError(Throwable cause):

private CancellableFork<GroupingTimeout> forkTimeout(UnsupervisedScope scope, Channel<GroupingTimeout> timerChannel, Duration duration) {
return scope.forkCancellable(() -> {
Thread.sleep(duration);
sleep(duration);
timerChannel.sendOrClosed(GroupingTimeout.INSTANCE);
return null;
});
Expand Down Expand Up @@ -651,7 +652,7 @@ public Flow<List<T>> groupedWeighted(long minWeight, Function<T, Long> costFn) {
throw new IllegalArgumentException("minWeight must be > 0");
}

return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
List<T> buffer = new ArrayList<>();
AtomicLong accumulatedCost = new AtomicLong(0L);
last.run(t -> {
Expand All @@ -674,14 +675,14 @@ public Flow<List<T>> groupedWeighted(long minWeight, Function<T, Long> costFn) {
* Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error).
*/
public Flow<Void> drain() {
return Flows.usingEmit(_ -> last.run(_ -> {}));
return usingEmit(_ -> last.run(_ -> {}));
}

/**
* Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
*/
public Flow<T> onComplete(Runnable f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(emit);
} finally {
Expand All @@ -694,7 +695,7 @@ public Flow<T> onComplete(Runnable f) {
* Runs `f` after the flow completes successfully, that is when all elements are emitted.
*/
public Flow<T> onDone(Runnable f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
last.run(emit);
f.run();
});
Expand All @@ -704,7 +705,7 @@ public Flow<T> onDone(Runnable f) {
* Runs `f` after the flow completes with an error. The error can't be recovered.
*/
public Flow<T> onError(Consumer<Throwable> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(emit);
} catch (Throwable e) {
Expand Down Expand Up @@ -734,7 +735,7 @@ public Flow<T> intersperse(T start, T inject, T end) {
}

private Flow<T> intersperse(Optional<T> start, T inject, Optional<T> end) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
if (start.isPresent()) {
emit.apply(start.get());
}
Expand Down Expand Up @@ -790,7 +791,7 @@ public Flow<T> throttle(int elements, Duration per) {
* Whether the flow should also emit the first element that failed the predicate (`false` by default).
*/
public Flow<T> takeWhile(Predicate<T> f, boolean includeFirstFailing) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(t -> {
if (f.test(t)) {
Expand Down Expand Up @@ -826,7 +827,7 @@ public Flow<T> concat(Flow<T> other) {
* Number of elements to be dropped.
*/
public Flow<T> drop(int n) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
AtomicInteger dropped = new AtomicInteger(0);
last.run(t -> {
if (dropped.get() < n) {
Expand Down Expand Up @@ -854,7 +855,7 @@ public Flow<T> drop(int n) {
* Should the resulting flow complete when the right flow (`outer`) completes, before `this` flow.
*/
public Flow<T> merge(Flow<T> other, boolean propagateDoneLeft, boolean propagateDoneRight) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
unsupervised(scope -> {
Source<T> c1 = this.runToChannel(scope);
Source<T> c2 = other.runToChannel(scope);
Expand Down Expand Up @@ -895,7 +896,7 @@ public Flow<T> prepend(Flow<T> other) {
* An alternative flow to be used when this flow is empty.
*/
public Flow<T> orElse(Flow<T> alternative) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
AtomicBoolean receivedAtLeastOneElement = new AtomicBoolean(false);
last.run(t -> {
emit.apply(t);
Expand Down Expand Up @@ -938,7 +939,7 @@ public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerCompl
* returned flow. If the result of `f` is empty, nothing is emitted by the returned channel.
*/
public <U> Flow<U> mapConcat(Function<T, Iterable<U>> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
last.run(t -> {
for (U u : f.apply(t)) {
emit.apply(u);
Expand All @@ -961,15 +962,15 @@ public <U> Flow<U> mapConcat(Function<T, Iterable<U>> f) {
* The mapping function.
*/
public <U> Flow<U> mapPar(int parallelism, Function<T, U> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
Semaphore semaphore = new Semaphore(parallelism);
Channel<Fork<Optional<U>>> inProgress = new Channel<>(parallelism);
Channel<U> results = Channel.withScopedBufferSize();

// creating a nested scope, so that in case of errors, we can clean up any mapping forks in a "local" fashion,
// that is without closing the main scope; any error management must be done in the forks, as the scope is
// unsupervised
Scopes.unsupervised(scope -> {
unsupervised(scope -> {
// a fork which runs the `last` pipeline, and for each emitted element creates a fork
// notifying only the `results` channels, as it will cause the scope to end, and any other forks to be
// interrupted, including the inProgress-fork, which might be waiting on a join()
Expand Down Expand Up @@ -1029,7 +1030,7 @@ public <U> Flow<U> mapPar(int parallelism, Function<T, U> f) {
* The mapping function.
*/
public <U> Flow<U> mapParUnordered(int parallelism, Function<T, U> f) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
Channel<U> results = Channel.withScopedBufferSize();
Semaphore s = new Semaphore(parallelism);
unsupervised(unsupervisedScope -> { // the outer scope, used for the fork which runs the `last` pipeline
Expand Down Expand Up @@ -1075,7 +1076,7 @@ public Flow<List<T>> sliding(int n, int step) {
if (n <= 0) throw new IllegalArgumentException("n must be > 0");
if (step <= 0) throw new IllegalArgumentException("step must be > 0");

return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
final AtomicReference<List<T>> buf = new AtomicReference<>(new ArrayList<>());
last.run(t -> {
var buffer = buf.get();
Expand Down Expand Up @@ -1113,7 +1114,7 @@ public Flow<List<T>> sliding(int n, int step) {
* @see #alsoToTap for a version that drops elements when the `other` sink is not available for receive.
*/
public Flow<T> alsoTo(Sink<T> other) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(t -> {
try {
Expand Down Expand Up @@ -1144,7 +1145,7 @@ public Flow<T> alsoTo(Sink<T> other) {
* @see #alsoTo for a version that ensures that elements are emitted both by the returned flow and sent to the `other` sink.
*/
public Flow<T> alsoToTap(Sink<T> other) {
return Flows.usingEmit(emit -> {
return usingEmit(emit -> {
try {
last.run(t -> {
try {
Expand All @@ -1161,6 +1162,19 @@ public Flow<T> alsoToTap(Sink<T> other) {
});
}

/** Converts this {@link Flow} into a {@link Publisher}. The flow is run every time the publisher is subscribed to.
* <p>
* Must be run within a concurrency scope, as upon subscribing, a fork is created to run the publishing process. Hence, the scope should
* remain active as long as the publisher is used.
* <p>
* Elements emitted by the flow are buffered, using a buffer of capacity given by the {@link Channel#BUFFER_SIZE} in scope or default value {@link Channel#DEFAULT_BUFFER_SIZE} is used.
* <p>
* The returned publisher implements the JDK 9+ {@code Flow.Publisher} API.
*/
public Publisher<T> toPublisher(Scope scope) {
return new FromFlowPublisher<>(scope, last);
}

// endregion

private void forkPropagate(UnsupervisedScope unsupervisedScope, Sink<?> propagateExceptionsTo, Callable<Void> runnable) {
Expand Down
Loading
Loading