Skip to content

Commit

Permalink
Fix: Delay operator now clears the pending queue when stopped
Browse files Browse the repository at this point in the history
Previously, if you stopped the delayed stream and then immediately started it, delayed events scheduled before the stream was stopped would fire after it was re-started if their delays did not complete while the stream was stopped.
  • Loading branch information
raquo committed Dec 26, 2020
1 parent bf18b86 commit eaca9ca
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.raquo.airstream.core.Transaction
import com.raquo.airstream.features.{InternalNextErrorObserver, SingleParentObservable}

import scala.scalajs.js
import scala.scalajs.js.timers.SetTimeoutHandle

class DelayEventStream[A](
override protected val parent: EventStream[A],
Expand All @@ -13,16 +14,30 @@ class DelayEventStream[A](
/** Async stream, so reset rank */
override protected[airstream] val topoRank: Int = 1

private val timerHandles: js.Array[SetTimeoutHandle] = js.Array()

override protected[airstream] def onNext(nextValue: A, transaction: Transaction): Unit = {
js.timers.setTimeout(delayMillis) {
var timerHandle: SetTimeoutHandle = null
timerHandle = js.timers.setTimeout(delayMillis) {
//println(s"> init trx from DelayEventStream.onNext($nextValue)")
timerHandles.splice(timerHandles.indexOf(timerHandle), deleteCount = 1) // Remove handle
new Transaction(fireValue(nextValue, _))
}
timerHandles.push(timerHandle)
}

override def onError(nextError: Throwable, transaction: Transaction): Unit = {
js.timers.setTimeout(delayMillis) {
var timerHandle: SetTimeoutHandle = null
timerHandle = js.timers.setTimeout(delayMillis) {
timerHandles.splice(timerHandles.indexOf(timerHandle), deleteCount = 1) // Remove handle
new Transaction(fireError(nextError, _))
}
timerHandles.push(timerHandle)
}

override protected[this] def onStop(): Unit = {
timerHandles.foreach(js.timers.clearTimeout)
timerHandles.length = 0 // Clear array
super.onStop()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.raquo.airstream.eventstream

import com.raquo.airstream.AsyncUnitSpec
import com.raquo.airstream.core.Observer
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.fixtures.{Effect, TestableOwner}
import org.scalatest.BeforeAndAfter

import scala.collection.mutable

class DelayEventStreamSpec extends AsyncUnitSpec with BeforeAndAfter {

implicit val owner = new TestableOwner

val effects = mutable.Buffer[Effect[Int]]()

val obs1 = Observer[Int](effects += Effect("obs1", _))

before {
owner.killSubscriptions()
effects.clear()
}


it("events are delayed, and purged on stop") {
val bus = new EventBus[Int]
val stream = bus.events.delay(30)

val sub = stream.addObserver(obs1)

delay {
effects shouldEqual mutable.Buffer()

// --

bus.writer.onNext(1)

effects shouldEqual mutable.Buffer()

}.flatMap[Unit] { _ =>
delay(30) {
effects shouldEqual mutable.Buffer(Effect("obs1", 1))
effects.clear()

bus.writer.onNext(2)
bus.writer.onNext(3)

effects shouldEqual mutable.Buffer()
}
}.flatMap[Unit] { _ =>
delay(30) {
effects shouldEqual mutable.Buffer(Effect("obs1", 2), Effect("obs1", 3))
effects.clear()

bus.writer.onNext(4)
bus.writer.onNext(5)

sub.kill() // this kills pending events even if we immediately restart

effects shouldEqual mutable.Buffer()

stream.addObserver(obs1)

bus.writer.onNext(6)
}
}.flatMap { _ =>
delay(40) { // a bit extra margin for the last check just to be sure that we caught any events
effects shouldEqual mutable.Buffer(Effect("obs1", 6))
effects.clear()
assert(true)
}
}
}

}

0 comments on commit eaca9ca

Please sign in to comment.