Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Actor, external runner and fix flows publisher issues #81

Merged
merged 4 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions flows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,29 @@
<version>0.3.1</version>
<packaging>jar</packaging>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_3</artifactId>
<version>1.1.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channels</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>structured</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down Expand Up @@ -55,14 +77,14 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channels</artifactId>
<version>0.3.1</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>structured</artifactId>
<version>0.3.1</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream-testkit_3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
1 change: 1 addition & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.ThrowingConsumer;

public final class Flows {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Sink;
import com.softwaremill.jox.structured.ExternalRunner;
import com.softwaremill.jox.structured.Scope;
import com.softwaremill.jox.structured.UnsupervisedScope;

class FromFlowPublisher<T> implements Flow.Publisher<T> {

private final Scope scope;
private final ExternalRunner externalRunner;
private final FlowStage<T> last;

FromFlowPublisher(Scope scope, FlowStage<T> last) {
this.scope = scope;
this.externalRunner = scope.externalRunner();
this.last = last;
}

Expand All @@ -39,10 +40,12 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
// we cannot block `subscribe` (see https://github.com/reactive-streams/reactive-streams-jvm/issues/393),
// hence running in a fork; however, the reactive library might run .subscribe on a different thread, that's
// why we need to use the external runner functionality
scope.fork(() -> {
runToSubscriber(subscriber);
return null;
});
externalRunner.runAsync(scope ->
scope.fork(() -> {
runToSubscriber(subscriber);
return null;
})
);
}

private void runToSubscriber(Flow.Subscriber<? super T> subscriber) throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.softwaremill.jox.flows;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;

import com.softwaremill.jox.structured.Scopes;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.AsPublisher;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

public class FlowPekkoStreamTest {

private ActorSystem system;

@BeforeEach
void setUp() {
system = ActorSystem.create("test");
}

@AfterEach
void cleanUp() {
system.terminate();
}

@Test
void test() throws ExecutionException, InterruptedException {
Scopes.supervised(scope -> {
var flow = Flows.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.map(i -> i * 2)
.filter(i -> i % 3 == 0);
var result = Source
.fromPublisher(FlowAdapters.toPublisher(flow.toPublisher(scope)))
.map(i -> i * 2)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get();

assertEquals(List.of(12, 24, 36), result);
return null;
});
}

@Test
public void testSimpleFlow() throws ExecutionException, InterruptedException {
Scopes.supervised(scope -> {
var flow = Flows.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.map(i -> i * 2)
.filter(i -> i % 3 == 0);
var result = Source
.fromPublisher(FlowAdapters.toPublisher(flow.toPublisher(scope)))
.map(i -> i * 2)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get();

assertEquals(List.of(12, 24, 36), result);
return null;
});
}

@Test
public void testConcurrentFlow() throws ExecutionException, InterruptedException {
Scopes.supervised(scope -> {
var flow = Flows.tick(Duration.ofMillis(100), "x")
.merge(Flows.tick(Duration.ofMillis(200), "y"), false, false)
.take(5);
var result = Source
.fromPublisher(FlowAdapters.toPublisher(flow.toPublisher(scope)))
.map(s -> s + s)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get();

result = result.stream().sorted().toList();
assertEquals(List.of("xx", "xx", "xx", "yy", "yy"), result);
return null;
});
}

@Test
public void testFlowFromSimplePublisher() throws Exception {
Publisher<Integer> publisher = Source
.fromIterator(() -> List.of(1, 2, 3).iterator())
.map(i -> i * 2)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);

var result = Flows.fromPublisher(FlowAdapters.toFlowPublisher(publisher))
.map(i -> i * 10)
.runToList();

assertEquals(List.of(20, 40, 60), result);
}

@Test
public void testFlowFromConcurrentPublisher() throws Exception {
Publisher<String> publisher = Source
.tick(Duration.ZERO, Duration.ofMillis(100), "x")
.merge(Source.tick(Duration.ZERO, Duration.ofMillis(200), "y"))
.take(5)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);

var result = Flows.fromPublisher(FlowAdapters.toFlowPublisher(publisher))
.map(s -> s + s)
.runToList();

result.sort(String::compareTo);
assertEquals(List.of("xx", "xx", "xx", "yy", "yy"), result);
}
}
5 changes: 5 additions & 0 deletions structured/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.softwaremill.jox</groupId>
<artifactId>channels</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
123 changes: 123 additions & 0 deletions structured/src/main/java/com/softwaremill/jox/structured/ActorRef.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.softwaremill.jox.structured;

