Skip to content

Commit

Permalink
Fix: Avoid redundant re-starting in flattened switch observables. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
raquo committed Jan 5, 2021
1 parent 86aa84b commit e291092
Show file tree
Hide file tree
Showing 4 changed files with 480 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ class SwitchEventStream[I, O](
)

override protected[airstream] def onNext(nextValue: I, transaction: Transaction): Unit = {
removeInternalObserverFromCurrentEventStream()
val nextStream = makeStream(nextValue)
maybeCurrentEventStream = Success(nextStream)
// If we're receiving events, this stream is started, so no need to check for that
nextStream.addInternalObserver(internalEventObserver)
val isSameStream = maybeCurrentEventStream.exists { currentStream =>
currentStream.isSuccess && (currentStream.get eq nextStream)
}
if (!isSameStream) {
removeInternalObserverFromCurrentEventStream()
maybeCurrentEventStream = Success(nextStream)
// If we're receiving events, this stream is started, so no need to check for that
nextStream.addInternalObserver(internalEventObserver)
}
}

override protected[airstream] def onError(nextError: Throwable, transaction: Transaction): Unit = {
Expand Down
19 changes: 11 additions & 8 deletions src/main/scala/com/raquo/airstream/signal/SwitchSignal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ class SwitchSignal[A](
)

override protected[airstream] def onTry(nextSignalTry: Try[Signal[A]], transaction: Transaction): Unit = {
removeInternalObserverFromCurrentSignal()
currentSignalTry = nextSignalTry
val isSameSignal = nextSignalTry.isSuccess && nextSignalTry == currentSignalTry
if (!isSameSignal) {
removeInternalObserverFromCurrentSignal()
currentSignalTry = nextSignalTry

// If we're receiving events, this signal is started, so no need to check for that
nextSignalTry.foreach { nextSignal =>
nextSignal.addInternalObserver(internalEventObserver)
// If we're receiving events, this signal is started, so no need to check for that
nextSignalTry.foreach { nextSignal =>
nextSignal.addInternalObserver(internalEventObserver)
}
//println(s"> init trx from SwitchSignal.onTry")
// Update this signal's value with nextSignal's current value (or an error if we don't have nextSignal)
new Transaction(fireTry(nextSignalTry.flatMap(_.tryNow()), _))
}
//println(s"> init trx from SwitchSignal.onTry")
// Update this signal's value with nextSignal's current value (or an error if we don't have nextSignal)
new Transaction(fireTry(nextSignalTry.flatMap(_.tryNow()), _))
}

override protected[this] def onStart(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,294 @@ class SwitchEventStreamSpec extends UnitSpec {
effects.clear()
}

it("EventStream: emitting the same inner stream does not cause it to stop and re-start") {

implicit val owner: TestableOwner = new TestableOwner

val outerBus = new EventBus[Int]

val calculations = mutable.Buffer[Calculation[String]]()

// It's important that we reuse the exact same references to inner streams to check the logic
// - fromSeq streams are used to ensure that onStart isn't called extraneously
// - bus.events streams are used to ensure that onStop isn't called extraneously

val smallBus = new EventBus[String]

val smallStream = EventStream.merge(
smallBus.events,
EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true)
)

val bigBus = new EventBus[String]

val bigStream = EventStream.merge(
bigBus.events,
EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true)
)

val flatStream = outerBus.events.flatMap {
case i if i >= 10 => bigStream
case _ => smallStream
}.map(Calculation.log("flat", calculations))

// --

flatStream.addObserver(Observer.empty)

assert(calculations.isEmpty)

// --

outerBus.writer.onNext(1)

assert(calculations.toList == List(
Calculation("flat", "small-1"),
Calculation("flat", "small-2"),
))

calculations.clear()

// --

smallBus.writer.onNext("small-bus-1")

assert(calculations.toList == List(
Calculation("flat", "small-bus-1")
))

calculations.clear()

// --

outerBus.writer.onNext(2)

assert(calculations.isEmpty)

// --

smallBus.writer.onNext("small-bus-2")

assert(calculations.toList == List(
Calculation("flat", "small-bus-2")
))

calculations.clear()

// --

outerBus.writer.onNext(10) // #Note switch to big

assert(calculations.toList == List(
Calculation("flat", "big-1"),
Calculation("flat", "big-2")
))

calculations.clear()

// --

smallBus.writer.onNext("small bus - unrelated change")

assert(calculations.isEmpty)

// --

bigBus.writer.onNext("big-bus-1")

assert(calculations.toList == List(
Calculation("flat", "big-bus-1")
))

calculations.clear()

// --

outerBus.writer.onNext(11)

assert(calculations.isEmpty)

// --

bigBus.writer.onNext("big-bus-2")

assert(calculations.toList == List(
Calculation("flat", "big-bus-2")
))

calculations.clear()

// --

outerBus.writer.onNext(5) // #Note switch back to small

assert(calculations.isEmpty) // empty because of emitOnce = true

// --

smallBus.writer.onNext("small-bus-3")

assert(calculations.toList == List(
Calculation("flat", "small-bus-3")
))

calculations.clear()

// --

bigBus.writer.onNext("big bus - unrelated change")

assert(calculations.isEmpty)
}

it("Signal: emitting the same inner stream does not cause it to stop and re-start") {

implicit val owner: TestableOwner = new TestableOwner

val outerBus = new EventBus[Int]

val calculations = mutable.Buffer[Calculation[String]]()

// It's important that we reuse the exact same references to inner streams to check the logic
// - fromSeq streams are used to ensure that onStart isn't called extraneously
// - bus.events streams are used to ensure that onStop isn't called extraneously

val smallBus = new EventBus[String]

val smallStream = EventStream.merge(
smallBus.events,
EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true)
)

val bigBus = new EventBus[String]

val bigStream = EventStream.merge(
bigBus.events,
EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true)
)

