From b720a5bc1c877b1938e74c9ec6a93c5fe478457c Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 25 Mar 2023 10:30:16 +0000 Subject: [PATCH 01/11] * fixing interruption behaviour * adding error & interruption propagation and integrity test --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- .../scala/fs2/StreamCombinatorsSuite.scala | 74 ++++++++++++++++++- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 96f09ec383..c150e0bd12 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1471,7 +1471,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } def endSupply(result: Either[Throwable, Unit]): F2[Unit] = - buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue) + buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue + outputLong) def endDemand(result: Either[Throwable, Unit]): F2[Unit] = buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index e48fe36efa..9280f1e812 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -23,7 +23,7 @@ package fs2 import cats.effect.kernel.Deferred import cats.effect.kernel.Ref -import cats.effect.std.{Semaphore, Queue} +import cats.effect.std.{Queue, Semaphore} import cats.effect.testkit.TestControl import cats.effect.{IO, SyncIO} import cats.syntax.all._ @@ -34,6 +34,7 @@ import org.scalacheck.Prop.forAll import scala.concurrent.duration._ import scala.concurrent.TimeoutException +import scala.util.control.NoStackTrace class StreamCombinatorsSuite extends Fs2Suite { @@ -833,6 +834,77 @@ class StreamCombinatorsSuite extends Fs2Suite { ) .assertEquals(0.millis) } + + test("upstream failures are propagated downstream") { + + case object SevenNotAllowed extends NoStackTrace + + val source = Stream + .unfold(0)(s => Some((s, s + 1))) + .covary[IO] + .evalMap(n => if (n == 7) IO.raiseError(SevenNotAllowed) else IO.pure(n)) + + val downstream = source.groupWithin(100, 2.seconds) + + downstream.compile.lastOrError.intercept[SevenNotAllowed.type] + } + + test( + "upstream interruption causes immediate downstream termination with all elements being emitted" + ) { + + val sourceTimeout = 5.5.seconds + val downstreamTimeout = sourceTimeout + 2.seconds + + TestControl + .executeEmbed( + Ref[IO] + .of(0.millis) + .flatMap { ref => + val source: Stream[IO, Int] = + Stream + .unfold(0)(s => Some((s, s + 1))) + .covary[IO] + .meteredStartImmediately(1.second) + .interruptAfter(sourceTimeout) + + // large chunkSize and timeout (no emissions expected in the window + // specified, unless source ends, due to interruption or + // natural termination (i.e runs out of elements) + val downstream: Stream[IO, Chunk[Int]] = + source.groupWithin(Int.MaxValue, 1.day) + + downstream.compile.lastOrError + .map(_.toList) + .timeout(downstreamTimeout) + .flatTap(_ => IO.monotonic.flatMap(ref.set)) + .flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit))) + } + ) + .assertEquals( + // downstream ended immediately (i.e timeLapsed = sourceTimeout) + // emitting whatever was accumulated at the time of interruption + (sourceTimeout, List(0, 1, 2, 3, 4, 5)) + ) + } + + test("stress test: all elements are processed") { + + val rangeLength = 1000000 + + Stream + .eval(Ref.of[IO, Int](0)) + .flatMap { counter => + Stream + .range(0, rangeLength) + .covary[IO] + .groupWithin(4096, 100.micros) + .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) + } + .compile + .lastOrError + .assertEquals(rangeLength) + } } property("head")(forAll((s: Stream[Pure, Int]) => assertEquals(s.head.toList, s.toList.take(1)))) From b03e1f87241b5e0f13ce57092e9a0de0d6858861 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 25 Mar 2023 10:56:19 +0000 Subject: [PATCH 02/11] fmt --- core/shared/src/main/scala/fs2/Stream.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index c150e0bd12..c067c14720 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1471,7 +1471,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } def endSupply(result: Either[Throwable, Unit]): F2[Unit] = - buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue + outputLong) + buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN( + Int.MaxValue + outputLong + ) def endDemand(result: Either[Throwable, Unit]): F2[Unit] = buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue) From 4955b5137d5261eb4516617a824ba407da179b8d Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 25 Mar 2023 14:25:25 +0000 Subject: [PATCH 03/11] reducing rangeLength by a factor of 10 to avoid timeout on CI --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 9280f1e812..7e8210d76a 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -890,7 +890,7 @@ class StreamCombinatorsSuite extends Fs2Suite { test("stress test: all elements are processed") { - val rangeLength = 1000000 + val rangeLength = 100000 Stream .eval(Ref.of[IO, Int](0)) From 8182c8ef39aabfb4d9908c18b871b078c434176e Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Fri, 31 Mar 2023 22:08:14 +0100 Subject: [PATCH 04/11] simplifying stream, using testcontrol --- .../scala/fs2/StreamCombinatorsSuite.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 5962c5360e..cc411cbd69 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -837,17 +837,18 @@ class StreamCombinatorsSuite extends Fs2Suite { } test("upstream failures are propagated downstream") { + TestControl.executeEmbed { + case object SevenNotAllowed extends NoStackTrace - case object SevenNotAllowed extends NoStackTrace - - val source = Stream - .unfold(0)(s => Some((s, s + 1))) - .covary[IO] - .evalMap(n => if (n == 7) IO.raiseError(SevenNotAllowed) else IO.pure(n)) + val source = Stream + .iterate(0)(_ + 1) + .covary[IO] + .evalMap(n => if (n == 7) IO.raiseError(SevenNotAllowed) else IO.pure(n)) - val downstream = source.groupWithin(100, 2.seconds) + val downstream = source.groupWithin(100, 2.seconds) - downstream.compile.lastOrError.intercept[SevenNotAllowed.type] + downstream.intercept[SevenNotAllowed.type] + } } test( @@ -864,7 +865,7 @@ class StreamCombinatorsSuite extends Fs2Suite { .flatMap { ref => val source: Stream[IO, Int] = Stream - .unfold(0)(s => Some((s, s + 1))) + .iterate(0)(_ + 1) .covary[IO] .meteredStartImmediately(1.second) .interruptAfter(sourceTimeout) From fdc2d16b8fd17162454017ea3b0211b99e58892b Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Fri, 31 Mar 2023 22:17:29 +0100 Subject: [PATCH 05/11] minor --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index cc411cbd69..16c742c8d3 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -843,7 +843,7 @@ class StreamCombinatorsSuite extends Fs2Suite { val source = Stream .iterate(0)(_ + 1) .covary[IO] - .evalMap(n => if (n == 7) IO.raiseError(SevenNotAllowed) else IO.pure(n)) + .evalTap(n => IO.raiseError(SevenNotAllowed).whenA(n == 7)) val downstream = source.groupWithin(100, 2.seconds) From d0f0a3a0908cdeeac6588b41dca5f7e050bf5d2d Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Fri, 31 Mar 2023 23:27:56 +0100 Subject: [PATCH 06/11] reducing rangeLength by a factor of 10 to prevent timeout on js --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 16c742c8d3..7c8a50cdb6 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -891,8 +891,7 @@ class StreamCombinatorsSuite extends Fs2Suite { } test("stress test: all elements are processed") { - - val rangeLength = 100000 + val rangeLength = 10000 Stream .eval(Ref.of[IO, Int](0)) From ff688bedaed3e588220515882cee444e08851299 Mon Sep 17 00:00:00 2001 From: Angel-O <16281580+Angel-O@users.noreply.github.com> Date: Sun, 2 Apr 2023 16:52:05 +0100 Subject: [PATCH 07/11] Update core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala Co-authored-by: Arman Bilge --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 7c8a50cdb6..39a0036883 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -878,9 +878,7 @@ class StreamCombinatorsSuite extends Fs2Suite { downstream.compile.lastOrError .map(_.toList) - .timeout(downstreamTimeout) - .flatTap(_ => IO.monotonic.flatMap(ref.set)) - .flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit))) + .timed } ) .assertEquals( From c73ab66d8695744f85ae6bbea4ac3b4da926781b Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 2 Apr 2023 16:55:28 +0100 Subject: [PATCH 08/11] removing unused ref --- .../scala/fs2/StreamCombinatorsSuite.scala | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 39a0036883..2d07f00514 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -859,28 +859,24 @@ class StreamCombinatorsSuite extends Fs2Suite { val downstreamTimeout = sourceTimeout + 2.seconds TestControl - .executeEmbed( - Ref[IO] - .of(0.millis) - .flatMap { ref => - val source: Stream[IO, Int] = - Stream - .iterate(0)(_ + 1) - .covary[IO] - .meteredStartImmediately(1.second) - .interruptAfter(sourceTimeout) - - // large chunkSize and timeout (no emissions expected in the window - // specified, unless source ends, due to interruption or - // natural termination (i.e runs out of elements) - val downstream: Stream[IO, Chunk[Int]] = - source.groupWithin(Int.MaxValue, 1.day) - - downstream.compile.lastOrError - .map(_.toList) - .timed - } - ) + .executeEmbed { + val source: Stream[IO, Int] = + Stream + .iterate(0)(_ + 1) + .covary[IO] + .meteredStartImmediately(1.second) + .interruptAfter(sourceTimeout) + + // large chunkSize and timeout (no emissions expected in the window + // specified, unless source ends, due to interruption or + // natural termination (i.e runs out of elements) + val downstream: Stream[IO, Chunk[Int]] = + source.groupWithin(Int.MaxValue, 1.day) + + downstream.compile.lastOrError + .map(_.toList) + .timed + } .assertEquals( // downstream ended immediately (i.e timeLapsed = sourceTimeout) // emitting whatever was accumulated at the time of interruption From b64eb5ae9e42fc5934c0bb51955904b048e7e5c5 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 2 Apr 2023 16:57:20 +0100 Subject: [PATCH 09/11] downstreamtimeout --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 2d07f00514..8f43bd1afe 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -874,6 +874,7 @@ class StreamCombinatorsSuite extends Fs2Suite { source.groupWithin(Int.MaxValue, 1.day) downstream.compile.lastOrError + .timeout(downstreamTimeout) .map(_.toList) .timed } From 8d5759015c416e04781a905e30f0100028a558e8 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 2 Apr 2023 17:05:44 +0100 Subject: [PATCH 10/11] adding assertion --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 8f43bd1afe..3bf473d795 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -845,9 +845,11 @@ class StreamCombinatorsSuite extends Fs2Suite { .covary[IO] .evalTap(n => IO.raiseError(SevenNotAllowed).whenA(n == 7)) - val downstream = source.groupWithin(100, 2.seconds) + val downstream = source.groupWithin(100, 2.seconds).map(_.toList) - downstream.intercept[SevenNotAllowed.type] + val expected = List(List(1, 2, 3, 4, 5, 6)) + + downstream.assertEmits(expected).intercept[SevenNotAllowed.type] } } From 9ae18d78b5da9d4e628376c059254db40051a3ba Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 2 Apr 2023 19:53:20 +0100 Subject: [PATCH 11/11] supply increase made clear --- core/shared/src/main/scala/fs2/Stream.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index bd7f32f836..91624b63f1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1472,7 +1472,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def endSupply(result: Either[Throwable, Unit]): F2[Unit] = buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN( - Int.MaxValue + outputLong + // enough supply for 2 iterations of the race loop in case of upstream + // interruption: so that downstream can terminate immediately + outputLong * 2 ) def endDemand(result: Either[Throwable, Unit]): F2[Unit] =