Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
* fixed documentation
* added ScopedValue support
  • Loading branch information
emil-bar committed Dec 18, 2024
1 parent 489ed16 commit f296448
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
26 changes: 23 additions & 3 deletions channels/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package com.softwaremill.jox;

import static com.softwaremill.jox.CellState.BROKEN;
import static com.softwaremill.jox.CellState.CLOSED;
import static com.softwaremill.jox.CellState.DONE;
import static com.softwaremill.jox.CellState.INTERRUPTED_RECEIVE;
import static com.softwaremill.jox.CellState.INTERRUPTED_SEND;
import static com.softwaremill.jox.CellState.IN_BUFFER;
import static com.softwaremill.jox.CellState.RESUMING;
import static com.softwaremill.jox.Segment.findAndMoveForward;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Comparator;
Expand All @@ -8,9 +17,6 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static com.softwaremill.jox.CellState.*;
import static com.softwaremill.jox.Segment.findAndMoveForward;

/**
* Channel is a thread-safe data structure which exposes three basic operations:
* <p>
Expand Down Expand Up @@ -87,6 +93,13 @@ operations on these (previous) segments, and we'll end up wanting to remove such
operations won't use them, so the relinking won't be useful.
*/

/**
* Can be used with {@link Channel#withScopedBufferSize()} to pass buffer size value from scope.
* e.g. `ScopedValues.where(BUFFER_SIZE, 8).run(() -> Channel.withScopedBufferSize())` will create a channel with buffer size = 8
* **/
public static final ScopedValue<Integer> BUFFER_SIZE = ScopedValue.newInstance();
public static final int DEFAULT_BUFFER_SIZE = 16;

// immutable state

private final int capacity;
Expand Down Expand Up @@ -202,6 +215,13 @@ public static <T> Channel<T> newUnlimitedChannel() {
return new Channel<>(UNLIMITED_CAPACITY);
}

/**
* Allows for creating Channel with buffer size specified in scope by {@link ScopedValue} {@link Channel#BUFFER_SIZE}
*/
public static <T> Channel<T> withScopedBufferSize() {
return new Channel<>(BUFFER_SIZE.orElse(DEFAULT_BUFFER_SIZE));
}

private static final int UNLIMITED_CAPACITY = -1;

// *******
Expand Down
19 changes: 11 additions & 8 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.softwaremill.jox.flows;


import com.softwaremill.jox.Channel;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.UnsupervisedScope;
import static com.softwaremill.jox.flows.Flows.usingEmit;
import static com.softwaremill.jox.structured.Scopes.unsupervised;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -16,8 +15,9 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static com.softwaremill.jox.flows.Flows.usingEmit;
import static com.softwaremill.jox.structured.Scopes.unsupervised;
import com.softwaremill.jox.Channel;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.UnsupervisedScope;

/**
* Describes an asynchronous transformation pipeline. When run, emits elements of type `T`.
Expand Down Expand Up @@ -60,15 +60,18 @@ public List<T> runToList() throws Exception {
/** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result
* of this method.
* <p>
* By default, buffer capacity is unlimited.
* Buffer capacity can be set via scoped value {@link Channel#BUFFER_SIZE}. If not specified in scope, {@link Channel#DEFAULT_BUFFER_SIZE} is used.
* <p>
* Blocks until the flow completes.
* Method does not block until the flow completes.
*
* @param scope
* Required for creating async forks responsible for writing to channel
*/
public Source<T> runToChannel(UnsupervisedScope scope) {
if (last instanceof SourceBackedFlowStage<T>(Source<T> source)) {
return source;
} else {
Channel<T> channel = new Channel<>();
Channel<T> channel = Channel.withScopedBufferSize();
runLastToChannelAsync(scope, channel);
return channel;
}
Expand Down
33 changes: 28 additions & 5 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package com.softwaremill.jox.flows;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.ArrayList;
import java.util.List;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Scopes;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

class FlowTest {

@Test
Expand Down Expand Up @@ -55,6 +57,27 @@ void shouldRunToChannel() throws Throwable {
});
}

@Test
void shouldRunToChannelWithBufferSizeDefinedInScope() throws Throwable {
ScopedValue.where(Channel.BUFFER_SIZE, 2).call(() -> {
Scopes.unsupervised(scope -> {
// given
Flow<Integer> flow = Flows.fromValues(1, 2, 3);

// when
Source<Integer> source = flow.runToChannel(scope);

// then
assertEquals(1, source.receive());
assertEquals(2, source.receive());
assertEquals(3, source.receive());
return null;
});
return null;
});
}


@Test
void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable {
Scopes.unsupervised(scope -> {
Expand Down

0 comments on commit f296448

Please sign in to comment.