Skip to content

Commit

Permalink
API: Re-evaluate signal's current value when restarting signal
Browse files Browse the repository at this point in the history
 - This is a big change to fix #43
 - Not quite complete yet, still need to work on various timing streams
 - Also removed some Future-related stuff that I don't want to support anymore
  • Loading branch information
raquo committed Jan 4, 2023
1 parent 1de61b2 commit 5d4b629
Show file tree
Hide file tree
Showing 73 changed files with 2,060 additions and 771 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.raquo.airstream.combine

import com.raquo.airstream.common.InternalParentObserver
import com.raquo.airstream.core.{EventStream, Protected, WritableEventStream}
import com.raquo.airstream.common.{InternalParentObserver, MultiParentEventStream}
import com.raquo.airstream.core.{EventStream, Protected}

import scala.util.Try

/** @param combinator Must not throw! */
class CombineEventStreamN[A, Out](
parents: Seq[EventStream[A]],
override protected[this] val parents: Seq[EventStream[A]],
combinator: Seq[A] => Out
) extends WritableEventStream[Out] with CombineObservable[Out] {
) extends MultiParentEventStream[A, Out] with CombineObservable[Out] {

// @TODO[API] Maybe this should throw if parents.isEmpty

Expand Down Expand Up @@ -40,9 +40,4 @@ class CombineEventStreamN[A, Out](
}: _*
)

