Skip to content

Commit

Permalink
API: New collectSome / collectOpt operators; rename contramapOpt -> c…
Browse files Browse the repository at this point in the history
…ontracollectOpt
  • Loading branch information
raquo committed Jan 4, 2023
1 parent ce32592 commit 2ba55e3
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 36 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ Observers have a few convenience methods:

`def contracollect[B](pf: PartialFunction[B, A]): Observer[B]` – when you want to both `contramap` and `filter` at once.

`def contramapOpt[B](project: B => Option[A]): Observer[B]` – like `contracollect` but designed for APIs that return Options, such as `NonEmptyList.fromList`.
`def contracollectOpt[B](project: B => Option[A]): Observer[B]` – like `contracollect` but designed for APIs that return Options, such as `NonEmptyList.fromList`.

`delay(ms: Int)` – creates an observer that calls the original observer after the specified delay (for both events and errors)

Expand Down Expand Up @@ -979,7 +979,7 @@ Remember that all of this happens synchronously. There can be no async boundarie

### Operators

Airstream offers standard observables operators like `map` / `filter` / `compose` / `combineWith` etc. You will need to read the [API doc](https://javadoc.io/doc/com.raquo/airstream_sjs1_2.13/latest/com/raquo/airstream/index.html) or the actual code or use IDE autocompletion to discover those that aren't documented here or in other section of the Documentation. In the code, see `BaseObservable`, `Observable`, `EventStream`, and `Signal` traits and their companion objects.
Airstream offers standard observables operators like `map` / `filter` / `collect` / `compose` / `combineWith` etc. You will need to read the [API doc](https://javadoc.io/doc/com.raquo/airstream_sjs1_2.13/latest/com/raquo/airstream/index.html) or the actual code or use IDE autocompletion to discover those that aren't documented here or in other section of the documentation. In the code, see `BaseObservable`, `Observable`, `EventStream`, and `Signal` traits and their companion objects.

Some of the more interesting / non-standard operators are documented below:

Expand Down
23 changes: 17 additions & 6 deletions src/main/scala/com/raquo/airstream/core/EventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import com.raquo.airstream.debug.{DebuggableEventStream, Debugger, DebuggerEvent
import com.raquo.airstream.distinct.DistinctEventStream
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.misc.generated._
import com.raquo.airstream.misc.{FilterEventStream, FoldLeftSignal, MapEventStream}
import com.raquo.airstream.split._
import com.raquo.airstream.misc.{CollectEventStream, DropEventStream, FilterEventStream, FoldLeftSignal, MapEventStream, SignalFromEventStream}
import com.raquo.airstream.split.{SplittableEventStream, SplittableOneEventStream}
import com.raquo.airstream.timing.{FutureEventStream, _}

import scala.annotation.unused
Expand All @@ -31,10 +31,21 @@ trait EventStream[+A] extends Observable[A] with BaseObservable[EventStream, A]

def filterNot(predicate: A => Boolean): EventStream[A] = filter(!predicate(_))

/** @param pf Note: guarded against exceptions */
def collect[B](pf: PartialFunction[A, B]): EventStream[B] = {
// @TODO[Performance] Use applyOrElse
filter(pf.isDefinedAt).map(pf)
/** Apply `pf` to event and emit the resulting value, or emit nothing if `pf` is not defined for that event.
*
* @param pf Note: guarded against exceptions
*/
def collect[B](pf: PartialFunction[A, B]): EventStream[B] = collectOpt(pf.lift)

/** Emit `x` if parent stream emits `Some(x)`, nothing otherwise */
def collectSome[B](implicit ev: A <:< Option[B]): EventStream[B] = collectOpt(ev(_))

/** Apply `fn` to parent stream event, and emit resulting x if it returns Some(x)
*
* @param fn Note: guarded against exceptions
*/
def collectOpt[B](fn: A => Option[B]): EventStream[B] = {
new CollectEventStream(parent = this, fn)
}

/** @param ms milliseconds of delay */
Expand Down
3 changes: 0 additions & 3 deletions src/main/scala/com/raquo/airstream/core/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,4 @@ object Observable {
strategy.flatten(parent)
}
}

@deprecated("0.13.0", "Use `Protected.topoRank` instead of `Observable.debugTopoRank`")
def debugTopoRank(observable: Observable[_]): Int = Protected.topoRank(observable)
}
19 changes: 17 additions & 2 deletions src/main/scala/com/raquo/airstream/core/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,18 @@ trait Observer[-A] extends Sink[A] with Named {
)
}

// #TODO[API] Does this operator even make sense?
//def contracollectSome: Observer[Option[A]] = {
// contracollectOpt[Option[A]](identity)
//}

/** Like [[contramap]], but original observer only fires if `project` returns Some(value)
*
* So, similar to [[contracollect]] but optimized for APIs like `NonEmptyList.fromList` that return an Option.
*
* @param project Note: guarded against exceptions
*/
def contramapOpt[B](project: B => Option[A]): Observer[B] = {
def contracollectOpt[B](project: B => Option[A]): Observer[B] = {
Observer.withRecover(
nextValue => project(nextValue).foreach(onNext),
{ case nextError => onError(nextError) }
Expand All @@ -80,7 +85,17 @@ trait Observer[-A] extends Sink[A] with Named {
})
}

/** Creates another Observer such that calling it calls the original observer after the specified delay. */
/** Creates another Observer such that calling it calls the original observer after the specified delay.
*
* Note: unlike Observable operators, Observer operators are not ownership-aware, so this can fire the
* observer even after the subscription that bound this observer to the observable has been killed.
* So in Laminar for example, it's possible for such a delayed observer to fire even after the element
* that owns this subscription was unmounted. Use the Observable delay operator to avoid that.
*
* Of course, whether anything happens if the observer is fired is a separate issue altogether.
* For example, if the observer is an EventBus writer, firing into it won't do anything if the EventBus
* stream is stopped.
*/
def delay(ms: Int): Observer[A] = {
Observer.fromTry { case nextValue =>
js.timers.setTimeout(ms.toDouble) {
Expand Down
31 changes: 31 additions & 0 deletions src/main/scala/com/raquo/airstream/misc/CollectEventStream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.raquo.airstream.misc

import com.raquo.airstream.common.{InternalNextErrorObserver, SingleParentObservable}
import com.raquo.airstream.core.{EventStream, Protected, Transaction, WritableEventStream}

import scala.util.Try

/** This stream applies `fn` to the parent stream's events, and emits `x` from the resulting `Some(x)` value (if `None`, nothing is fired).
*
* This stream emits an error if the parent stream emits an error (Note: no filtering applied), or if `fn` throws
*
* @param fn Note: guarded against exceptions
*/
class CollectEventStream[A, B](
override protected val parent: EventStream[A],
fn: A => Option[B],
) extends WritableEventStream[B] with SingleParentObservable[A, B] with InternalNextErrorObserver[A] {

override protected val topoRank: Int = Protected.topoRank(parent) + 1

override protected def onNext(nextParentValue: A, transaction: Transaction): Unit = {
Try(fn(nextParentValue)).fold(
onError(_, transaction),
nextValue => nextValue.foreach(fireValue(_, transaction))
)
}

override protected def onError(nextError: Throwable, transaction: Transaction): Unit = {
fireError(nextError, transaction)
}
}
130 changes: 111 additions & 19 deletions src/test/scala/com/raquo/airstream/core/EventStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.raquo.airstream.UnitSpec
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.fixtures.{Effect, TestableOwner}
import com.raquo.airstream.ownership.Owner
import org.scalactic.anyvals.NonEmptyList

import scala.collection.mutable

Expand Down Expand Up @@ -39,64 +40,155 @@ class EventStreamSpec extends UnitSpec {
effects.toList shouldBe range.filterNot(f).map(i => Effect("obs0", i))
}

// @TODO Enable this when we implement it
ignore("distinct") {
it("collect") {

implicit val owner: Owner = new TestableOwner

case class Foo(id: String, version: Int)

val bus = new EventBus[Foo]
val bus = new EventBus[Either[String, Int]]

val effects = mutable.Buffer[Effect[_]]()

bus.events/**.distinct(_.id)*/.foreach { ev =>
effects += Effect("obs", ev)
}
bus
.events
.collect { case Right(i) => i }
.foreach(v => effects += Effect("obs", v))

effects shouldBe mutable.Buffer()

// --

bus.writer.onNext(Foo("bar", 1))
bus.emit(Right(1))

effects shouldBe mutable.Buffer(
Effect("obs", 1)
)
effects.clear()

// --

bus.emit(Right(2))

effects shouldBe mutable.Buffer(Effect("obs", Foo("bar", 1)))
effects shouldBe mutable.Buffer(
Effect("obs", 2)
)
effects.clear()

// --

bus.writer.onNext(Foo("bar", 2))
bus.emit(Left("yo"))

effects shouldBe mutable.Buffer()

// --

bus.writer.onNext(Foo("bar", 3))
bus.emit(Right(3))

effects shouldBe mutable.Buffer(
Effect("obs", 3)
)
effects.clear()

}

it("collectSome") {

implicit val owner: Owner = new TestableOwner

val bus = new EventBus[Option[Int]]

val effects = mutable.Buffer[Effect[_]]()
bus
.events
.collectSome
.foreach(v => effects += Effect("obs", v))

effects shouldBe mutable.Buffer()

// --

bus.writer.onNext(Foo("baz", 1))
bus.emit(Some(1))

effects shouldBe mutable.Buffer(Effect("obs", Foo("baz", 1)))
effects shouldBe mutable.Buffer(
Effect("obs", 1)
)
effects.clear()

// --

bus.writer.onNext(Foo("baz", 2))
bus.emit(Some(2))

effects shouldBe mutable.Buffer(Effect("obs", Foo("baz", 2)))
effects shouldBe mutable.Buffer(
Effect("obs", 2)
)
effects.clear()

// --

bus.writer.onNext(Foo("bar", 4))
bus.emit(None)

effects shouldBe mutable.Buffer(Effect("obs", Foo("bar", 4)))
effects shouldBe mutable.Buffer()

// --

bus.emit(Some(3))

effects shouldBe mutable.Buffer(
Effect("obs", 3)
)
effects.clear()

}

it("collectOpt") {

//def NonEmptyList[A](list: List[A]): Option[List[A]] = {
// if (list.nonEmpty) Some(list) else None
//}

implicit val owner: Owner = new TestableOwner

val bus = new EventBus[List[Int]]

val effects = mutable.Buffer[Effect[_]]()
bus
.events
.collectOpt(NonEmptyList.from(_))
.foreach(v => effects += Effect("obs", v.head))

effects shouldBe mutable.Buffer()

// --

bus.emit(List(1))

effects shouldBe mutable.Buffer(
Effect("obs", 1)
)
effects.clear()

// --

bus.emit(List(2))

effects shouldBe mutable.Buffer(
Effect("obs", 2)
)
effects.clear()

// --

bus.emit(Nil)

effects shouldBe mutable.Buffer()

// --

bus.emit(List(3))

effects shouldBe mutable.Buffer(
Effect("obs", 3)
)
effects.clear()

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ class ObserverErrorSpec extends UnitSpec with BeforeAndAfter {
effects.clear()
}

it("contramapOpt emits value only if transformation returns a Some") {
it("contracollectOpt emits value only if transformation returns a Some") {

val effects = mutable.Buffer[Try[Int]]()

val topObs = Observer.fromTry[Int] { case t => effects += t }

val lowObs = topObs.contramapOpt[Int](v => if (v != 2) Some(v) else None)
val lowObs = topObs.contracollectOpt[Int](v => if (v != 2) Some(v) else None)

lowObs.onNext(1)
lowObs.onNext(2)
Expand All @@ -355,13 +355,13 @@ class ObserverErrorSpec extends UnitSpec with BeforeAndAfter {
assert(effects.toList == List(Success(1), Success(3)))
}

it("contramapOpt handles thrown exception from transformation ") {
it("contracollectOpt handles thrown exception from transformation ") {
val effects = mutable.Buffer[Try[Int]]()

val topObs = Observer.fromTry[Int] { case t => effects += t }

val propagatedError = ExpectedError("propagated")
val lowObs = topObs.contramapOpt[Int](v => if (v == 2) throw ExpectedError("it's 2") else Some(v))
val lowObs = topObs.contracollectOpt[Int](v => if (v == 2) throw ExpectedError("it's 2") else Some(v))

lowObs.onNext(1)
lowObs.onNext(2)
Expand Down

0 comments on commit 2ba55e3

Please sign in to comment.