Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Nov 27, 2023
1 parent 5f087b7 commit f02db8e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
10 changes: 9 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<enablePreview>true</enablePreview>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
Expand All @@ -42,5 +44,11 @@
<version>5.10.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
48 changes: 48 additions & 0 deletions core/src/test/java/jox/ChannelInterruptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
import static jox.TestUtil.*;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class ChannelInterruptionTest {
Expand Down Expand Up @@ -65,4 +73,44 @@ void testRaceInterruptAndSend() throws Exception {
}
});
}

@Test
void testReceiveManyInterruptsReceive() throws ExecutionException, InterruptedException {
scoped(scope -> {
Channel<String> channel = new Channel<>();
Set<String> received = ConcurrentHashMap.newKeySet();

// starting with a single receive
forkVoid(scope, () -> {
received.add(channel.receive());
});
// wait for the `receive` to suspend
Thread.sleep(100);

// then, starting subsequent receives, and interrupting them
for (int i = 0; i < 1000; i++) {
var s = new Semaphore(0);
var f = forkCancelable(scope, () -> {
s.release(); // letting the main thread know that the receive has started
channel.receive();
});
s.acquire();
f.cancel();
}

// then, starting one more receive
forkVoid(scope, () -> {
received.add(channel.receive());
});
// wait for the `receive` to suspend
Thread.sleep(100);

// send two elements, wait for completion
channel.send("a");
channel.send("b");

// check the results
await().atMost(1, SECONDS).until(() -> received.equals(Set.of("a", "b")));
});
}
}

0 comments on commit f02db8e

Please sign in to comment.