diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ae11887..0396d19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,7 +42,7 @@ jobs: id: benchmark_java run: | mvn package -DskipTests=true - java -Djmh.executor=VIRTUAL -jar bench/bench-java/target/benchmarks.jar -i 5 -wi 5 -f 3 -to 2100ms -r 2000ms -w 2000ms -rf json | tee out.txt + java -Djmh.executor=VIRTUAL -jar bench/bench-java/target/benchmarks.jar -i 5 -wi 5 -f 1 -to 1100ms -r 1000ms -w 1000ms -rf json -rff jmh-result-java.json | tee out.txt echo 'output<> $GITHUB_OUTPUT cat out.txt >> $GITHUB_OUTPUT @@ -50,13 +50,13 @@ jobs: - name: Run kotlin benchmarks id: benchmark_kotlin run: | - java -jar bench/bench-kotlin/target/benchmarks.jar -i 5 -wi 5 -f 3 -to 2100ms -r 2000ms -w 2000ms -rf json -rff jmh-result-kotlin.json | tee out.txt + java -jar bench/bench-kotlin/target/benchmarks.jar -i 5 -wi 5 -f 1 -to 1100ms -r 1000ms -w 1000ms -rf json -rff jmh-result-kotlin.json | tee out.txt echo 'output<> $GITHUB_OUTPUT cat out.txt >> $GITHUB_OUTPUT echo 'EOF' >> $GITHUB_OUTPUT - name: Merge java and kotlin benchmark results - run: jq -s '.[0] + .[1]' jmh-result.json jmh-result-kotlin.json > jmh-result-both.json + run: jq -s '.[0] + .[1]' jmh-result-java.json jmh-result-kotlin.json > jmh-result-all.json - name: Extract branch name shell: bash run: echo "branch=${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}}" >> $GITHUB_OUTPUT @@ -67,7 +67,7 @@ jobs: if: steps.extract_branch.outputs.branch == 'main' with: tool: 'jmh' - output-file-path: jmh-result-both.json + output-file-path: jmh-result-all.json github-token: ${{ secrets.GITHUB_TOKEN }} auto-push: true # Show alert with commit comment on detecting possible performance regression diff --git a/README.md b/README.md index cab5c0b..e7a8039 100644 --- a/README.md +++ b/README.md @@ -214,54 +214,70 @@ class Demo6 { The project includes benchmarks implemented using JMH - both for the `Channel`, as well as for some built-in Java 
synchronisation primitives (queues), as well as the Kotlin channel implementation. -The test results for version 0.0.1, run on an M1 Max MacBook Pro, with Java 21.0.1, are as follows: +The test results for version 0.0.4, run on an M1 Max MacBook Pro, with Java 21.0.1, are as follows: ``` -Benchmark (capacity) Mode Cnt Score Error Units - -// jox -RendezvousBenchmark.channel N/A avgt 30 176.499 ± 14.964 ns/op -RendezvousBenchmark.channel:receiveFromChannel N/A avgt 30 176.499 ± 14.964 ns/op -RendezvousBenchmark.channel:sendToChannel N/A avgt 30 176.499 ± 14.964 ns/op -RendezvousBenchmark.channel_iterative N/A avgt 30 209.041 ± 30.397 ns/op - -BufferedBenchmark.channel 1 avgt 30 177.547 ± 14.626 ns/op -BufferedBenchmark.channel:receiveFromChannel 1 avgt 30 177.547 ± 14.626 ns/op -BufferedBenchmark.channel:sendToChannel 1 avgt 30 177.547 ± 14.626 ns/op -BufferedBenchmark.channel 10 avgt 30 135.838 ± 14.578 ns/op -BufferedBenchmark.channel:receiveFromChannel 10 avgt 30 135.838 ± 14.578 ns/op -BufferedBenchmark.channel:sendToChannel 10 avgt 30 135.838 ± 14.578 ns/op -BufferedBenchmark.channel 100 avgt 30 92.837 ± 13.936 ns/op -BufferedBenchmark.channel:receiveFromChannel 100 avgt 30 92.837 ± 13.936 ns/op -BufferedBenchmark.channel:sendToChannel 100 avgt 30 92.836 ± 13.935 ns/op -BufferedBenchmark.channel_iterative 1 avgt 30 185.138 ± 14.382 ns/op -BufferedBenchmark.channel_iterative 10 avgt 30 126.594 ± 12.089 ns/op -BufferedBenchmark.channel_iterative 100 avgt 30 83.534 ± 6.540 ns/op - -// java -RendezvousBenchmark.exchanger N/A avgt 30 177.630 ± 152.388 ns/op -RendezvousBenchmark.exchanger:exchange1 N/A avgt 30 177.630 ± 152.388 ns/op -RendezvousBenchmark.exchanger:exchange2 N/A avgt 30 177.630 ± 152.388 ns/op -RendezvousBenchmark.synchronous_queue N/A avgt 30 978.826 ± 188.831 ns/op -RendezvousBenchmark.synchronous_queue:putToSynchronousQueue N/A avgt 30 978.826 ± 188.830 ns/op -RendezvousBenchmark.synchronous_queue:takeFromSynchronousQueue N/A avgt 30 978.825 ± 
188.832 ns/op - -BufferedBenchmark.array_blocking_queue 1 avgt 30 2266.799 ± 231.198 ns/op -BufferedBenchmark.array_blocking_queue:putToArrayBlockingQueue 1 avgt 30 2266.798 ± 231.197 ns/op -BufferedBenchmark.array_blocking_queue:takeFromArrayBlockingQueue 1 avgt 30 2266.799 ± 231.199 ns/op -BufferedBenchmark.array_blocking_queue 10 avgt 30 450.796 ± 93.496 ns/op -BufferedBenchmark.array_blocking_queue:putToArrayBlockingQueue 10 avgt 30 450.795 ± 93.495 ns/op -BufferedBenchmark.array_blocking_queue:takeFromArrayBlockingQueue 10 avgt 30 450.797 ± 93.497 ns/op -BufferedBenchmark.array_blocking_queue 100 avgt 30 147.962 ± 9.743 ns/op -BufferedBenchmark.array_blocking_queue:putToArrayBlockingQueue 100 avgt 30 147.962 ± 9.743 ns/op -BufferedBenchmark.array_blocking_queue:takeFromArrayBlockingQueue 100 avgt 30 147.962 ± 9.743 ns/op - -// kotlin -RendezvousKotlinBenchmark.sendReceiveUsingDefaultDispatcher N/A avgt 30 108.338 ± 0.538 ns/op - -BufferedKotlinBenchmark.sendReceiveUsingDefaultDispatcher 1 avgt 30 86.614 ± 0.784 ns/op -BufferedKotlinBenchmark.sendReceiveUsingDefaultDispatcher 10 avgt 30 40.153 ± 0.221 ns/op -BufferedKotlinBenchmark.sendReceiveUsingDefaultDispatcher 100 avgt 30 26.764 ± 0.022 ns/op +Benchmark (capacity) (chainLength) Mode Cnt Score Error Units + +// jox - single channel +BufferedBenchmark.channel 1 N/A avgt 20 210.366 ± 18.979 ns/op +BufferedBenchmark.channel 10 N/A avgt 20 148.691 ± 25.368 ns/op +BufferedBenchmark.channel 100 N/A avgt 20 149.499 ± 22.495 ns/op + +RendezvousBenchmark.channel N/A N/A avgt 20 187.940 ± 8.783 ns/op + +// kotlin - single channel +BufferedKotlinBenchmark.channel_defaultDispatcher 1 N/A avgt 30 85.027 ± 0.709 ns/op +BufferedKotlinBenchmark.channel_defaultDispatcher 10 N/A avgt 30 40.095 ± 0.452 ns/op +BufferedKotlinBenchmark.channel_defaultDispatcher 100 N/A avgt 30 26.879 ± 0.063 ns/op + +RendezvousKotlinBenchmark.channel_defaultDispatcher N/A N/A avgt 30 116.664 ± 10.099 ns/op + +// jox - selects 
+SelectBenchmark.selectWithSingleClause N/A N/A avgt 20 353.074 ± 27.860 ns/op +SelectBenchmark.selectWithTwoClauses N/A N/A avgt 20 651.050 ± 31.037 ns/op + +// kotlin - selects +SelectKotlinBenchmark.selectWithSingleClause_defaultDispatcher N/A N/A avgt 30 169.823 ± 1.250 ns/op +SelectKotlinBenchmark.selectWithTwoClauses_defaultDispatcher N/A N/A avgt 30 227.413 ± 2.659 ns/op + +// java built-in - single queue +BufferedBenchmark.arrayBlockingQueue 1 N/A avgt 20 2447.455 ± 427.354 ns/op +BufferedBenchmark.arrayBlockingQueue 10 N/A avgt 20 546.227 ± 96.690 ns/op +BufferedBenchmark.arrayBlockingQueue 100 N/A avgt 20 125.287 ± 4.387 ns/op + +RendezvousBenchmark.exchanger N/A N/A avgt 20 106.114 ± 20.360 ns/op +RendezvousBenchmark.synchronousQueue N/A N/A avgt 20 869.988 ± 101.291 ns/op + +// jox - multi channel +ChainedBenchmark.channelChain 0 100 avgt 20 225.370 ± 4.693 ns/op +ChainedBenchmark.channelChain 0 1000 avgt 20 173.997 ± 4.160 ns/op +ChainedBenchmark.channelChain 0 10000 avgt 20 160.097 ± 4.520 ns/op +ChainedBenchmark.channelChain 100 100 avgt 20 8.377 ± 0.133 ns/op +ChainedBenchmark.channelChain 100 1000 avgt 20 6.147 ± 0.054 ns/op +ChainedBenchmark.channelChain 100 10000 avgt 20 7.942 ± 0.447 ns/op + +// kotlin - multi channel +ChainedKotlinBenchmark.channelChain_defaultDispatcher 0 100 avgt 30 96.106 ± 1.247 ns/op +ChainedKotlinBenchmark.channelChain_defaultDispatcher 0 1000 avgt 30 74.858 ± 0.810 ns/op +ChainedKotlinBenchmark.channelChain_defaultDispatcher 0 10000 avgt 30 72.894 ± 0.787 ns/op +ChainedKotlinBenchmark.channelChain_defaultDispatcher 100 100 avgt 30 5.164 ± 0.104 ns/op +ChainedKotlinBenchmark.channelChain_defaultDispatcher 100 1000 avgt 30 4.157 ± 0.029 ns/op +ChainedKotlinBenchmark.channelChain_defaultDispatcher 100 10000 avgt 30 4.965 ± 0.043 ns/op +ChainedKotlinBenchmark.channelChain_eventLoop 0 100 avgt 30 70.484 ± 0.431 ns/op +ChainedKotlinBenchmark.channelChain_eventLoop 0 1000 avgt 30 98.400 ± 1.003 ns/op 
+ChainedKotlinBenchmark.channelChain_eventLoop 0 10000 avgt 30 92.579 ± 1.650 ns/op +ChainedKotlinBenchmark.channelChain_eventLoop 100 100 avgt 30 27.052 ± 0.121 ns/op +ChainedKotlinBenchmark.channelChain_eventLoop 100 1000 avgt 30 25.982 ± 0.111 ns/op +ChainedKotlinBenchmark.channelChain_eventLoop 100 10000 avgt 30 27.276 ± 0.316 ns/op + +// java built-in - multi queues +ChainedBenchmark.queueChain 0 100 avgt 20 186.677 ± 2.564 ns/op +ChainedBenchmark.queueChain 0 1000 avgt 20 108.954 ± 13.825 ns/op +ChainedBenchmark.queueChain 0 10000 avgt 20 101.643 ± 10.526 ns/op +ChainedBenchmark.queueChain 100 100 avgt 20 7.933 ± 0.546 ns/op +ChainedBenchmark.queueChain 100 1000 avgt 20 5.281 ± 0.261 ns/op +ChainedBenchmark.queueChain 100 10000 avgt 20 5.798 ± 0.058 ns/op ``` ## Feedback diff --git a/bench/bench-java/src/main/java/com/softwaremill/jox/BufferedBenchmark.java b/bench/bench-java/src/main/java/com/softwaremill/jox/BufferedBenchmark.java index 938f50c..2b4bb4f 100644 --- a/bench/bench-java/src/main/java/com/softwaremill/jox/BufferedBenchmark.java +++ b/bench/bench-java/src/main/java/com/softwaremill/jox/BufferedBenchmark.java @@ -8,75 +8,58 @@ /** * Buffered tests for {@link ArrayBlockingQueue} and {@link Channel}. 
*/ -@Warmup(iterations = 3, time = 5000, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 10, time = 5000, timeUnit = TimeUnit.MILLISECONDS) -// after the measurement time, we want to interrupt any pending methods (which might block, waiting for a partner) -// this needs to be slightly larger than the test time to avoid warnings -@Timeout(time = 5100, timeUnit = TimeUnit.MILLISECONDS) -@Fork(value = 3) +@Warmup(iterations = 3, time = 3000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 3000, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 2) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Group) +@State(Scope.Benchmark) public class BufferedBenchmark { @Param({"1", "10", "100"}) public int capacity; - private ArrayBlockingQueue queue; - private Channel channel; + // going against JMH's best practices, the benchmarks are "iterative" (not using groups), for two reasons: + // (1) direct comparison w/ Kotlin, as we can't write a group-based benchmark there, due to suspending functions + // (2) the more complex benchmarks (which use higher numbers of threads) need to be enclosed in a single method anyway - @Setup - public void create() { - queue = new ArrayBlockingQueue<>(capacity); - channel = new Channel<>(capacity); - } - - // - - @Benchmark - @Group("array_blocking_queue") - @GroupThreads(1) - public void putToArrayBlockingQueue() throws InterruptedException { - queue.put(63); - } + private final static int OPERATIONS_PER_INVOCATION = 1_000_000; @Benchmark - @Group("array_blocking_queue") - @GroupThreads(1) - public void takeFromArrayBlockingQueue() throws InterruptedException { - queue.take(); - } - - // + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) + public void arrayBlockingQueue() throws InterruptedException { + var queue = new ArrayBlockingQueue<>(capacity); + var t1 = Thread.startVirtualThread(() -> { + for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { + try { +
queue.put(63); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); - @Benchmark - @Group("channel") - @GroupThreads(1) - public void sendToChannel() throws InterruptedException { - channel.send(63); - } + var t2 = Thread.startVirtualThread(() -> { + for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { + try { + queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); - @Benchmark - @Group("channel") - @GroupThreads(1) - public void receiveFromChannel() throws InterruptedException { - channel.receive(); + t1.join(); + t2.join(); } - // - - // including an iterative benchmark, for direct comparison w/ Kotlin, as we can't write a group-based benchmark - // there, due to suspended functions - - private final static int OPERATIONS_PER_INVOCATION = 1_000_000; - @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - @Group("channel_iterative") - public void sendReceive() throws InterruptedException { + public void channel() throws InterruptedException { + var ch = new Channel<>(capacity); var t1 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - channel.send(63); + ch.send(63); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -86,7 +69,7 @@ public void sendReceive() throws InterruptedException { var t2 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - channel.receive(); + ch.receive(); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/bench/bench-java/src/main/java/com/softwaremill/jox/ChainedBenchmark.java b/bench/bench-java/src/main/java/com/softwaremill/jox/ChainedBenchmark.java new file mode 100644 index 0000000..32517b5 --- /dev/null +++ b/bench/bench-java/src/main/java/com/softwaremill/jox/ChainedBenchmark.java @@ -0,0 +1,139 @@ +package com.softwaremill.jox; + +import org.openjdk.jmh.annotations.*; + +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +/** + * Chained send-receive test for {@link Channel} and {@link BlockingQueue} - a series of threads proxying values to subsequent channels/queues. + */ +@Warmup(iterations = 3, time = 4000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 4000, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 2) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class ChainedBenchmark { + @Param({"0", "100"}) + public int capacity; + + @Param({"100", "1000", "10000"}) + public int chainLength; + + private final static int OPERATIONS_PER_INVOCATION = 10_000_000; + + @Benchmark + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) + public void channelChain() throws InterruptedException { + // we want to measure the amount of time a send-receive pair takes + int elements = OPERATIONS_PER_INVOCATION / chainLength; + Channel[] channels = new Channel[chainLength]; + for (int i = 0; i < chainLength; i++) { + channels[i] = new Channel<>(capacity); + } + + Thread[] threads = new Thread[chainLength + 1]; + threads[0] = Thread.startVirtualThread(() -> { + var ch = channels[0]; + for (int i = 0; i < elements; i++) { + try { + ch.send(63); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + for (int t = 1; t < chainLength; t++) { + int finalT = t; + threads[t] = Thread.startVirtualThread(() -> { + var ch1 = channels[finalT - 1]; + var ch2 = channels[finalT]; + for (int i = 0; i < elements; i++) { + try { + ch2.send(ch1.receive()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + threads[chainLength] = Thread.startVirtualThread(() -> { + var ch = channels[chainLength - 1]; + for (int i = 0; i < elements; i++) { + try { + ch.receive(); + } catch (InterruptedException 
e) { + throw new RuntimeException(e); + } + } + }); + + for (Thread thread : threads) { + thread.join(); + } + } + + @Benchmark + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) + public void queueChain() throws InterruptedException { + // we want to measure the amount of time a send-receive pair takes + int elements = OPERATIONS_PER_INVOCATION / chainLength; + BlockingQueue[] queues = new BlockingQueue[chainLength]; + if (capacity == 0) { + for (int i = 0; i < chainLength; i++) { + queues[i] = new SynchronousQueue<>(); + } + } else { + for (int i = 0; i < chainLength; i++) { + queues[i] = new ArrayBlockingQueue<>(capacity); + } + } + + Thread[] threads = new Thread[chainLength + 1]; + threads[0] = Thread.startVirtualThread(() -> { + var q = queues[0]; + for (int i = 0; i < elements; i++) { + try { + q.put(63); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + for (int t = 1; t < chainLength; t++) { + int finalT = t; + threads[t] = Thread.startVirtualThread(() -> { + var q1 = queues[finalT - 1]; + var q2 = queues[finalT]; + for (int i = 0; i < elements; i++) { + try { + q2.put(q1.take()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + threads[chainLength] = Thread.startVirtualThread(() -> { + var q = queues[chainLength - 1]; + for (int i = 0; i < elements; i++) { + try { + q.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + for (Thread thread : threads) { + thread.join(); + } + } +} diff --git a/bench/bench-java/src/main/java/com/softwaremill/jox/RendezvousBenchmark.java b/bench/bench-java/src/main/java/com/softwaremill/jox/RendezvousBenchmark.java index a98f607..60998d6 100644 --- a/bench/bench-java/src/main/java/com/softwaremill/jox/RendezvousBenchmark.java +++ b/bench/bench-java/src/main/java/com/softwaremill/jox/RendezvousBenchmark.java @@ -9,83 +9,78 @@ /** * Rendezvous tests for {@link SynchronousQueue}, {@link Exchanger} and 
{@link Channel}. */ -@Warmup(iterations = 3, time = 5000, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 10, time = 5000, timeUnit = TimeUnit.MILLISECONDS) -// after the measurement time, we want to interrupt any pending methods (which might block, waiting for a partner) -// this needs to be slightly larger than the test time to avoid warnings -@Timeout(time = 5100, timeUnit = TimeUnit.MILLISECONDS) -@Fork(value = 3) +@Warmup(iterations = 3, time = 4000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 4000, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 2) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Group) public class RendezvousBenchmark { - private SynchronousQueue queue = new SynchronousQueue<>(); - - @Benchmark - @Group("synchronous_queue") - @GroupThreads(1) - public void putToSynchronousQueue() throws InterruptedException { - queue.put(63); - } + private final static int OPERATIONS_PER_INVOCATION = 1_000_000; @Benchmark - @Group("synchronous_queue") - @GroupThreads(1) - public void takeFromSynchronousQueue() throws InterruptedException { - queue.take(); - } - - // + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) + public void synchronousQueue() throws InterruptedException { + var queue = new SynchronousQueue<>(); + var t1 = Thread.startVirtualThread(() -> { + for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { + try { + queue.put(63); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); - private Exchanger exchanger = new Exchanger<>(); + var t2 = Thread.startVirtualThread(() -> { + for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { + try { + queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); - @Benchmark - @Group("exchanger") - @GroupThreads(1) - public void exchange1() throws InterruptedException { - exchanger.exchange(63); + t1.join(); + t2.join(); } @Benchmark - @Group("exchanger") - 
@GroupThreads(1) - public void exchange2() throws InterruptedException { - exchanger.exchange(64); - } - - // - - private Channel channel = new Channel<>(); + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) + public void exchanger() throws InterruptedException { + var exchanger = new Exchanger<>(); + var t1 = Thread.startVirtualThread(() -> { + for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { + try { + exchanger.exchange(63); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); - @Benchmark - @Group("channel") - @GroupThreads(1) - public void sendToChannel() throws InterruptedException { - channel.send(63); - } + var t2 = Thread.startVirtualThread(() -> { + for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { + try { + exchanger.exchange(64); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); - @Benchmark - @Group("channel") - @GroupThreads(1) - public void receiveFromChannel() throws InterruptedException { - channel.receive(); + t1.join(); + t2.join(); } - // - - // including an iterative benchmark, for direct comparison w/ Kotlin, as we can't write a group-based benchmark - // there, due to suspended functions - - private final static int OPERATIONS_PER_INVOCATION = 1_000_000; - @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - @Group("channel_iterative") - public void sendReceive() throws InterruptedException { + public void channel() throws InterruptedException { + var ch = new Channel<>(); var t1 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - channel.send(63); + ch.send(63); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -95,7 +90,7 @@ public void sendReceive() throws InterruptedException { var t2 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - channel.receive(); + ch.receive(); } catch (InterruptedException e) { throw new RuntimeException(e); } diff 
--git a/bench/bench-java/src/main/java/com/softwaremill/jox/SelectBenchmark.java b/bench/bench-java/src/main/java/com/softwaremill/jox/SelectBenchmark.java index 0832155..5baebc4 100644 --- a/bench/bench-java/src/main/java/com/softwaremill/jox/SelectBenchmark.java +++ b/bench/bench-java/src/main/java/com/softwaremill/jox/SelectBenchmark.java @@ -2,8 +2,6 @@ import org.openjdk.jmh.annotations.*; -import java.util.concurrent.Exchanger; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import static com.softwaremill.jox.Select.select; @@ -11,48 +9,22 @@ /** * Tests for {@link Select#select(SelectClause[])}. */ -@Warmup(iterations = 3, time = 5000, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 10, time = 5000, timeUnit = TimeUnit.MILLISECONDS) -// after the measurement time, we want to interrupt any pending methods (which might block, waiting for a partner) -// this needs to be slightly larger than the test time to avoid warnings -@Timeout(time = 5100, timeUnit = TimeUnit.MILLISECONDS) -@Fork(value = 3) +@Warmup(iterations = 3, time = 4000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 4000, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 2) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Group) public class SelectBenchmark { - private Channel channel1 = new Channel<>(); - private Channel channel2 = new Channel<>(); - - @Benchmark - @Group("channel") - @GroupThreads(1) - public void sendToChannel() throws InterruptedException { - channel1.send(63); - } - - @Benchmark - @Group("channel") - @GroupThreads(1) - public void receiveFromChannelUsingSelect() throws InterruptedException { - select(channel1.receiveClause()); - } - - // - - // including an iterative benchmark, for direct comparison w/ Kotlin, as we can't write a group-based benchmark - // there, due to suspended functions - private final static int OPERATIONS_PER_INVOCATION = 1_000_000; @Benchmark 
@OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - @Group("single_channel_iterative") - public void sendReceiveUsingSelectSingleChannel() throws InterruptedException { + public void selectWithSingleClause() throws InterruptedException { + var ch = new Channel(); var t1 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - channel2.send(63); + ch.send(63); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -62,7 +34,7 @@ public void sendReceiveUsingSelectSingleChannel() throws InterruptedException { var t2 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - select(channel2.receiveClause()); + select(ch.receiveClause()); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -75,12 +47,13 @@ public void sendReceiveUsingSelectSingleChannel() throws InterruptedException { @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - @Group("two_channels_iterative") - public void sendReceiveUsingSelectTwoChannels() throws InterruptedException { + public void selectWithTwoClauses() throws InterruptedException { + var ch1 = new Channel(); + var ch2 = new Channel(); var t1 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION / 2; i++) { try { - channel1.send(63); + ch1.send(63); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -90,7 +63,7 @@ public void sendReceiveUsingSelectTwoChannels() throws InterruptedException { var t2 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION / 2; i++) { try { - channel2.send(63); + ch2.send(63); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -100,7 +73,7 @@ public void sendReceiveUsingSelectTwoChannels() throws InterruptedException { var t3 = Thread.startVirtualThread(() -> { for (int i = 0; i < OPERATIONS_PER_INVOCATION; i++) { try { - select(channel1.receiveClause(), channel2.receiveClause()); + 
select(ch1.receiveClause(), ch2.receiveClause()); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/bench/bench-kotlin/src/com/softwaremill/jox/BufferedKotlinBenchmark.kt b/bench/bench-kotlin/src/com/softwaremill/jox/BufferedKotlinBenchmark.kt index 0251990..205d6b9 100644 --- a/bench/bench-kotlin/src/com/softwaremill/jox/BufferedKotlinBenchmark.kt +++ b/bench/bench-kotlin/src/com/softwaremill/jox/BufferedKotlinBenchmark.kt @@ -21,7 +21,7 @@ open class BufferedKotlinBenchmark { @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - fun sendReceiveUsingDefaultDispatcher() { + fun channel_defaultDispatcher() { runBlocking { val channel = Channel(capacity) launch(Dispatchers.Default) { diff --git a/bench/bench-kotlin/src/com/softwaremill/jox/ChainedKotlinBenchmark.kt b/bench/bench-kotlin/src/com/softwaremill/jox/ChainedKotlinBenchmark.kt new file mode 100644 index 0000000..242be05 --- /dev/null +++ b/bench/bench-kotlin/src/com/softwaremill/jox/ChainedKotlinBenchmark.kt @@ -0,0 +1,83 @@ +package com.softwaremill.jox + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.openjdk.jmh.annotations.* +import java.util.concurrent.TimeUnit + +// same parameters as in the java benchmark +@Warmup(iterations = 3, time = 5000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 5000, timeUnit = TimeUnit.MILLISECONDS) +@Fork(value = 3) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +open class ChainedKotlinBenchmark { + @Param("0", "100") + var capacity: Int = 0 + + @Param("100", "1000", "10000") + var chainLength: Int = 0 + + @Benchmark + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION_CHAINED) + fun channelChain_defaultDispatcher() { + runBlocking { + // we want to measure the amount of time a send-receive pair takes + var elements = 
OPERATIONS_PER_INVOCATION_CHAINED / chainLength + + // create an array of chainLength channels + val channels = Array(chainLength) { Channel(capacity) } + + launch(Dispatchers.Default) { + var ch = channels[0] + for (x in 1..elements) ch.send(63) + } + + for (t in 1 until chainLength) { + val ch1 = channels[t - 1] + val ch2 = channels[t] + launch(Dispatchers.Default) { + for (x in 1..elements) ch2.send(ch1.receive()) + } + } + + launch(Dispatchers.Default) { + var ch = channels[chainLength - 1] + for (x in 1..elements) ch.receive() + } + } + } + + @Benchmark + @OperationsPerInvocation(OPERATIONS_PER_INVOCATION_CHAINED) + fun channelChain_eventLoop() { + runBlocking { + // we want to measure the amount of time a send-receive pair takes + var elements = OPERATIONS_PER_INVOCATION_CHAINED / chainLength + + // create an array of chainLength channels + val channels = Array(chainLength) { Channel(capacity) } + + launch { + var ch = channels[0] + for (x in 1..elements) ch.send(63) + } + + for (t in 1 until chainLength) { + val ch1 = channels[t - 1] + val ch2 = channels[t] + launch { + for (x in 1..elements) ch2.send(ch1.receive()) + } + } + + launch { + var ch = channels[chainLength - 1] + for (x in 1..elements) ch.receive() + } + } + } +} diff --git a/bench/bench-kotlin/src/com/softwaremill/jox/RendezvousKotlinBenchmark.kt b/bench/bench-kotlin/src/com/softwaremill/jox/RendezvousKotlinBenchmark.kt index 2bad718..65c2880 100644 --- a/bench/bench-kotlin/src/com/softwaremill/jox/RendezvousKotlinBenchmark.kt +++ b/bench/bench-kotlin/src/com/softwaremill/jox/RendezvousKotlinBenchmark.kt @@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit open class RendezvousKotlinBenchmark { @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - fun sendReceiveUsingDefaultDispatcher() { + fun channel_defaultDispatcher() { runBlocking { val channel = Channel(0) launch(Dispatchers.Default) { diff --git a/bench/bench-kotlin/src/com/softwaremill/jox/SelectKotlinBenchmark.kt
b/bench/bench-kotlin/src/com/softwaremill/jox/SelectKotlinBenchmark.kt index 01d17c0..33895a9 100644 --- a/bench/bench-kotlin/src/com/softwaremill/jox/SelectKotlinBenchmark.kt +++ b/bench/bench-kotlin/src/com/softwaremill/jox/SelectKotlinBenchmark.kt @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit open class SelectKotlinBenchmark { @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - fun sendReceiveUsingSelect_singleChannel_defaultDispatcher() { + fun selectWithSingleClause_defaultDispatcher() { runBlocking { val channel = Channel(0) launch(Dispatchers.Default) { @@ -34,7 +34,7 @@ open class SelectKotlinBenchmark { @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) - fun sendReceiveUsingSelect_twoChannels_defaultDispatcher() { + fun selectWithTwoClauses_defaultDispatcher() { runBlocking { val channel1 = Channel(0) val channel2 = Channel(0) diff --git a/bench/bench-kotlin/src/com/softwaremill/jox/constants.kt b/bench/bench-kotlin/src/com/softwaremill/jox/constants.kt index b880581..eb3a0f2 100644 --- a/bench/bench-kotlin/src/com/softwaremill/jox/constants.kt +++ b/bench/bench-kotlin/src/com/softwaremill/jox/constants.kt @@ -1,3 +1,4 @@ package com.softwaremill.jox const val OPERATIONS_PER_INVOCATION = 1_000_000 +const val OPERATIONS_PER_INVOCATION_CHAINED = 10_000_000 diff --git a/core/src/main/java/com/softwaremill/jox/Channel.java b/core/src/main/java/com/softwaremill/jox/Channel.java index 8bd4b49..6125478 100644 --- a/core/src/main/java/com/softwaremill/jox/Channel.java +++ b/core/src/main/java/com/softwaremill/jox/Channel.java @@ -1076,9 +1076,16 @@ record Buffered(Object value) {} final class Continuation { /** * The number of busy-looping iterations before yielding, during {@link Continuation#await(Segment, int)}. - * {@code 0}, if there's a single CPU. + * {@code 0}, if there's a single CPU. 
When there are no more than 4 CPUs, we use {@code 128} iterations: this is + * based on the (limited) testing that we've done with various systems. Otherwise, we use {@code 1024} iterations. + * This might need revisiting when more testing & more benchmarks are available. */ - static final int SPINS = Runtime.getRuntime().availableProcessors() == 1 ? 0 : 10000; + static final int SPINS; + + static { + var nproc = Runtime.getRuntime().availableProcessors(); + SPINS = (nproc == 1) ? 0 : ((nproc <= 4) ? (1 << 7) : (1 << 10)); + } private final Thread creatingThread; private volatile Object data; // set using DATA var handle