Skip to content

Commit

Permalink
Add remaining flow methods (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
emil-bar authored Jan 10, 2025
1 parent 7700513 commit ef1702e
Show file tree
Hide file tree
Showing 7 changed files with 781 additions and 10 deletions.
305 changes: 300 additions & 5 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.softwaremill.jox.structured.CancellableFork;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.Scope;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.ThrowingRunnable;
import com.softwaremill.jox.structured.UnsupervisedScope;

/**
Expand Down Expand Up @@ -439,6 +441,290 @@ public Flow<T> filter(Predicate<T> filteringPredicate) {
});
}

/**
* Emits only every nth element emitted by this flow.
*
* @param n
* The interval between two emitted elements.
*/
public Flow<T> sample(int n) {
return Flows.usingEmit(emit -> {
AtomicInteger sampleCounter = new AtomicInteger(0);
last.run(t -> {
int counter = sampleCounter.incrementAndGet();
if (n != 0 && counter % n == 0) {
emit.apply(t);
}
});
});
}

/** Remove subsequent, repeating elements
*/
public Flow<T> debounce() {
return debounceBy(Function.identity());
}

/**
* Remove subsequent, repeating elements matching 'f'
*
* @param f The function used to compare the previous and current elements
*/
public <U> Flow<T> debounceBy(Function<T, U> f) {
return Flows.usingEmit(emit -> {
AtomicReference<Optional<U>> previousElement = new AtomicReference<>(Optional.empty());
last.run(t -> {
U currentElement = f.apply(t);
if (!previousElement.get().equals(Optional.ofNullable(currentElement))) {
emit.apply(t);
}
previousElement.set(Optional.ofNullable(currentElement));
});
});
}

/**
* Applies the given mapping function `f` to each element emitted by this flow, for which the function returns a non-empty Optional, and emits the result.
* If `f` returns an empty Optional at an element, the element will be skipped.
*
* @param f
* The mapping function.
*/
public <U> Flow<U> collect(Function<T, Optional<U>> f) {
return Flows.usingEmit(emit ->
last.run(t -> {
Optional<U> result = f.apply(t);
if (result.isPresent()) {
emit.apply(result.get());
}
})
);
}

/**
* Transforms the elements of the flow by applying an accumulation function to each element, producing a new value at each step. The
* resulting flow contains the accumulated values at each point in the original flow.
*
* @param initial The initial value to start the accumulation.
* @param f The accumulation function that is applied to each element of the flow.
* @return A new Flow containing the accumulated values.
*/
public <V> Flow<V> scan(V initial, BiFunction<V, T, V> f) {
return Flows.usingEmit(emit -> {
emit.apply(initial);
AtomicReference<V> accumulator = new AtomicReference<>(initial);
last.run(t -> {
emit.apply(accumulator.updateAndGet(acc -> f.apply(acc, t)));
});
});
}

/** Combines elements from this and the other flow into Map.Entry. Completion of either flow completes the returned flow as well. The flows
* are run concurrently.
* <p>
* Method uses channels to emit elements. The size of channel buffer is determined by the scoped value {@link Channel#BUFFER_SIZE} or {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @see
* Flow#zipAll
*/
public <U> Flow<Map.Entry<T, U>> zip(Flow<U> other) {
return Flows.usingEmit(emit -> {
Scopes.unsupervised(scope -> {
Source<T> s1 = this.runToChannel(scope);
Source<U> s2 = other.runToChannel(scope);

while (true) {
Object result1 = s1.receiveOrClosed();
if (result1 instanceof ChannelDone) {
return null;
} else if (result1 instanceof ChannelError error) {
throw error.toException();
} else {
// noinspection unchecked
T t = (T) result1;
Object result2 = s2.receiveOrClosed();
if (result2 instanceof ChannelDone) {
return null;
} else if (result2 instanceof ChannelError error) {
throw error.toException();
} else {
// noinspection unchecked
U u = (U) result2;
emit.apply(Map.entry(t, u));
}
}
}
});
});
}

/**
* Combines elements from this and the other flow into tuples, handling early completion of either flow with defaults. The flows are run
* concurrently.
* <p>
* Method uses channels to emit elements. The size of channel buffer is determined by the scoped value {@link Channel#BUFFER_SIZE} or {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param other
* A flow of elements to be combined with.
* @param thisDefault
* A default element to be used in the result tuple when the other flow is longer.
* @param otherDefault
* A default element to be used in the result tuple when the current flow is longer.
*/
public <U> Flow<Map.Entry<T, U>> zipAll(Flow<U> other, T thisDefault, U otherDefault) {
return Flows.usingEmit(emit -> {
Scopes.unsupervised(scope -> {
Source<T> s1 = this.runToChannel(scope);
Source<U> s2 = other.runToChannel(scope);

boolean continueLoop = true;
while (continueLoop) {
Object received1 = s1.receiveOrClosed();
if (received1 instanceof ChannelDone) {
Object received2 = s2.receiveOrClosed();
if (received2 instanceof ChannelDone) {
continueLoop = false;
} else if (received2 instanceof ChannelError e) {
throw e.toException();
} else {
//noinspection unchecked
emit.apply(Map.entry(thisDefault, (U) received2));
}
} else if (received1 instanceof ChannelError e) {
throw e.toException();
} else {
Object received2 = s2.receiveOrClosed();
if (received2 instanceof ChannelDone) {
//noinspection unchecked
emit.apply(Map.entry((T) received1, otherDefault));
} else if (received2 instanceof ChannelError e) {
throw e.toException();
} else {
//noinspection unchecked
emit.apply(Map.entry((T) received1, (U) received2));
}
}
}
return null;
});
});
}

/**
* Combines each element from this and the index of the element (starting at 0).
*/
public Flow<Map.Entry<T, Long>> zipWithIndex() {
return Flows.usingEmit(emit -> {
AtomicLong index = new AtomicLong(0L);
last.run(t -> {
Map.Entry<T, Long> zipped = Map.entry(t, index.getAndIncrement());
emit.apply(zipped);
});
});
}

/** Given that this flow emits other flows, flattens the nested flows into a single flow. The resulting flow emits elements from the
* nested flows in the order they are emitted.
* <p>
* The nested flows are run in sequence, that is, the next nested flow is started only after the previous one completes.
*
* @param args
* This param should *NOT* be passed. It's only used to verify that this flow contains other flows.
* @throws IllegalArgumentException
* when flow does not contain nested flows, or when `args` are not empty
*/
@SafeVarargs
public final T flatten(T... args) {
if (!Flow.class.equals(getTClass(args))) {
throw new IllegalArgumentException("requirement failed: flatten can be called on Flow containing Flows");
}
//noinspection unchecked,rawtypes
return (T) this.flatMap(t -> (Flow) t);
}

/** Pipes the elements of child flows into the returned flow.
* <p>
* If this flow or any of the child flows emit an error, the pulling stops and the output flow propagates the error.
* <p>
* Up to `parallelism` child flows are run concurrently in the background. When the limit is reached, until a child flow completes, no
* more child flows are run.
* <p>
* The size of the buffers for the elements emitted by the child flows is determined by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*
* @param parallelism
* An upper bound on the number of child flows that run in parallel.
* @param args
* This param should *NOT* be passed. It's only used to verify that this flow contains other flows.
* @throws IllegalArgumentException
* when flow does not contain nested flows, or when `args` are not empty
*/
@SuppressWarnings("unchecked")
@SafeVarargs
public final <U> T flattenPar(int parallelism, T... args) {
if (!Flow.class.equals(getTClass(args))) {
throw new IllegalArgumentException("requirement failed: flattenPar can be called on Flow containing Flows");
}
return (T) Flows.usingEmit(emit -> {
class Nested {
final Flow<U> child;
Nested(Flow<U> child) {
this.child = child;
}
}
final class ChildDone {}

unsupervised(scope -> {
Channel<U> childOutputChannel = Channel.withScopedBufferSize();
Channel<ChildDone> childDoneChannel = Channel.withScopedBufferSize();

// When an error occurs in the parent, propagating it also to `childOutputChannel`, from which we always
// `select` in the main loop. That way, even if max parallelism is reached, errors in the parent will
// be discovered without delay.
//noinspection unchecked
Source<Nested> parentChannel = map(t -> new Nested((Flow<U>) t))
.onError(childOutputChannel::error)
.runToChannel(scope);

int runningChannelCount = 1; // parent is running
boolean parentDone = false;

while (runningChannelCount > 0) {
assert runningChannelCount <= parallelism + 1;

Object result;
if (runningChannelCount == parallelism + 1 || parentDone) {
result = selectOrClosed(childOutputChannel.receiveClause(), childDoneChannel.receiveClause());
} else {
result = selectOrClosed(childOutputChannel.receiveClause(), childDoneChannel.receiveClause(), parentChannel.receiveClause());
}

// Only `parentChannel` might be done, child completion is signalled via `childDoneChannel`.
if (result instanceof ChannelDone) {
parentDone = parentChannel.isClosedForReceive();
assert parentDone;
runningChannelCount--;
} else if (result instanceof ChannelError e) {
throw e.toException();
} else if (ChildDone.class.isInstance(result)) {
runningChannelCount--;
} else if (Nested.class.isInstance(result)) {
//noinspection unchecked
Nested t = (Nested) result;
scope.forkUnsupervised(() -> {
t.child.onDone(() -> childDoneChannel.send(new ChildDone()))
.runPipeToSink(childOutputChannel, false);
return null;
});
runningChannelCount++;
} else if (result != null) {
emit.apply(result);
}
}
return null;
});
});
}

/**
* Applies the given `mappingFunction` to each element emitted by this flow, in sequence.
* The given {@link Consumer<FlowEmit>} can be used to emit an arbitrary number of elements.
Expand Down Expand Up @@ -715,7 +1001,7 @@ public Flow<T> onComplete(Runnable f) {
/**
* Runs `f` after the flow completes successfully, that is when all elements are emitted.
*/
public Flow<T> onDone(Runnable f) {
public Flow<T> onDone(ThrowingRunnable f) {
return usingEmit(emit -> {
last.run(emit);
f.run();
Expand Down Expand Up @@ -1221,19 +1507,20 @@ public Publisher<T> toPublisher(Scope scope) {
// endregion

// region ByteFlow

public interface ByteChunkMapper<T> extends Function<T, ByteChunk> {}
public interface ByteArrayMapper<T> extends Function<T, byte[]> {}

/**
* Converts a flow of `byte[]` or {@link ByteChunk} into a dedicated Flow type {@link ByteFlow}.
*
* @throws IllegalArgumentException if the flow does not contain `byte[]` or {@link ByteChunk} elements.
* @param args
* This param should *NOT* be passed. It's only used to verify that this flow contains byte[] or {@link ByteChunk}.
* @throws IllegalArgumentException
* if the flow does not contain `byte[]` or {@link ByteChunk} elements.
*/
@SafeVarargs
public final ByteFlow toByteFlow(T... args) {
//noinspection unchecked
return new ByteFlow(last, (Class<T>) args.getClass().getComponentType());
return new ByteFlow(last, getTClass(args));
}

/**
Expand Down Expand Up @@ -1456,6 +1743,14 @@ private void runLastToChannelAsync(UnsupervisedScope scope, Channel<T> channel)
});
}

@SuppressWarnings("unchecked")
private static <T> Class<T> getTClass(T[] args) {
if (args.length > 0) {
throw new IllegalArgumentException("Please do not pass any arguments for this method. Java will detect the type automatically.");
}
return (Class<T>) args.getClass().getComponentType();
}

private static class BreakException extends RuntimeException {
}
}
Loading

0 comments on commit ef1702e

Please sign in to comment.