val flatStream = outerBus.events.startWith(0).flatMap {
case i if i >= 10 => bigStream
case _ => smallStream
}.map(Calculation.log("flat", calculations))

// --

flatStream.addObserver(Observer.empty)

assert(calculations.toList == List(
Calculation("flat", "small-1"),
Calculation("flat", "small-2"),
))

calculations.clear()

// --

smallBus.writer.onNext("small-bus-0")

assert(calculations.toList == List(
Calculation("flat", "small-bus-0")
))

calculations.clear()

// --

outerBus.writer.onNext(1)

assert(calculations.isEmpty) // Signal == filter eats this up

// --

smallBus.writer.onNext("small-bus-1")

assert(calculations.toList == List(
Calculation("flat", "small-bus-1")
))

calculations.clear()

// --

outerBus.writer.onNext(2) // Signal == filter eats this up

assert(calculations.isEmpty)

// --

smallBus.writer.onNext("small-bus-2")

assert(calculations.toList == List(
Calculation("flat", "small-bus-2")
))

calculations.clear()

// --

outerBus.writer.onNext(10) // #Note switch to big

assert(calculations.toList == List(
Calculation("flat", "big-1"),
Calculation("flat", "big-2")
))

calculations.clear()

// --

smallBus.writer.onNext("small bus - unrelated change")

assert(calculations.isEmpty)

// --

bigBus.writer.onNext("big-bus-1")

assert(calculations.toList == List(
Calculation("flat", "big-bus-1")
))

calculations.clear()

// --

outerBus.writer.onNext(11)

assert(calculations.isEmpty)

// --

bigBus.writer.onNext("big-bus-2")

assert(calculations.toList == List(
Calculation("flat", "big-bus-2")
))

calculations.clear()

// --

outerBus.writer.onNext(5) // #Note switch back to small

assert(calculations.isEmpty) // empty because of emitOnce = true

// --

smallBus.writer.onNext("small-bus-3")

assert(calculations.toList == List(
Calculation("flat", "small-bus-3")
))

calculations.clear()

// --

bigBus.writer.onNext("big bus - unrelated change")

assert(calculations.isEmpty)
}

}
Loading

0 comments on commit e291092

Please sign in to comment.