Skip to content

Commit

Permalink
Review comments:
Browse files Browse the repository at this point in the history
* removed @WithUnsupervisedScope
  • Loading branch information
emil-bar committed Dec 16, 2024
1 parent 1828065 commit 489ed16
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 170 deletions.
91 changes: 49 additions & 42 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.structured.UnsupervisedScope;
import com.softwaremill.jox.structured.Scopes;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -34,20 +35,22 @@ void shouldEmitGroupedElementsAndIncludeRemainingValuesWhenFlowCloses() throws E
}

@Test
@WithUnsupervisedScope
void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed(UnsupervisedScope scope) throws InterruptedException {
// given
RuntimeException failure = new RuntimeException();

// when
Object result = Flows.failed(failure)
.grouped(3)
.runToChannel(scope)
.receiveOrClosed();

// then
assertInstanceOf(ChannelError.class, result);
assertEquals(failure, ((ChannelError) result).cause());
void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed() throws InterruptedException, ExecutionException {
Scopes.unsupervised(scope -> {
// given
RuntimeException failure = new RuntimeException();

// when
Object result = Flows.failed(failure)
.grouped(3)
.runToChannel(scope)
.receiveOrClosed();

// then
assertInstanceOf(ChannelError.class, result);
assertEquals(failure, ((ChannelError) result).cause());
return null;
});
}

@Test
Expand All @@ -62,34 +65,38 @@ void shouldEmitGroupedElementsWithCustomCostFunction() throws Exception {
}

@Test
@WithUnsupervisedScope
void shouldReturnFailedFlowWhenCostFunctionThrowsException(UnsupervisedScope scope) {
// when
ChannelClosedException exception = assertThrows(ChannelClosedException.class, () ->
Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7)
.groupedWeighted(150, n -> (long) (100 / n))
.runToChannel(scope)
.forEach(i -> {
}));

// then
assertInstanceOf(ArithmeticException.class, exception.getCause());
void shouldReturnFailedFlowWhenCostFunctionThrowsException() throws ExecutionException, InterruptedException {
Scopes.unsupervised(scope -> {
// when
ChannelClosedException exception = assertThrows(ChannelClosedException.class, () ->
Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7)
.groupedWeighted(150, n -> (long) (100 / n))
.runToChannel(scope)
.forEach(i -> {
}));

// then
assertInstanceOf(ArithmeticException.class, exception.getCause());
return null;
});
}

@Test
@WithUnsupervisedScope
void shouldReturnFailedSourceWhenTheOriginalSourceIsFailed(UnsupervisedScope scope) throws InterruptedException {
// given
RuntimeException failure = new RuntimeException();

// when
Object result = Flows.failed(failure)
.groupedWeighted(10, n -> Long.parseLong(n.toString()) * 2)
.runToChannel(scope)
.receiveOrClosed();

// then
assertInstanceOf(ChannelError.class, result);
assertEquals(failure, ((ChannelError) result).cause());
void shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws InterruptedException, ExecutionException {
Scopes.unsupervised(scope -> {
// given
RuntimeException failure = new RuntimeException();

// when
Object result = Flows.failed(failure)
.groupedWeighted(10, n -> Long.parseLong(n.toString()) * 2)
.runToChannel(scope)
.receiveOrClosed();

// then
assertInstanceOf(ChannelError.class, result);
assertEquals(failure, ((ChannelError) result).cause());
return null;
});
}
}
}
52 changes: 28 additions & 24 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.UnsupervisedScope;
import com.softwaremill.jox.structured.Scopes;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -39,32 +39,36 @@ void shouldRunToList() throws Throwable {
}