import static com.softwaremill.jox.structured.Scopes.unsupervised;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.Sink;

public class ActorRef<T> {

private final Sink<ThrowingConsumer<T>> c;

public ActorRef(Sink<ThrowingConsumer<T>> c) {
this.c = c;
}

/**
* Send an invocation to the actor and await for the result.
* <p>
* The `f` function should be an invocation of a method on `T` and should not directly or indirectly return the `T` value, as this might
* expose the actor's internal mutable state to other threads.
* <p>
* Any non-fatal exceptions thrown by `f` will be propagated to the caller and the actor will continue processing other invocations.
* Fatal exceptions will be propagated to the actor's enclosing scope, and the actor will close.
*/
public <U> U ask(ThrowingFunction<T, U> f) throws Exception {
CompletableFuture<U> cf = new CompletableFuture<>();
c.send(t -> {
try {
cf.complete(f.apply(t));
} catch (Throwable e) {
if (e instanceof RuntimeException) {
cf.completeExceptionally(e);
} else {
cf.completeExceptionally(e);
throw e;
}
}
});
try {
return cf.get();
} catch (ExecutionException e) {
throw (Exception) e.getCause();
}
}

/**
* Send an invocation to the actor that should be processed in the background (fire-and-forget). Might block until there's enough space
* in the actor's mailbox (incoming channel).
* <p>
* Any exceptions thrown by `f` will be propagated to the actor's enclosing scope, and the actor will close.
*/
public void tell(ThrowingConsumer<T> f) throws InterruptedException {
c.send(f);
}

/**
* The same as {@link ActorRef#create(Scope, Object, Consumer)} but with empty close action.
*/
public static <T> ActorRef<T> create(Scope scope, T logic) {
return create(scope, logic, null);
}

/**
* Creates a new actor ref, that is a fork in the current concurrency scope, which protects a mutable resource (`logic`) and executes
* invocations on it serially, one after another. It is guaranteed that `logic` will be accessed by at most one thread at a time. The
* methods of `logic: T` define the actor's interface (the messages that can be "sent to the actor").
* <p>
* Invocations can be scheduled using the returned `ActorRef`. When an invocation is an `ActorRef.ask`, any non-fatal exceptions are
* propagated to the caller, and the actor continues. Fatal exceptions, or exceptions that occur during `ActorRef.tell` invocations,
* cause the actor's channel to be closed with an error, and are propagated to the enclosing scope.
* <p>
* The actor's mailbox (incoming channel) will have a capacity as specified by the {@link Channel#BUFFER_SIZE} in scope or {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*/
public static <T> ActorRef<T> create(Scope scope, T logic, Consumer<T> close) {
Channel<ThrowingConsumer<T>> c = Channel.withScopedBufferSize();
ActorRef<T> ref = new ActorRef<>(c);
scope.fork(() -> {
try {
while (true) {
ThrowingConsumer<T> m = c.receive();
try {
m.accept(logic);
} catch (Throwable t) {
c.error(t);
throw t;
}
}
} finally {
if (close != null) {
uninterruptible(() -> {
close.accept(logic);
return null;
});
}
}
});
return ref;
}

private static void uninterruptible(Callable<Void> f) throws ExecutionException, InterruptedException {
unsupervised(scope -> {
Fork<Void> t = scope.forkUnsupervised(f);

ThrowingRunnable joinDespiteInterrupted = () -> {
while (true) {
try {
t.join();
break;
} catch (InterruptedException e) {
// Continue the loop to retry joining
}
}
};
joinDespiteInterrupted.run();
return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.softwaremill.jox.structured;

public record ExternalRunner(ActorRef<ExternalScheduler> scheduler) {
/** Allows to runs the given function asynchronously, in the scope of the concurrency scope in which this runner was created.
* <p>
* `f` should return promptly, not to obstruct execution of other scheduled functions. Typically, it should start a background fork.
*/
public void runAsync(ThrowingConsumer<Scope> f) {
SneakyThrows.sneakyThrows(() ->
scheduler.ask(s -> {
s.run(f);
return null;
})
);
}
}
Loading
Loading