override protected[this] def onStop(): Unit = {
maybeLastParentValues.indices.foreach(maybeLastParentValues.update(_, None))
super.onStop()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package com.raquo.airstream.combine

import com.raquo.airstream.common.InternalParentObserver
import com.raquo.airstream.core.AirstreamError.CombinedError
import com.raquo.airstream.core.{ SyncObservable, Transaction, WritableObservable }
import com.raquo.airstream.core.{SyncObservable, Transaction, WritableObservable}
import org.scalajs.dom

import scala.scalajs.js
import scala.util.{ Failure, Success, Try }
import scala.util.{Failure, Success, Try}

trait CombineObservable[A] extends SyncObservable[A] { this: WritableObservable[A] =>

Expand Down Expand Up @@ -45,7 +45,7 @@ trait CombineObservable[A] extends SyncObservable[A] { this: WritableObservable[
}

override protected[this] def onStart(): Unit = {
parentObservers.foreach(_.addToParent())
parentObservers.foreach(_.addToParent(shouldCallMaybeWillStart = false))
super.onStart()
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/com/raquo/airstream/combine/CombineSignalN.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
package com.raquo.airstream.combine

import com.raquo.airstream.common.InternalParentObserver
import com.raquo.airstream.core.{Protected, Signal, WritableSignal}
import com.raquo.airstream.common.{InternalParentObserver, MultiParentSignal}
import com.raquo.airstream.core.{Protected, Signal}

import scala.util.Try

/** @param combinator Must not throw! */
class CombineSignalN[A, Out](
protected[this] val parents: Seq[Signal[A]],
override protected[this] val parents: Seq[Signal[A]],
protected[this] val combinator: Seq[A] => Out
) extends WritableSignal[Out] with CombineObservable[Out] {
) extends MultiParentSignal[A, Out] with CombineObservable[Out] {

// @TODO[API] Maybe this should throw if parents.isEmpty

override protected val topoRank: Int = Protected.maxParentTopoRank(parents) + 1

override protected[this] def initialValue: Try[Out] = combinedValue

override protected[this] def inputsReady: Boolean = true

override protected[this] def combinedValue: Try[Out] = {
CombineObservable.seqCombinator(parents.map(_.tryNow()), combinator)
}

override protected def currentValueFromParent(): Try[Out] = combinedValue

parentObservers.push(
parents.map { parent =>
InternalParentObserver.fromTry[A](parent, (_, transaction) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.raquo.airstream.combine

import com.raquo.airstream.common.{InternalParentObserver, Observation}
import com.raquo.airstream.core.{Observable, Protected, SyncObservable, Transaction, WritableEventStream}
import com.raquo.airstream.common.{InternalParentObserver, MultiParentEventStream, Observation}
import com.raquo.airstream.core.{EventStream, Observable, Protected, SyncObservable, Transaction, WritableEventStream}
import com.raquo.airstream.util.JsPriorityQueue

import scala.scalajs.js
Expand All @@ -14,8 +14,8 @@ import scala.scalajs.js
* does not make sense, conceptually (what do you even do with their current values?).
*/
class MergeEventStream[A](
parents: Iterable[Observable[A]]
) extends WritableEventStream[A] with SyncObservable[A] {
override protected[this] val parents: Seq[EventStream[A]],
) extends WritableEventStream[A] with SyncObservable[A] with MultiParentEventStream[A, A] {

override protected val topoRank: Int = Protected.maxParentTopoRank(parents) + 1

Expand Down Expand Up @@ -50,7 +50,7 @@ class MergeEventStream[A](
}

override protected[this] def onStart(): Unit = {
parentObservers.foreach(_.addToParent())
parentObservers.foreach(_.addToParent(shouldCallMaybeWillStart = false))
super.onStart()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.raquo.airstream.combine

import com.raquo.airstream.common.InternalParentObserver
import com.raquo.airstream.core.{EventStream, Protected, Signal, Transaction, WritableEventStream}
import com.raquo.airstream.common.{InternalParentObserver, MultiParentEventStream}
import com.raquo.airstream.core.{EventStream, Observable, Protected, Signal, Transaction}

import scala.util.Try

Expand All @@ -18,14 +18,16 @@ class SampleCombineEventStreamN[A, Out](
samplingStream: EventStream[A],
sampledSignals: Seq[Signal[A]],
combinator: Seq[A] => Out
) extends WritableEventStream[Out] with CombineObservable[Out] {
) extends MultiParentEventStream[A, Out] with CombineObservable[Out] {

override protected val topoRank: Int = Protected.maxParentTopoRank(samplingStream +: sampledSignals) + 1

private[this] var maybeLastSamplingValue: Option[Try[A]] = None

override protected[this] def inputsReady: Boolean = maybeLastSamplingValue.nonEmpty

override protected[this] val parents: Seq[Observable[A]] = samplingStream +: sampledSignals

override protected[this] def combinedValue: Try[Out] = {
val parentValues = maybeLastSamplingValue.get +: sampledSignals.map(_.tryNow())
CombineObservable.seqCombinator(parentValues, combinator)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.raquo.airstream.combine

import com.raquo.airstream.common.InternalParentObserver
import com.raquo.airstream.core.{Protected, Signal, WritableSignal}
import com.raquo.airstream.common.{InternalParentObserver, MultiParentSignal}
import com.raquo.airstream.core.{Observable, Protected, Signal}

import scala.util.Try

Expand All @@ -18,19 +18,21 @@ class SampleCombineSignalN[A, Out](
samplingSignal: Signal[A],
sampledSignals: Seq[Signal[A]],
combinator: Seq[A] => Out
) extends WritableSignal[Out] with CombineObservable[Out] {
) extends MultiParentSignal[A, Out] with CombineObservable[Out] {

override protected val topoRank: Int = Protected.maxParentTopoRank(samplingSignal +: sampledSignals) + 1

override protected[this] def initialValue: Try[Out] = combinedValue

override protected[this] def inputsReady: Boolean = true

override protected[this] val parents: Seq[Observable[A]] = samplingSignal +: sampledSignals

override protected[this] def combinedValue: Try[Out] = {
val values = (samplingSignal +: sampledSignals).map(_.tryNow())
CombineObservable.seqCombinator(values, combinator)
}

override protected def currentValueFromParent(): Try[Out] = combinedValue

parentObservers.push(
InternalParentObserver.fromTry[A](samplingSignal, (_, transaction) => {
onInputsReady(transaction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ trait InternalParentObserver[A] extends InternalObserver[A] {

protected[this] val parent: Observable[A]

def addToParent(): Unit = {
parent.addInternalObserver(this)
def addToParent(shouldCallMaybeWillStart: Boolean): Unit = {
parent.addInternalObserver(this, shouldCallMaybeWillStart)
}

def removeFromParent(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.raquo.airstream.common

import com.raquo.airstream.core.{Observable, Protected, WritableEventStream}

/** A simple stream that has multiple parents. */
trait MultiParentEventStream[+I, O] extends WritableEventStream[O] {

protected[this] val parents: Seq[Observable[I]]

override protected def onWillStart(): Unit = {
parents.foreach(Protected.maybeWillStart)
}

}
23 changes: 23 additions & 0 deletions src/main/scala/com/raquo/airstream/common/MultiParentSignal.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.raquo.airstream.common

import com.raquo.airstream.core.{Observable, Protected, WritableSignal}

import scala.util.Try

/** A simple signal that has multiple parents. */
trait MultiParentSignal[+I, O] extends WritableSignal[O] {

protected[this] val parents: Seq[Observable[I]]

override protected def onWillStart(): Unit = {
parents.foreach(Protected.maybeWillStart)
updateCurrentValueFromParent()
}

protected def updateCurrentValueFromParent(): Try[O] = {
val nextValue = currentValueFromParent()
setCurrentValue(nextValue)
nextValue
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.raquo.airstream.common

import com.raquo.airstream.core.{InternalObserver, Observable, Protected, Transaction, WritableEventStream}

/** A simple stream that only has one parent. */
trait SingleParentEventStream[I, O] extends WritableEventStream[O] with InternalObserver[I] {

protected[this] val parent: Observable[I]

override protected def onWillStart(): Unit = {
//println(s"${this} >>>> onWillStart")
Protected.maybeWillStart(parent)
}

override protected[this] def onStart(): Unit = {
//println(s"${this} >>>> onStart")
parent.addInternalObserver(this, shouldCallMaybeWillStart = false)
super.onStart()
}

override protected[this] def onStop(): Unit = {
Transaction.removeInternalObserver(parent, observer = this)
super.onStop()
}
}

This file was deleted.

39 changes: 39 additions & 0 deletions src/main/scala/com/raquo/airstream/common/SingleParentSignal.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.raquo.airstream.common

import com.raquo.airstream.core.{InternalObserver, Observable, Protected, Transaction, WritableSignal}

import scala.util.Try

/** A simple stream that only has one parent. */
trait SingleParentSignal[I, O] extends WritableSignal[O] with InternalObserver[I] {

protected[this] val parent: Observable[I]

override protected def onWillStart(): Unit = {
//println(s"${this} >>>> onWillStart")
Protected.maybeWillStart(parent)
updateCurrentValueFromParent()
}

/** Note: this is overridden in:
* - [[com.raquo.airstream.distinct.DistinctSignal]]
* - [[com.raquo.airstream.flatten.SwitchSignal]]
*/
protected def updateCurrentValueFromParent(): Try[O] = {
//println(s"${this} >> updateCurrentValueFromParent")
val nextValue = currentValueFromParent()
setCurrentValue(nextValue)
//println(s"${this} << updateCurrentValueFromParent")
nextValue
}

override protected[this] def onStart(): Unit = {
parent.addInternalObserver(this, shouldCallMaybeWillStart = false)
super.onStart()
}

override protected[this] def onStop(): Unit = {
Transaction.removeInternalObserver(parent, observer = this)
super.onStop()
}
}
42 changes: 39 additions & 3 deletions src/main/scala/com/raquo/airstream/core/BaseObservable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.raquo.airstream.debug.Debugger
import com.raquo.airstream.flatten.FlattenStrategy
import com.raquo.airstream.ownership.{Owner, Subscription}

import scala.annotation.unused
import scala.util.{Failure, Success, Try}

/** This trait represents a reactive value that can be subscribed to.
Expand Down Expand Up @@ -151,12 +150,12 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name

protected[this] def addExternalObserver(observer: Observer[A], owner: Owner): Subscription

protected[this] def onAddedExternalObserver(@unused observer: Observer[A]): Unit
protected[this] def onAddedExternalObserver(observer: Observer[A]): Unit

/** Child observable should call this method on its parents when it is started.
* This observable calls [[onStart]] if this action has given it its first observer (internal or external).
*/
protected[airstream] def addInternalObserver(observer: InternalObserver[A]): Unit
protected[airstream] def addInternalObserver(observer: InternalObserver[A], shouldCallMaybeWillStart: Boolean): Unit

/** Child observable should call Transaction.removeInternalObserver(parent, childInternalObserver) when it is stopped.
* This observable calls [[onStop]] if this action has removed its last observer (internal or external).
Expand All @@ -170,6 +169,28 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name

protected def isStarted: Boolean = numAllObservers > 0

/** When starting an observable, this is called recursively on every one of its parents that are not started.
* This whole chain happens before onStart callback is called. This chain serves to prepare the internal states
* of observables that are about to start, e.g. you should update the signal's value to match its parent signal's
* value in this callback, if applicable.
*
* Default implementation, for observables that don't need anything,
* should be to call `parent.maybeWillStart()` for every parent observable.
*
* If custom behaviour is required, you should generally call `parent.maybeWillStart()`
* BEFORE your custom logic. Then your logic will be able to make use of parent's
* updated value.
*
* Note: THIS METHOD MUST NOT CREATE TRANSACTIONS OR FIRE ANY EVENTS! DO IT IN ONSTART IF NEEDED.
*/
protected def onWillStart(): Unit

protected def maybeWillStart(): Unit = {
if (!isStarted) {
onWillStart()
}
}

/** This method is fired when this observable starts working (listening for parent events and/or firing its own events),
* that is, when it gets its first Observer (internal or external).
*
Expand All @@ -184,11 +205,26 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name
*/
protected def onStop(): Unit = ()

/** Airstream may internally use Scala library functions which use `==` or `hashCode` for equality, for example List.contains.
* Comparing observables by structural equality pretty much never makes sense, yet it's not that hard to run into that, all
* you need is to create a `case class` subclass, and the Scala compiler will generate a structural-equality `equals` and
* `hashCode` methods for you behind the scenes.
*
* To prevent that, we make equals and hashCode methods final, using the default implementation (which is reference equality).
*/
final override def equals(obj: Any): Boolean = super.equals(obj)

/** Force reference equality checks. See comment for `equals`. */
final override def hashCode(): Int = super.hashCode()
}

object BaseObservable {

@inline private[airstream] def topoRank[O[+_] <: Observable[_]](observable: BaseObservable[O, _]): Int = {
observable.topoRank
}

@inline private[airstream] def maybeWillStart[O[+_] <: Observable[_]](observable: BaseObservable[O, _]): Unit = {
observable.maybeWillStart()
}
}
Loading

0 comments on commit 5d4b629

Please sign in to comment.