From e291092e1c56bd3f424a10489f7e2b91211625b0 Mon Sep 17 00:00:00 2001 From: Nikita Gazarov Date: Tue, 5 Jan 2021 00:38:12 -0800 Subject: [PATCH] Fix: Avoid redundant re-starting in flattened switch observables. Fixes #55 --- .../eventstream/SwitchEventStream.scala | 13 +- .../raquo/airstream/signal/SwitchSignal.scala | 19 +- .../eventstream/SwitchEventStreamSpec.scala | 290 ++++++++++++++++++ .../airstream/signal/SwitchSignalSpec.scala | 170 ++++++++++ 4 files changed, 480 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/raquo/airstream/eventstream/SwitchEventStream.scala b/src/main/scala/com/raquo/airstream/eventstream/SwitchEventStream.scala index 028b8742..32229dcd 100644 --- a/src/main/scala/com/raquo/airstream/eventstream/SwitchEventStream.scala +++ b/src/main/scala/com/raquo/airstream/eventstream/SwitchEventStream.scala @@ -53,11 +53,16 @@ class SwitchEventStream[I, O]( ) override protected[airstream] def onNext(nextValue: I, transaction: Transaction): Unit = { - removeInternalObserverFromCurrentEventStream() val nextStream = makeStream(nextValue) - maybeCurrentEventStream = Success(nextStream) - // If we're receiving events, this stream is started, so no need to check for that - nextStream.addInternalObserver(internalEventObserver) + val isSameStream = maybeCurrentEventStream.exists { currentStream => + currentStream.isSuccess && (currentStream.get eq nextStream) + } + if (!isSameStream) { + removeInternalObserverFromCurrentEventStream() + maybeCurrentEventStream = Success(nextStream) + // If we're receiving events, this stream is started, so no need to check for that + nextStream.addInternalObserver(internalEventObserver) + } } override protected[airstream] def onError(nextError: Throwable, transaction: Transaction): Unit = { diff --git a/src/main/scala/com/raquo/airstream/signal/SwitchSignal.scala b/src/main/scala/com/raquo/airstream/signal/SwitchSignal.scala index 6b0e91f6..00755826 100644 --- a/src/main/scala/com/raquo/airstream/signal/SwitchSignal.scala +++ b/src/main/scala/com/raquo/airstream/signal/SwitchSignal.scala @@ -32,16 +32,19 @@ class SwitchSignal[A]( ) override protected[airstream] def onTry(nextSignalTry: Try[Signal[A]], transaction: Transaction): Unit = { - removeInternalObserverFromCurrentSignal() - currentSignalTry = nextSignalTry + val isSameSignal = nextSignalTry.isSuccess && nextSignalTry == currentSignalTry + if (!isSameSignal) { + removeInternalObserverFromCurrentSignal() + currentSignalTry = nextSignalTry - // If we're receiving events, this signal is started, so no need to check for that - nextSignalTry.foreach { nextSignal => - nextSignal.addInternalObserver(internalEventObserver) + // If we're receiving events, this signal is started, so no need to check for that + nextSignalTry.foreach { nextSignal => + nextSignal.addInternalObserver(internalEventObserver) + } + //println(s"> init trx from SwitchSignal.onTry") + // Update this signal's value with nextSignal's current value (or an error if we don't have nextSignal) + new Transaction(fireTry(nextSignalTry.flatMap(_.tryNow()), _)) } - //println(s"> init trx from SwitchSignal.onTry") - // Update this signal's value with nextSignal's current value (or an error if we don't have nextSignal) - new Transaction(fireTry(nextSignalTry.flatMap(_.tryNow()), _)) } override protected[this] def onStart(): Unit = { diff --git a/src/test/scala/com/raquo/airstream/eventstream/SwitchEventStreamSpec.scala b/src/test/scala/com/raquo/airstream/eventstream/SwitchEventStreamSpec.scala index bd32ecf3..530d0629 100644 --- a/src/test/scala/com/raquo/airstream/eventstream/SwitchEventStreamSpec.scala +++ b/src/test/scala/com/raquo/airstream/eventstream/SwitchEventStreamSpec.scala @@ -252,4 +252,294 @@ class SwitchEventStreamSpec extends UnitSpec { effects.clear() } + it("EventStream: emitting the same inner stream does not cause it to stop and re-start") { + + implicit val owner: TestableOwner = new TestableOwner + + val outerBus = new EventBus[Int] + + val calculations = mutable.Buffer[Calculation[String]]() + + // It's important that we reuse the exact same references to inner streams to check the logic + // - fromSeq streams are used to ensure that onStart isn't called extraneously + // - bus.events streams are used to ensure that onStop isn't called extraneously + + val smallBus = new EventBus[String] + + val smallStream = EventStream.merge( + smallBus.events, + EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true) + ) + + val bigBus = new EventBus[String] + + val bigStream = EventStream.merge( + bigBus.events, + EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true) + ) + + val flatStream = outerBus.events.flatMap { + case i if i >= 10 => bigStream + case _ => smallStream + }.map(Calculation.log("flat", calculations)) + + // -- + + flatStream.addObserver(Observer.empty) + + assert(calculations.isEmpty) + + // -- + + outerBus.writer.onNext(1) + + assert(calculations.toList == List( + Calculation("flat", "small-1"), + Calculation("flat", "small-2"), + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small-bus-1") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-1") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(2) + + assert(calculations.isEmpty) + + // -- + + smallBus.writer.onNext("small-bus-2") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-2") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(10) // #Note switch to big + + assert(calculations.toList == List( + Calculation("flat", "big-1"), + Calculation("flat", "big-2") + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small bus - unrelated change") + + assert(calculations.isEmpty) + + // -- + + bigBus.writer.onNext("big-bus-1") + + assert(calculations.toList == List( + Calculation("flat", "big-bus-1") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(11) + + assert(calculations.isEmpty) + + // -- + + bigBus.writer.onNext("big-bus-2") + + assert(calculations.toList == List( + Calculation("flat", "big-bus-2") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(5) // #Note switch back to small + + assert(calculations.isEmpty) // empty because of emitOnce = true + + // -- + + smallBus.writer.onNext("small-bus-3") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-3") + )) + + calculations.clear() + + // -- + + bigBus.writer.onNext("big bus - unrelated change") + + assert(calculations.isEmpty) + } + + it("Signal: emitting the same inner stream does not cause it to stop and re-start") { + + implicit val owner: TestableOwner = new TestableOwner + + val outerBus = new EventBus[Int] + + val calculations = mutable.Buffer[Calculation[String]]() + + // It's important that we reuse the exact same references to inner streams to check the logic + // - fromSeq streams are used to ensure that onStart isn't called extraneously + // - bus.events streams are used to ensure that onStop isn't called extraneously + + val smallBus = new EventBus[String] + + val smallStream = EventStream.merge( + smallBus.events, + EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true) + ) + + val bigBus = new EventBus[String] + + val bigStream = EventStream.merge( + bigBus.events, + EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true) + ) + + val flatStream = outerBus.events.startWith(0).flatMap { + case i if i >= 10 => bigStream + case _ => smallStream + }.map(Calculation.log("flat", calculations)) + + // -- + + flatStream.addObserver(Observer.empty) + + assert(calculations.toList == List( + Calculation("flat", "small-1"), + Calculation("flat", "small-2"), + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small-bus-0") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-0") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(1) + + assert(calculations.isEmpty) // Signal == filter eats this up + + // -- + + smallBus.writer.onNext("small-bus-1") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-1") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(2) // Signal == filter eats this up + + assert(calculations.isEmpty) + + // -- + + smallBus.writer.onNext("small-bus-2") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-2") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(10) // #Note switch to big + + assert(calculations.toList == List( + Calculation("flat", "big-1"), + Calculation("flat", "big-2") + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small bus - unrelated change") + + assert(calculations.isEmpty) + + // -- + + bigBus.writer.onNext("big-bus-1") + + assert(calculations.toList == List( + Calculation("flat", "big-bus-1") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(11) + + assert(calculations.isEmpty) + + // -- + + bigBus.writer.onNext("big-bus-2") + + assert(calculations.toList == List( + Calculation("flat", "big-bus-2") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(5) // #Note switch back to small + + assert(calculations.isEmpty) // empty because of emitOnce = true + + // -- + + smallBus.writer.onNext("small-bus-3") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-3") + )) + + calculations.clear() + + // -- + + bigBus.writer.onNext("big bus - unrelated change") + + assert(calculations.isEmpty) + } + } diff --git a/src/test/scala/com/raquo/airstream/signal/SwitchSignalSpec.scala b/src/test/scala/com/raquo/airstream/signal/SwitchSignalSpec.scala index 24337f28..9eaa13d5 100644 --- a/src/test/scala/com/raquo/airstream/signal/SwitchSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/signal/SwitchSignalSpec.scala @@ -3,6 +3,8 @@ package com.raquo.airstream.signal import com.raquo.airstream.UnitSpec import com.raquo.airstream.core.Observable.MetaObservable import com.raquo.airstream.core.Observer +import com.raquo.airstream.eventbus.EventBus +import com.raquo.airstream.eventstream.EventStream import com.raquo.airstream.fixtures.{Calculation, Effect, TestableOwner} import scala.collection.mutable @@ -175,4 +177,172 @@ class SwitchSignalSpec extends UnitSpec { effects.clear() } + it("Signal: emitting the same inner signal does not cause it to stop and re-start") { + + implicit val owner: TestableOwner = new TestableOwner + + val calculations = mutable.Buffer[Calculation[String]]() + + // It's important that we reuse the exact same references to inner signals to check the logic + // - fromSeq streams are used to ensure that onStart isn't called extraneously + // - bus.events streams are used to ensure that onStop isn't called extraneously + + val outerBus = new EventBus[Int] + + val smallBus = new EventBus[String] + + val bigBus = new EventBus[String] + + val smallSignal = EventStream.merge( + smallBus.events, + EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true) + ).startWith("small-0") + + val bigSignal = EventStream.merge( + bigBus.events, + EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true) + ).startWith("big-0") + + val flatSignal = outerBus.events.startWith(0).flatMap { + case i if i >= 10 => bigSignal + case _ => smallSignal + }.map(Calculation.log("flat", calculations)) + + // -- + + flatSignal.addObserver(Observer.empty) + + assert(calculations.toList == List( + Calculation("flat", "small-0"), + Calculation("flat", "small-1"), + Calculation("flat", "small-2"), + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small-bus-0") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-0") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(1) + + assert(calculations.isEmpty) // Signal == filter eats this up + + // -- + + smallBus.writer.onNext("small-bus-1") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-1") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(2) // Signal == filter eats this up + + assert(calculations.isEmpty) + + // -- + + smallBus.writer.onNext("small-bus-2") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-2") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(10) // #Note switch to big + + // @TODO[API] I expected `big-0` to be emitted here first, because that was the initial state of `bigSignal` + // - However, the reason why it's not emitted is kinda compelling. I'm not sure how it should behave. + // - The switching logic first starts the signal that it's switching to (if it wasn't started already, like in this case) + // and then it schedules a transaction to emit the signal's current value. + // - But if the inner signal being started emits new events in new transactions, those transactions will be scheduled + // BEFORE the transaction in the switching logic, and so those transactions will have a chance to update the + // inner signal's current state before it has a chance to be observed by the switching transaction. + // - I guess you could say the idea here is that the signal should be fully started before we read its current value, + // but I'm not sure if I'm buying this. + // - Keep in mind this is not only about observing the original initial value, but also about observing stale values + // of signals that haven't been running. I think the solution to this might depend on https://github.com/raquo/Airstream/issues/43 + // - Maybe what needs to change is EventStream.fromSeq's timing... + // - So well, it is what it is for now. We should look into this some time (this behaviour didn't change in this commit) + assert(calculations.toList == List( + //Calculation("flat", "big-0"), + Calculation("flat", "big-1"), + Calculation("flat", "big-2") + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small bus - unrelated change") + + assert(calculations.isEmpty) + + // -- + + bigBus.writer.onNext("big-bus-1") + + assert(calculations.toList == List( + Calculation("flat", "big-bus-1") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(11) + + assert(calculations.isEmpty) + + // -- + + bigBus.writer.onNext("big-bus-2") + + assert(calculations.toList == List( + Calculation("flat", "big-bus-2") + )) + + calculations.clear() + + // -- + + outerBus.writer.onNext(5) // #Note switch back to small + + assert(calculations.toList == List( + Calculation("flat", "small-bus-2") // Restore current value of small signal + )) + + calculations.clear() + + // -- + + smallBus.writer.onNext("small-bus-3") + + assert(calculations.toList == List( + Calculation("flat", "small-bus-3") + )) + + calculations.clear() + + // -- + + bigBus.writer.onNext("big bus - unrelated change") + + assert(calculations.isEmpty) + } }