@Test
@WithUnsupervisedScope
void shouldRunToChannel(UnsupervisedScope scope) throws Throwable {
// given
Flow<Integer> flow = Flows.fromValues(1, 2, 3);

// when
Source<Integer> source = flow.runToChannel(scope);

// then
assertEquals(1, source.receive());
assertEquals(2, source.receive());
assertEquals(3, source.receive());
void shouldRunToChannel() throws Throwable {
Scopes.unsupervised(scope -> {
// given
Flow<Integer> flow = Flows.fromValues(1, 2, 3);

// when
Source<Integer> source = flow.runToChannel(scope);

// then
assertEquals(1, source.receive());
assertEquals(2, source.receive());
assertEquals(3, source.receive());
return null;
});
}

@Test
@WithUnsupervisedScope
void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow(UnsupervisedScope scope) throws Throwable {
// given
Channel<Integer> channel = Channel.newUnlimitedChannel();
Flow<Integer> flow = Flows.fromSource(channel);

// when
Source<Integer> receivedChannel = flow.runToChannel(scope);

// then
assertEquals(channel, receivedChannel);
void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable {
Scopes.unsupervised(scope -> {
// given
Channel<Integer> channel = Channel.newUnlimitedChannel();
Flow<Integer> flow = Flows.fromSource(channel);

// when
Source<Integer> receivedChannel = flow.runToChannel(scope);

// then
assertEquals(channel, receivedChannel);
return null;
});
}

@Test
Expand Down
113 changes: 62 additions & 51 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.UnsupervisedScope;
import com.softwaremill.jox.structured.Scopes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -13,6 +13,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -26,11 +27,13 @@ void shouldBeEmpty() throws Exception {
}

@Test
@WithUnsupervisedScope
void shouldCreateFlowFromFork(UnsupervisedScope scope) throws Exception {
Fork<Integer> f = scope.forkUnsupervised(() -> 1);
Flow<Integer> c = Flows.fromFork(f);
assertEquals(List.of(1), c.runToList());
void shouldCreateFlowFromFork() throws Exception {
Scopes.unsupervised(scope -> {
Fork<Integer> f = scope.forkUnsupervised(() -> 1);
Flow<Integer> c = Flows.fromFork(f);
assertEquals(List.of(1), c.runToList());
return null;
});
}

@Test
Expand All @@ -50,15 +53,17 @@ void shouldProduceRange() throws Exception {
}

@Test
@WithUnsupervisedScope
public void shouldFailOnReceive(UnsupervisedScope scope) throws Exception {
// when
Flow<String> s = Flows.failed(new RuntimeException("boom"));
public void shouldFailOnReceive() throws Exception {
Scopes.unsupervised(scope -> {
// when
Flow<String> s = Flows.failed(new RuntimeException("boom"));

// then
Object received = s.runToChannel(scope).receiveOrClosed();
assertInstanceOf(ChannelError.class, received);
assertEquals("boom", ((ChannelError) received).cause().getMessage());
// then
Object received = s.runToChannel(scope).receiveOrClosed();
assertInstanceOf(ChannelError.class, received);
assertEquals("boom", ((ChannelError) received).cause().getMessage());
return null;
});
}

@Test
Expand All @@ -85,17 +90,19 @@ void shouldReturnFutureValue() throws Exception {
}

@Test
@WithUnsupervisedScope
void shouldReturnFuturesSourceValues(UnsupervisedScope scope) throws Exception {
// given
CompletableFuture<Source<Integer>> completableFuture = CompletableFuture
.completedFuture(Flows.fromValues(1, 2).runToChannel(scope));
void shouldReturnFuturesSourceValues() throws Exception {
Scopes.unsupervised(scope -> {
// given
CompletableFuture<Source<Integer>> completableFuture = CompletableFuture
.completedFuture(Flows.fromValues(1, 2).runToChannel(scope));

// when
List<Integer> result = Flows.fromFutureSource(completableFuture).runToList();
// when
List<Integer> result = Flows.fromFutureSource(completableFuture).runToList();

// then
assertEquals(List.of(1, 2), result);
// then
assertEquals(List.of(1, 2), result);
return null;
});
}

@Test
Expand Down Expand Up @@ -155,41 +162,45 @@ void shouldRepeatTheSameElement() throws Exception {

@Test
@Timeout(value = 1, unit = TimeUnit.SECONDS)
@WithUnsupervisedScope
void shouldTickRegularly(UnsupervisedScope scope) throws InterruptedException {
var c = Flows.tick(Duration.ofMillis(100), 1L)
.runToChannel(scope);
var start = System.currentTimeMillis();
void shouldTickRegularly() throws InterruptedException, ExecutionException {
Scopes.unsupervised(scope -> {
var c = Flows.tick(Duration.ofMillis(100), 1L)
.runToChannel(scope);
var start = System.currentTimeMillis();

c.receive();
assertTrue(System.currentTimeMillis() - start >= 0);
assertTrue(System.currentTimeMillis() - start <= 50);
c.receive();
assertTrue(System.currentTimeMillis() - start >= 0);
assertTrue(System.currentTimeMillis() - start <= 50);

c.receive();
assertTrue(System.currentTimeMillis() - start >= 100);
assertTrue(System.currentTimeMillis() - start <= 150);
c.receive();
assertTrue(System.currentTimeMillis() - start >= 100);
assertTrue(System.currentTimeMillis() - start <= 150);

c.receive();
assertTrue(System.currentTimeMillis() - start >= 200);
assertTrue(System.currentTimeMillis() - start <= 250);
c.receive();
assertTrue(System.currentTimeMillis() - start >= 200);
assertTrue(System.currentTimeMillis() - start <= 250);
return null;
});
}

@Test
@Timeout(value = 1, unit = TimeUnit.SECONDS)
@WithUnsupervisedScope
void shouldTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal(UnsupervisedScope scope) throws InterruptedException {
var c = Flows.tick(Duration.ofMillis(100), 1L)
.runToChannel(scope);
var start = System.currentTimeMillis();

Thread.sleep(200);
c.receive();
assertTrue(System.currentTimeMillis() - start >= 200);
assertTrue(System.currentTimeMillis() - start <= 250);

c.receive();
assertTrue(System.currentTimeMillis() - start >= 200);
assertTrue(System.currentTimeMillis() - start <= 250);
void shouldTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal() throws InterruptedException, ExecutionException {
Scopes.unsupervised(scope -> {
var c = Flows.tick(Duration.ofMillis(100), 1L)
.runToChannel(scope);
var start = System.currentTimeMillis();

Thread.sleep(200);
c.receive();
assertTrue(System.currentTimeMillis() - start >= 200);
assertTrue(System.currentTimeMillis() - start <= 250);

c.receive();
assertTrue(System.currentTimeMillis() - start >= 200);
assertTrue(System.currentTimeMillis() - start <= 250);
return null;
});
}

@Test
Expand Down

This file was deleted.

Loading

0 comments on commit 489ed16

Please sign in to comment.