From 2ba55e3b1a32ee3f605f134e283bff1c584b3f07 Mon Sep 17 00:00:00 2001 From: Nikita Gazarov Date: Tue, 16 Nov 2021 03:56:57 -0800 Subject: [PATCH] API: New collectSome / collectOpt operators; rename contramapOpt -> contracollectOpt --- README.md | 4 +- .../raquo/airstream/core/EventStream.scala | 23 +++- .../com/raquo/airstream/core/Observable.scala | 3 - .../com/raquo/airstream/core/Observer.scala | 19 ++- .../airstream/misc/CollectEventStream.scala | 31 +++++ .../airstream/core/EventStreamSpec.scala | 130 +++++++++++++++--- .../airstream/errors/ObserverErrorSpec.scala | 8 +- 7 files changed, 182 insertions(+), 36 deletions(-) create mode 100644 src/main/scala/com/raquo/airstream/misc/CollectEventStream.scala diff --git a/README.md b/README.md index cd87e405..e0ec5fd5 100644 --- a/README.md +++ b/README.md @@ -250,7 +250,7 @@ Observers have a few convenience methods: `def contracollect[B](pf: PartialFunction[B, A]): Observer[B]` – when you want to both `contramap` and `filter` at once. -`def contramapOpt[B](project: B => Option[A]): Observer[B]` – like `contracollect` but designed for APIs that return Options, such as `NonEmptyList.fromList`. +`def contracollectOpt[B](project: B => Option[A]): Observer[B]` – like `contracollect` but designed for APIs that return Options, such as `NonEmptyList.fromList`. `delay(ms: Int)` – creates an observer that calls the original observer after the specified delay (for both events and errors) @@ -979,7 +979,7 @@ Remember that all of this happens synchronously. There can be no async boundarie ### Operators -Airstream offers standard observables operators like `map` / `filter` / `compose` / `combineWith` etc. You will need to read the [API doc](https://javadoc.io/doc/com.raquo/airstream_sjs1_2.13/latest/com/raquo/airstream/index.html) or the actual code or use IDE autocompletion to discover those that aren't documented here or in other section of the Documentation. In the code, see `BaseObservable`, `Observable`, `EventStream`, and `Signal` traits and their companion objects. +Airstream offers standard observables operators like `map` / `filter` / `collect` / `compose` / `combineWith` etc. You will need to read the [API doc](https://javadoc.io/doc/com.raquo/airstream_sjs1_2.13/latest/com/raquo/airstream/index.html) or the actual code or use IDE autocompletion to discover those that aren't documented here or in other section of the documentation. In the code, see `BaseObservable`, `Observable`, `EventStream`, and `Signal` traits and their companion objects. Some of the more interesting / non-standard operators are documented below: diff --git a/src/main/scala/com/raquo/airstream/core/EventStream.scala b/src/main/scala/com/raquo/airstream/core/EventStream.scala index 7c468005..82a0f642 100644 --- a/src/main/scala/com/raquo/airstream/core/EventStream.scala +++ b/src/main/scala/com/raquo/airstream/core/EventStream.scala @@ -9,8 +9,8 @@ import com.raquo.airstream.debug.{DebuggableEventStream, Debugger, DebuggerEvent import com.raquo.airstream.distinct.DistinctEventStream import com.raquo.airstream.eventbus.EventBus import com.raquo.airstream.misc.generated._ -import com.raquo.airstream.misc.{FilterEventStream, FoldLeftSignal, MapEventStream} -import com.raquo.airstream.split._ +import com.raquo.airstream.misc.{CollectEventStream, DropEventStream, FilterEventStream, FoldLeftSignal, MapEventStream, SignalFromEventStream} +import com.raquo.airstream.split.{SplittableEventStream, SplittableOneEventStream} import com.raquo.airstream.timing.{FutureEventStream, _} import scala.annotation.unused @@ -31,10 +31,21 @@ trait EventStream[+A] extends Observable[A] with BaseObservable[EventStream, A] def filterNot(predicate: A => Boolean): EventStream[A] = filter(!predicate(_)) - /** @param pf Note: guarded against exceptions */ - def collect[B](pf: PartialFunction[A, B]): EventStream[B] = { - // @TODO[Performance] Use applyOrElse - filter(pf.isDefinedAt).map(pf) + /** Apply `pf` to event and emit the resulting value, or emit nothing if `pf` is not defined for that event. + * + * @param pf Note: guarded against exceptions + */ + def collect[B](pf: PartialFunction[A, B]): EventStream[B] = collectOpt(pf.lift) + + /** Emit `x` if parent stream emits `Some(x)`, nothing otherwise */ + def collectSome[B](implicit ev: A <:< Option[B]): EventStream[B] = collectOpt(ev(_)) + + /** Apply `fn` to parent stream event, and emit resulting x if it returns Some(x) + * + * @param fn Note: guarded against exceptions + */ + def collectOpt[B](fn: A => Option[B]): EventStream[B] = { + new CollectEventStream(parent = this, fn) } /** @param ms milliseconds of delay */ diff --git a/src/main/scala/com/raquo/airstream/core/Observable.scala b/src/main/scala/com/raquo/airstream/core/Observable.scala index 766aa04f..2a55702d 100644 --- a/src/main/scala/com/raquo/airstream/core/Observable.scala +++ b/src/main/scala/com/raquo/airstream/core/Observable.scala @@ -36,7 +36,4 @@ object Observable { strategy.flatten(parent) } } - - @deprecated("0.13.0", "Use `Protected.topoRank` instead of `Observable.debugTopoRank`") - def debugTopoRank(observable: Observable[_]): Int = Protected.topoRank(observable) } diff --git a/src/main/scala/com/raquo/airstream/core/Observer.scala b/src/main/scala/com/raquo/airstream/core/Observer.scala index 039d0a2e..181f9757 100644 --- a/src/main/scala/com/raquo/airstream/core/Observer.scala +++ b/src/main/scala/com/raquo/airstream/core/Observer.scala @@ -56,13 +56,18 @@ trait Observer[-A] extends Sink[A] with Named { ) } + // #TODO[API] Does this operator even make sense? + //def contracollectSome: Observer[Option[A]] = { + // contracollectOpt[Option[A]](identity) + //} + /** Like [[contramap]], but original observer only fires if `project` returns Some(value) * * So, similar to [[contracollect]] but optimized for APIs like `NonEmptyList.fromList` that return an Option. * * @param project Note: guarded against exceptions */ - def contramapOpt[B](project: B => Option[A]): Observer[B] = { + def contracollectOpt[B](project: B => Option[A]): Observer[B] = { Observer.withRecover( nextValue => project(nextValue).foreach(onNext), { case nextError => onError(nextError) } @@ -80,7 +85,17 @@ trait Observer[-A] extends Sink[A] with Named { }) } - /** Creates another Observer such that calling it calls the original observer after the specified delay. */ + /** Creates another Observer such that calling it calls the original observer after the specified delay. + * + * Note: unlike Observable operators, Observer operators are not ownership-aware, so this can fire the + * observer even after the subscription that bound this observer to the observable has been killed. + * So in Laminar for example, it's possible for such a delayed observer to fire even after the element + * that owns this subscription was unmounted. Use the Observable delay operator to avoid that. + * + * Of course, whether anything happens if the observer is fired is a separate issue altogether. + * For example, if the observer is an EventBus writer, firing into it won't do anything if the EventBus + * stream is stopped. + */ def delay(ms: Int): Observer[A] = { Observer.fromTry { case nextValue => js.timers.setTimeout(ms.toDouble) { diff --git a/src/main/scala/com/raquo/airstream/misc/CollectEventStream.scala b/src/main/scala/com/raquo/airstream/misc/CollectEventStream.scala new file mode 100644 index 00000000..84233989 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/misc/CollectEventStream.scala @@ -0,0 +1,31 @@ +package com.raquo.airstream.misc + +import com.raquo.airstream.common.{InternalNextErrorObserver, SingleParentObservable} +import com.raquo.airstream.core.{EventStream, Protected, Transaction, WritableEventStream} + +import scala.util.Try + +/** This stream applies `fn` to the parent stream's events, and emits `x` from the resulting `Some(x)` value (if `None`, nothing is fired). + * + * This stream emits an error if the parent stream emits an error (Note: no filtering applied), or if `fn` throws + * + * @param fn Note: guarded against exceptions + */ +class CollectEventStream[A, B]( + override protected val parent: EventStream[A], + fn: A => Option[B], +) extends WritableEventStream[B] with SingleParentObservable[A, B] with InternalNextErrorObserver[A] { + + override protected val topoRank: Int = Protected.topoRank(parent) + 1 + + override protected def onNext(nextParentValue: A, transaction: Transaction): Unit = { + Try(fn(nextParentValue)).fold( + onError(_, transaction), + nextValue => nextValue.foreach(fireValue(_, transaction)) + ) + } + + override protected def onError(nextError: Throwable, transaction: Transaction): Unit = { + fireError(nextError, transaction) + } +} diff --git a/src/test/scala/com/raquo/airstream/core/EventStreamSpec.scala b/src/test/scala/com/raquo/airstream/core/EventStreamSpec.scala index bfdaa1f2..9695e9ff 100644 --- a/src/test/scala/com/raquo/airstream/core/EventStreamSpec.scala +++ b/src/test/scala/com/raquo/airstream/core/EventStreamSpec.scala @@ -4,6 +4,7 @@ import com.raquo.airstream.UnitSpec import com.raquo.airstream.eventbus.EventBus import com.raquo.airstream.fixtures.{Effect, TestableOwner} import com.raquo.airstream.ownership.Owner +import org.scalactic.anyvals.NonEmptyList import scala.collection.mutable @@ -39,64 +40,155 @@ class EventStreamSpec extends UnitSpec { effects.toList shouldBe range.filterNot(f).map(i => Effect("obs0", i)) } - // @TODO Enable this when we implement it - ignore("distinct") { + it("collect") { implicit val owner: Owner = new TestableOwner - case class Foo(id: String, version: Int) - - val bus = new EventBus[Foo] + val bus = new EventBus[Either[String, Int]] val effects = mutable.Buffer[Effect[_]]() - - bus.events/**.distinct(_.id)*/.foreach { ev => - effects += Effect("obs", ev) - } + bus + .events + .collect { case Right(i) => i } + .foreach(v => effects += Effect("obs", v)) effects shouldBe mutable.Buffer() // -- - bus.writer.onNext(Foo("bar", 1)) + bus.emit(Right(1)) + + effects shouldBe mutable.Buffer( + Effect("obs", 1) + ) + effects.clear() + + // -- + + bus.emit(Right(2)) - effects shouldBe mutable.Buffer(Effect("obs", Foo("bar", 1))) + effects shouldBe mutable.Buffer( + Effect("obs", 2) + ) effects.clear() // -- - bus.writer.onNext(Foo("bar", 2)) + bus.emit(Left("yo")) effects shouldBe mutable.Buffer() // -- - bus.writer.onNext(Foo("bar", 3)) + bus.emit(Right(3)) + + effects shouldBe mutable.Buffer( + Effect("obs", 3) + ) + effects.clear() + + } + + it("collectSome") { + + implicit val owner: Owner = new TestableOwner + + val bus = new EventBus[Option[Int]] + + val effects = mutable.Buffer[Effect[_]]() + bus + .events + .collectSome + .foreach(v => effects += Effect("obs", v)) effects shouldBe mutable.Buffer() // -- - bus.writer.onNext(Foo("baz", 1)) + bus.emit(Some(1)) - effects shouldBe mutable.Buffer(Effect("obs", Foo("baz", 1))) + effects shouldBe mutable.Buffer( + Effect("obs", 1) + ) effects.clear() // -- - bus.writer.onNext(Foo("baz", 2)) + bus.emit(Some(2)) - effects shouldBe mutable.Buffer(Effect("obs", Foo("baz", 2))) + effects shouldBe mutable.Buffer( + Effect("obs", 2) + ) effects.clear() // -- - bus.writer.onNext(Foo("bar", 4)) + bus.emit(None) - effects shouldBe mutable.Buffer(Effect("obs", Foo("bar", 4))) + effects shouldBe mutable.Buffer() + + // -- + + bus.emit(Some(3)) + + effects shouldBe mutable.Buffer( + Effect("obs", 3) + ) effects.clear() } + it("collectOpt") { + + //def NonEmptyList[A](list: List[A]): Option[List[A]] = { + // if (list.nonEmpty) Some(list) else None + //} + + implicit val owner: Owner = new TestableOwner + + val bus = new EventBus[List[Int]] + + val effects = mutable.Buffer[Effect[_]]() + bus + .events + .collectOpt(NonEmptyList.from(_)) + .foreach(v => effects += Effect("obs", v.head)) + + effects shouldBe mutable.Buffer() + + // -- + + bus.emit(List(1)) + + effects shouldBe mutable.Buffer( + Effect("obs", 1) + ) + effects.clear() + + // -- + + bus.emit(List(2)) + + effects shouldBe mutable.Buffer( + Effect("obs", 2) + ) + effects.clear() + + // -- + + bus.emit(Nil) + + effects shouldBe mutable.Buffer() + + // -- + + bus.emit(List(3)) + + effects shouldBe mutable.Buffer( + Effect("obs", 3) + ) + effects.clear() + + } } diff --git a/src/test/scala/com/raquo/airstream/errors/ObserverErrorSpec.scala b/src/test/scala/com/raquo/airstream/errors/ObserverErrorSpec.scala index 060554f3..885d8122 100644 --- a/src/test/scala/com/raquo/airstream/errors/ObserverErrorSpec.scala +++ b/src/test/scala/com/raquo/airstream/errors/ObserverErrorSpec.scala @@ -340,13 +340,13 @@ class ObserverErrorSpec extends UnitSpec with BeforeAndAfter { effects.clear() } - it("contramapOpt emits value only if transformation returns a Some") { + it("contracollectOpt emits value only if transformation returns a Some") { val effects = mutable.Buffer[Try[Int]]() val topObs = Observer.fromTry[Int] { case t => effects += t } - val lowObs = topObs.contramapOpt[Int](v => if (v != 2) Some(v) else None) + val lowObs = topObs.contracollectOpt[Int](v => if (v != 2) Some(v) else None) lowObs.onNext(1) lowObs.onNext(2) @@ -355,13 +355,13 @@ class ObserverErrorSpec extends UnitSpec with BeforeAndAfter { assert(effects.toList == List(Success(1), Success(3))) } - it("contramapOpt handles thrown exception from transformation ") { + it("contracollectOpt handles thrown exception from transformation ") { val effects = mutable.Buffer[Try[Int]]() val topObs = Observer.fromTry[Int] { case t => effects += t } val propagatedError = ExpectedError("propagated") - val lowObs = topObs.contramapOpt[Int](v => if (v == 2) throw ExpectedError("it's 2") else Some(v)) + val lowObs = topObs.contracollectOpt[Int](v => if (v == 2) throw ExpectedError("it's 2") else Some(v)) lowObs.onNext(1) lowObs.onNext(2)