diff --git a/language-adaptors/rxjava-clojure/README.md b/language-adaptors/rxjava-clojure/README.md index b2f69b3853..4c5bdafe46 100644 --- a/language-adaptors/rxjava-clojure/README.md +++ b/language-adaptors/rxjava-clojure/README.md @@ -1,10 +1,96 @@ -# Clojure Adaptor for RxJava +Clojure bindings for RxJava. +# Binaries + +Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22). + +Example for Leiningen: + +```clojure +[com.netflix.rxjava/rxjava-clojure "x.y.z"] +``` + +and for Gradle: + +```groovy +compile 'com.netflix.rxjava:rxjava-clojure:x.y.z' +``` + +and for Maven: + +```xml + + com.netflix.rxjava + rxjava-clojure + x.y.z + +``` + +and for Ivy: + +```xml + +``` + +# Clojure Bindings +This library provides convenient, idiomatic Clojure bindings for RxJava. + +The bindings try to present an API that will be comfortable and familiar to a Clojure programmer that's familiar with the sequence operations in `clojure.core`. It "fixes" several issues with using RxJava with raw Java interop, for example: + +* Argument lists are in the "right" order. So in RxJava, the function applied in `Observable.map` is the second argument, while here it's the first argument with one or more Observables as trailing arguments +* Operators take normal Clojure functions as arguments, bypassing need for the interop described below +* Predicates accomodate Clojure's notion of truth +* Operators are generally names as they would be in `clojure.core` rather than the Rx names + +There is no object wrapping going on. That is, all functions return normal `rx.Observable` objects, so you can always drop back to Java interop for anything that's missing in this wrapper. + +## Basic Usage +Most functionality resides in the `rx.lang.clojure.core` namespace and for the most part looks like normal Clojure sequence manipulation: + +```clojure +(require '[rx.lang.clojure.core :as rx]) + +(->> my-observable + (rx/map (comp clojure.string/lower-case :first-name)) + (rx/map clojure.string/lower-case) + (rx/filter #{"bob"}) + (rx/distinct) + (rx/into [])) +;=> An Observable that emits a single vector of names +``` + +Blocking operators, which are useful for testing, but should otherwise be avoided, reside in `rx.lang.clojure.blocking`. For example: + +```clojure +(require '[rx.lang.clojure.blocking :as rxb]) + +(rxb/doseq [{:keys [first-name]} users-observable] + (println "Hey," first-name)) +;=> nil +``` + +## Open Issues + +* The missing stuff mentioned below +* `group-by` val-fn variant isn't implemented in RxJava +* There are some functions for defining customer Observables and Operators (`subscriber`, `operator*`, `observable*`). I don't think these are really enough for serious operator implementation, but I'm hesitant to guess at an abstraction at this point. These will probably change dramatically. + +## What's Missing +This library is an ongoing work in progress driven primarily by the needs of one team at Netflix. As such some things are currently missing: + +* Highly-specific operators that we felt cluttered the API and were easily composed from existing operators, especially since we're in not-Java land. For example, `Observable.sumLong()`. +* Most everything involving schedulers +* Most everything involving time +* `Observable.window` and `Observable.buffer`. Who knows which parts of these beasts to wrap? + +Of course, contributions that cover these cases are welcome. + +# Low-level Interop This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces. -# Basic Usage +## Basic Usage -## Requiring the interop namespace +### Requiring the interop namespace The first thing to do is to require the namespace: ```clojure @@ -19,7 +105,7 @@ or, at the REPL: (require '[rx.lang.clojure.interop :as rx]) ``` -## Using rx/fn +### Using rx/fn Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`: ```clojure @@ -34,7 +120,7 @@ If you already have a plain old Clojure function you'd like to use, you can pass (.reduce (rx/fn* +))) ``` -## Using rx/action +### Using rx/action The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts: ```clojure @@ -46,7 +132,7 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im (rx/action [] (println "Sequence complete")))) ``` -## Using Observable/create +### Using Observable/create As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables: ```clojure @@ -59,35 +145,10 @@ As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnS (.onCompleted s))) ``` -# Gotchas +## Gotchas Here are a few things to keep in mind when using this interop: * Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts * If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose. * Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`. -# Binaries - -Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22). - -Example for Maven: - -```xml - - com.netflix.rxjava - rxjava-clojure - x.y.z - -``` - -and for Ivy: - -```xml - -``` - -and for Leiningen: - -```clojure -[com.netflix.rxjava/rxjava-clojure "x.y.z"] -``` diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/blocking.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/blocking.clj new file mode 100644 index 0000000000..feee933225 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/blocking.clj @@ -0,0 +1,140 @@ +(ns rx.lang.clojure.blocking + "Blocking operators and functions. These should never be used in + production code except at the end of an async chain to convert from + rx land back to sync land. For example, to produce a servlet response. + + If you use these, you're a bad person. + " + (:refer-clojure :exclude [first into doseq last]) + (:require [rx.lang.clojure.interop :as iop] [rx.lang.clojure.core :as rx]) + (:import [rx Observable] + [rx.observables BlockingObservable])) + +(def ^:private -ns- *ns*) +(set! *warn-on-reflection* true) + +(defmacro ^:private with-ex-unwrap + "The blocking ops wrap errors stuff in RuntimeException because of stupid Java. + This tries to unwrap them so callers get the exceptions they expect." + [& body] + `(try + ~@body + (catch RuntimeException e# + (throw (or + (and (identical? RuntimeException (class e#)) + (.getCause e#)) + e#))))) + +(defn ^BlockingObservable ->blocking + "Convert an Observable to a BlockingObservable. + + If o is already a BlockingObservable it's returned unchanged. + " + [o] + (if (instance? BlockingObservable o) + o + (.toBlockingObservable ^Observable o))) + +(defn o->seq + "Returns a lazy sequence of the items emitted by o + + See: + rx.observables.BlockingObservable/getIterator + rx.lang.clojure.core/seq->o + " + [o] + (-> (->blocking o) + (.getIterator) + (iterator-seq))) + +(defn first + "*Blocks* and waits for the first value emitted by the given observable. + + If the Observable is empty, returns nil + + If an error is produced it is thrown. + + See: + clojure.core/first + rx/first + rx.observables.BlockingObservable/first + " + [observable] + (with-ex-unwrap + (.firstOrDefault (->blocking observable) nil))) + +(defn last + "*Blocks* and waits for the last value emitted by the given observable. + + If the Observable is empty, returns nil + + If an error is produced it is thrown. + + See: + clojure.core/last + rx/last + rx.observable.BlockingObservable/last + " + [observable] + (with-ex-unwrap + (.lastOrDefault (->blocking observable) nil))) + +(defn single + "*Blocks* and waits for the first value emitted by the given observable. + + An error is thrown if zero or more then one value is produced. + " + [observable] + (with-ex-unwrap + (.single (->blocking observable)))) + +(defn into + "*Blocks* and pours the elements emitted by the given observables into + to. + + If an error is produced it is thrown. + + See: + clojure.core/into + rx/into + " + [to from-observable] + (with-ex-unwrap + (clojure.core/into to (o->seq from-observable)))) + +(defn doseq* + "*Blocks* and executes (f x) for each x emitted by xs + + Returns nil. + + See: + doseq + clojure.core/doseq + " + [xs f] + (with-ex-unwrap + (-> (->blocking xs) + (.forEach (rx.lang.clojure.interop/action* f))))) + +(defmacro doseq + "Like clojure.core/doseq except iterates over an observable in a blocking manner. + + Unlike clojure.core/doseq, only supports a single binding + + Returns nil. + + Example: + + (rx-blocking/doseq [{:keys [name]} users-observable] + (println \"User:\" name)) + + See: + doseq* + clojure.core/doseq + " + [bindings & body] + (when (not= (count bindings) 2) + (throw (IllegalArgumentException. (str "sorry, rx/doseq only supports one binding")))) + (let [[k v] bindings] + `(doseq* ~v (fn [~k] ~@body)))) + diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/chunk.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/chunk.clj new file mode 100644 index 0000000000..d53c8ce322 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/chunk.clj @@ -0,0 +1,100 @@ +(ns rx.lang.clojure.chunk + (:refer-clojure :exclude [chunk]) + (:require [rx.lang.clojure.core :as rx])) + +(def ^:private -ns- *ns*) +(set! *warn-on-reflection* true) + +(defn chunk + "EXTREMELY EXPERIMENTAL AND SUBJECT TO CHANGE OR DELETION + + TODO RxJava's much bigger since this was written. Is there something built in? + + Same as rx.Observable.merge(Observable>) but the input Observables + are \"chunked\" so that at most chunk-size of them are \"in flight\" at any given + time. + + The order of the input Observables is not preserved. + + The main purpose here is to allow a large number of Hystrix observables to + be processed in a controlled way so that the Hystrix execution queues aren't + overwhelmed. + + Example: + + (->> users + (rx/map #(-> (GetUserCommand. %) .toObservable)) + (chunk 10)) + + See: + http://netflix.github.io/RxJava/javadoc/rx/Observable.html#merge(rx.Observable) + http://netflix.github.io/RxJava/javadoc/rx/Observable.html#mergeDelayError(rx.Observable) + " + ([chunk-size observable-source] (chunk chunk-size {} observable-source)) + ([chunk-size options observable-source] + (let [new-state-atom #(atom {:in-flight #{} ; observables currently in-flight + :buffered [] ; observables waiting to be emitted + :complete false ; true if observable-source is complete + :observer % }) ; the observer + ps #(do (printf "%s/%d/%d%n" + (:complete %) + (-> % :buffered count) + (-> % :in-flight count)) + (flush)) + + ; Given the current state, returns [action new-state]. action is the + ; next Observable or Throwable to emit, or :complete if we're done. + next-state (fn [{:keys [complete buffered in-flight] :as old}] + (cond + (empty? buffered) [complete old] + + (< (count in-flight) chunk-size) (let [next-o (first buffered)] + [next-o + (-> old + (update-in [:buffered] next) + (update-in [:in-flight] conj next-o))]) + + :else [nil old])) + + ; Advance the state, performing side-effects as necessary + advance! (fn advance! [state-atom] + (let [old-state @state-atom + [action new-state] (next-state old-state)] + (if (compare-and-set! state-atom old-state new-state) + (let [observer (:observer new-state)] + (if (:debug options) (ps new-state)) + (cond + (= :complete action) + (rx/on-completed observer) + + (instance? Throwable action) + (rx/on-error observer action) + + (instance? rx.Observable action) + (rx/on-next observer + (.finallyDo ^rx.Observable action + (reify rx.util.functions.Action0 + (call [this] + (swap! state-atom update-in [:in-flight] disj action) + (advance! state-atom))))))) + (recur state-atom)))) + + subscribe (fn [state-atom] + (rx/subscribe observable-source + (fn [o] + (swap! state-atom update-in [:buffered] conj o) + (advance! state-atom)) + + (fn [e] + (swap! state-atom assoc :complete e) + (advance! state-atom)) + + (fn [] + (swap! state-atom assoc :complete :complete) + (advance! state-atom)))) + observable (rx/observable* (fn [observer] + (subscribe (new-state-atom observer)))) ] + (if (:delay-error? options) + (rx.Observable/mergeDelayError observable) + (rx.Observable/merge observable))))) + diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj new file mode 100644 index 0000000000..d2768d1b37 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj @@ -0,0 +1,964 @@ +(ns rx.lang.clojure.core + (:refer-clojure :exclude [concat cons count cycle + distinct do drop drop-while + empty every? + filter first future + group-by + interleave interpose into iterate + keep keep-indexed + map mapcat map-indexed + merge next nth partition-all + range reduce reductions + rest seq some sort sort-by split-with + take take-while throw]) + (:require [rx.lang.clojure.interop :as iop] + [rx.lang.clojure.graph :as graph] + [rx.lang.clojure.realized :as realized]) + (:import [rx + Observable + Observer Observable$Operator Observable$OnSubscribe + Subscriber Subscription] + [rx.observables + BlockingObservable + GroupedObservable] + [rx.subscriptions Subscriptions] + [rx.util.functions Action0 Action1 Func0 Func1 Func2])) + +(set! *warn-on-reflection* true) + +(declare concat* concat map* map map-indexed reduce take take-while) + +(defn ^Func1 fn->predicate + "Turn f into a predicate that returns true/false like Rx predicates should" + [f] + (iop/fn* (comp boolean f))) + +;################################################################################ + +(defn observable? + "Returns true if o is an rx.Observable" + [o] + (instance? Observable o)) + +;################################################################################ + +(defn on-next + "Call onNext on the given observer and return o." + [^Observer o value] + (.onNext o value) + o) + +(defn on-completed + "Call onCompleted on the given observer and return o." + [^Observer o] + (.onCompleted o) + o) + +(defn on-error + "Call onError on the given observer and return o." + [^Observer o e] + (.onError o e) + o) + +(defmacro catch-error-value + "Experimental + + TODO: Better name, better abstraction. + + Evaluate body and return its value. If an exception e is thrown, inject the + given value into the exception's cause and call (on-error error-observer e), + returning e. + + This is meant to facilitate implementing Observers that call user-supplied code + safely. The general pattern is something like: + + (fn [o v] + (rx/catch-error-value o v + (rx/on-next o (some-func v)))) + + If (some-func v) throws an exception, it is caught, v is injected into the + exception's cause (with OnErrorThrowable/addValueAsLastCause) and + (rx/on-error o e) is invoked. + + See: + rx.exceptions.OnErrorThrowable/addValueAsLastCause + " + [error-observer value & body] + `(try + ~@body + (catch Throwable e# + (on-error ~error-observer + (rx.exceptions.OnErrorThrowable/addValueAsLastCause e# ~value)) + e#))) + +;################################################################################ +; Tools for creating new operators and observables + +(declare unsubscribed?) + +(defn ^Subscriber subscriber + "Experimental, subject to change or deletion." + ([o on-next-action] (subscriber o on-next-action nil nil)) + ([o on-next-action on-error-action] (subscriber o on-next-action on-error-action nil)) + ([^Subscriber o on-next-action on-error-action on-completed-action] + (proxy [Subscriber] [o] + (onCompleted [] + (if on-completed-action + (on-completed-action o) + (on-completed o))) + (onError [e] + (if on-error-action + (on-error-action o e) + (on-error o e))) + (onNext [t] + (if on-next-action + (on-next-action o t) + (on-next o t)))))) + +(defn ^Subscription subscription + "Create a new subscription that calls the given no-arg handler function when + unsubscribe is called + + See: + rx.subscriptions.Subscriptions/create + " + [handler] + (Subscriptions/create ^Action0 (iop/action* handler))) + +(defn ^Observable$Operator operator* + "Experimental, subject to change or deletion. + + Returns a new implementation of rx.Observable$Operator that calls the given + function with a rx.Subscriber. The function should return a rx.Subscriber. + + See: + lift + rx.Observable$Operator + " + [f] + {:pre [(fn? f)]} + (reify Observable$Operator + (call [this o] + (f o)))) + +(defn ^Observable observable* + "Create an Observable from the given function. + + When subscribed to, (f subscriber) is called at which point, f can start emitting values, etc. + The passed subscriber is of type rx.Subscriber. + + See: + rx.Subscriber + rx.Observable/create + " + [f] + (Observable/create ^Observable$OnSubscribe (iop/action* f))) + +(defn wrap-on-completed + "Wrap handler with code that automaticaly calls rx.Observable.onCompleted." + [handler] + (fn [^Observer observer] + (handler observer) + (when-not (unsubscribed? observer) + (.onCompleted observer)))) + +(defn wrap-on-error + "Wrap handler with code that automaticaly calls (on-error) if an exception is thrown" + [handler] + (fn [^Observer observer] + (try + (handler observer) + (catch Throwable e + (when-not (unsubscribed? observer) + (.onError observer e)))))) + +(defn lift + "Lift the Operator op over the given Observable xs + + Example: + + (->> my-observable + (rx/lift (rx/operator ...)) + ...) + + See: + rx.Observable/lift + operator + " + [^Observable$Operator op ^Observable xs] + (.lift xs op)) + +;################################################################################ + +(defn ^Subscription subscribe + "Subscribe to the given observable. + + on-X-action is a normal clojure function. + + See: + rx.Observable/subscribe + " + + ([^Observable o on-next-action] + (.subscribe o + ^Action1 (iop/action* on-next-action))) + + ([^Observable o on-next-action on-error-action] + (.subscribe o + ^Action1 (iop/action* on-next-action) + ^Action1 (iop/action* on-error-action))) + + ([^Observable o on-next-action on-error-action on-completed-action] + (.subscribe o + ^Action1 (iop/action* on-next-action) + ^Action1 (iop/action* on-error-action) + ^Action0 (iop/action* on-completed-action)))) + +(defn unsubscribe + "Unsubscribe from Subscription s and return it." + [^Subscription s] + (.unsubscribe s) + s) + +(defn subscribe-on + "Cause subscriptions to the given observable to happen on the given scheduler. + + Returns a new Observable. + + See: + rx.Observable/subscribeOn + " + [^rx.Scheduler s ^Observable xs] + (.subscribeOn xs s)) + +(defn unsubscribe-on + "Cause unsubscriptions from the given observable to happen on the given scheduler. + + Returns a new Observable. + + See: + rx.Observable/unsubscribeOn + " + [^rx.Scheduler s ^Observable xs] + (.unsubscribeOn xs s)) + +(defn unsubscribed? + "Returns true if the given Subscription (or Subscriber) is unsubscribed. + + See: + rx.Observable/create + observable* + " + [^Subscription s] + (.isUnsubscribed s)) + +;################################################################################ +; Functions for creating Observables + +(defn ^Observable never + "Returns an Observable that never emits any values and never completes. + + See: + rx.Observable/never + " + [] + (Observable/never)) + +(defn ^Observable empty + "Returns an Observable that completes immediately without emitting any values. + + See: + rx.Observable/empty + " + [] + (Observable/empty)) + +(defn ^Observable return + "Returns an observable that emits a single value. + + See: + rx.Observable/just + " + [value] + (Observable/just value)) + +(defn ^Observable seq->o + "Make an observable out of some seq-able thing. The rx equivalent of clojure.core/seq." + [xs] + (if-let [s (clojure.core/seq xs)] + (Observable/from ^Iterable s) + (empty))) + +;################################################################################ +; Operators + +(defn synchronize + "Synchronize execution. + + See: + rx.Observable/synchronize + " + ([^Observable xs] + (.synchronize xs)) + ([lock ^Observable xs] + (.synchronize xs lock))) + +(defn merge* + "Merge an Observable of Observables into a single Observable + + If you want clojure.core/merge, it's just this: + + (rx/reduce clojure.core/merge {} maps) + + See: + merge + merge-delay-error* + rx.Observable/merge + " + [^Observable xs] + (Observable/merge xs)) + +(defn ^Observable merge + "Merge one or more Observables into a single observable. + + If you want clojure.core/merge, it's just this: + + (rx/reduce clojure.core/merge {} maps) + + See: + merge* + merge-delay-error + rx.Observable/merge + " + [& os] + (merge* (seq->o os))) + +(defn ^Observable merge-delay-error* + "Same as merge*, but all values are emitted before errors are propagated" + [^Observable xs] + (Observable/mergeDelayError xs)) + +(defn ^Observable merge-delay-error + "Same as merge, but all values are emitted before errors are propagated" + [& os] + (merge-delay-error* (seq->o os))) + +(defn cache + "caches the observable value so that multiple subscribers don't re-evaluate it. + + See: + rx.Observable/cache" + [^Observable xs] + (.cache xs)) + +(defn cons + "cons x to the beginning of xs" + [x xs] + (concat (return x) xs)) + +(defn ^Observable concat + "Concatenate the given Observables one after the another. + + Note that xs is separate Observables which are concatentated. To concatenate an + Observable of Observables, use concat* + + See: + rx.Observable/concat + concat* + " + [& xs] + (Observable/concat (seq->o xs))) + +(defn ^Observable concat* + "Concatenate the given Observable of Observables one after another. + + See: + rx.Observable/concat + concat + " + [^Observable os] + (Observable/concat os)) + +(defn count + "Returns an Observable that emits the number of items is xs as a long. + + See: + rx.Observable/longCount + " + [^Observable xs] + (.longCount xs)) + +(defn cycle + "Returns an Observable that emits the items of xs repeatedly, forever. + + TODO: Other sigs. + + See: + rx.Observable/repeat + clojure.core/cycle + " + [^Observable xs] + (.repeat xs)) + +(defn distinct + "Returns an Observable of the elements of Observable xs with duplicates + removed. key-fn, if provided, is a one arg function that determines the + key used to determined duplicates. key-fn defaults to identity. + + This implementation doesn't use rx.Observable/distinct because it doesn't + honor Clojure's equality semantics. + + See: + clojure.core/distinct + " + ([xs] (distinct identity xs)) + ([key-fn ^Observable xs] + (let [op (operator* (fn [o] + (let [seen (atom #{})] + (subscriber o + (fn [o v] + (let [key (key-fn v)] + (when-not (contains? @seen key) + (swap! seen conj key) + (on-next o v))))))))] + (lift op xs)))) + +(defn ^Observable do + "Returns a new Observable that, for each x in Observable xs, executes (do-fn x), + presumably for its side effects, and then passes x along unchanged. + + If do-fn throws an exception, that exception is emitted via onError and the sequence + is finished. + + Example: + + (->> (rx/seq->o [1 2 3]) + (rx/do println) + ...) + + Will print 1, 2, 3. + + See: + rx.Observable/doOnNext + " + [do-fn ^Observable xs] + (.doOnNext xs (iop/action* do-fn))) + +(defn ^Observable drop + [n ^Observable xs] + (.skip xs n)) + +(defn ^Observable drop-while + [p ^Observable xs] + (.skipWhile xs (fn->predicate p))) + +(defn ^Observable every? + "Returns an Observable that emits a single true value if (p x) is true for + all x in xs. Otherwise emits false. + + See: + clojure.core/every? + rx.Observable/all + " + [p ^Observable xs] + (.all xs (fn->predicate p))) + +(defn ^Observable filter + [p ^Observable xs] + (.filter xs (fn->predicate p))) + +(defn ^Observable first + "Returns an Observable that emits the first item emitted by xs, or an + empty Observable if xs is empty. + + See: + rx.Observable/takeFirst + " + [^Observable xs] + (.takeFirst xs)) + +(defn ^Observable group-by + "Returns an Observable of clojure.lang.MapEntry where the key is the result of + (key-fn x) and the val is an Observable of x for each key. + + This returns a clojure.lang.MapEntry rather than rx.observables.GroupedObservable + for some vague consistency with clojure.core/group-by and so that clojure.core/key, + clojure.core/val and destructuring will work as expected. + + See: + clojure.core/group-by + rx.Observable/groupBy + rx.observables.GroupedObservable + " + ([key-fn ^Observable xs] + (->> (.groupBy xs (iop/fn* key-fn)) + (map (fn [^GroupedObservable go] + (clojure.lang.MapEntry. (.getKey go) go)))))) + +(defn interleave* + "Returns an Observable of the first item in each Observable emitted by observables, then + the second etc. + + observables is an Observable of Observables + + See: + interleave + clojure.core/interleave + " + [observables] + (->> (map* #(seq->o %&) observables) + (concat*))) + +(defn interleave + "Returns an Observable of the first item in each Observable, then the second etc. + + Each argument is an individual Observable + + See: + observable* + clojure.core/interleave + " + [o1 & observables] + (->> (apply map #(seq->o %&) o1 observables) + (concat*))) + +(defn interpose + "Returns an Observable of the elements of xs separated by sep + + See: + clojure.core/interpose + " + [sep xs] + (let [op (operator* (fn [o] + (let [first? (atom true)] + (subscriber o (fn [o v] + (if-not (compare-and-set! first? true false) + (on-next o sep)) + (on-next o v))))))] + (lift op xs))) + +(defn into + "Returns an observable that emits a single value which is all of the + values of from-observable conjoined onto to + + See: + clojure.core/into + rx.Observable/toList + " + [to ^Observable from] + ; clojure.core/into uses transients if to is IEditableCollection + ; I don't think we have any guarantee that all on-next calls will be on the + ; same thread, so we can't do that here. + (reduce conj to from)) + +(defn iterate + "Returns an Observable of x, (f x), (f (f x)) etc. f must be free of side-effects + + See: + clojure.core/iterate + " + [f x] + (observable* (fn [s] + (loop [x x] + (when-not (unsubscribed? s) + (on-next s x) + (recur (f x))))))) + +(defn keep + [f xs] + (filter (complement nil?) (map f xs))) + +(defn keep-indexed + [f xs] + (filter (complement nil?) (map-indexed f xs))) + +(defn ^Observable map* + "Map a function over an Observable of Observables. + + Each item from the first emitted Observable is the first arg, each + item from the second emitted Observable is the second arg, and so on. + + See: + map + clojure.core/map + rx.Observable/zip + " + [f ^Observable observable] + (Observable/zip observable + ^rx.functions.FuncN (iop/fnN* f))) + +(defn ^Observable map + "Map a function over one or more observable sequences. + + Each item from the first Observable is the first arg, each item + from the second Observable is the second arg, and so on. + + See: + clojure.core/map + rx.Observable/zip + " + [f & observables] + (Observable/zip ^Iterable observables + ^rx.functions.FuncN (iop/fnN* f))) + +(defn ^Observable mapcat* + "Same as multi-arg mapcat, but input is an Observable of Observables. + + See: + mapcat + clojure.core/mapcat + " + [f ^Observable xs] + (->> xs + (map* f) + (concat*))) + +(defn ^Observable mapcat + "Returns an observable which, for each value x in xs, calls (f x), which must + return an Observable. The resulting observables are concatentated together + into one observable. + + If multiple Observables are given, the arguments to f are the first item from + each observable, then the second item, etc. + + See: + clojure.core/mapcat + rx.Observable/flatMap + " + [f & xs] + (if (clojure.core/next xs) + (mapcat* f (seq->o xs)) + ; use built-in flatMap for single-arg case + (.flatMap ^Observable (clojure.core/first xs) (iop/fn* f)))) + +(defn map-indexed + "Returns an observable that invokes (f index value) for each value of the input + observable. index starts at 0. + + See: + clojure.core/map-indexed + " + [f xs] + (let [op (operator* (fn [o] + (let [n (atom -1)] + (subscriber o + (fn [o v] + (catch-error-value o v + (on-next o (f (swap! n inc) v))))))))] + (lift op xs))) + +(def next + "Returns an observable that emits all but the first element of the input observable. + + See: + clojure.core/next + " + (partial drop 1)) + +(defn nth + "Returns an Observable that emits the value at the index in the given + Observable. nth throws an IndexOutOfBoundsException unless not-found + is supplied. + + Note that the Observable is the *first* arg! + " + ([^Observable xs index] + (.elementAt xs index)) + ([^Observable xs index not-found] + (.elementAtOrDefault xs index not-found))) + +(defn ^Observable partition-all + "Returns an Observable of Observables of n items each, at offsets step + apart. If step is not supplied, defaults to n, i.e. the partitions + do not overlap. May include partitions with fewer than n items at the end. + + See: + clojure.core/partition-all + rx.Observable/window + " + ([n ^Observable xs] (.window xs (int n))) + ([n step ^Observable xs] (.window xs (int n) (int step)))) + +(defn range + "Returns an Observable nums from start (inclusive) to end + (exclusive), by step, where start defaults to 0, step to 1, and end + to infinity. + + Note: this is not implemented on rx.Observable/range + + See: + clojure.core/range + " + ([] (range 0 Double/POSITIVE_INFINITY 1)) + ([end] (range 0 end 1)) + ([start end] (range start end 1)) + ([start end step] + (observable* (fn [s] + (let [comp (if (pos? step) < >)] + (loop [i start] + (if-not (unsubscribed? s) + (if (comp i end) + (do + (on-next s i) + (recur (+ i step))) + (on-completed s))))))))) + +(defn ^Observable reduce + ([f ^Observable xs] (.reduce xs (iop/fn* f))) + ([f val ^Observable xs] (.reduce xs val (iop/fn* f)))) + +(defn ^Observable reductions + ([f ^Observable xs] (.scan xs (iop/fn* f))) + ([f val ^Observable xs] (.scan xs val (iop/fn* f)))) + +(def rest + "Same as rx/next" + next) + +(defn some + "Returns an observable that emits the first logical true value of (pred x) for + any x in xs, else completes immediately. + + See: + clojure.core/some + " + [p ^Observable xs] + (->> xs + (map p) + (filter identity) + first)) + +(defn ^:private sorted-list-by + ([keyfn coll] (sorted-list-by keyfn clojure.core/compare coll)) + ([keyfn comp ^Observable coll] + (.toSortedList coll (iop/fn [a b] + ; force to int so rxjava doesn't have a fit + (int (comp (keyfn a) (keyfn b))))))) + +(defn sort + "Returns an observable that emits the items in xs, where the sort order is + determined by comparing items. If no comparator is supplied, uses compare. + comparator must implement java.util.Comparator. + + See: + clojure.core/sort + " + ([xs] + (sort clojure.core/compare xs)) + ([comp xs] + (->> xs + (sorted-list-by identity comp) + (mapcat seq->o)))) + +(defn sort-by + "Returns an observable that emits the items in xs, where the sort order is + determined by comparing (keyfn item). If no comparator is supplied, uses + compare. comparator must implement java.util.Comparator. + + See: + clojure.core/sort-by + " + ([keyfn xs] + (sort-by keyfn clojure.core/compare xs)) + ([keyfn comp ^Observable xs] + (->> xs + (sorted-list-by keyfn comp) + (mapcat seq->o)))) + +(defn split-with + "Returns an observable that emits a pair of observables + + [(take-while p xs) (drop-while p xs)] + + See: + rx.lang.clojure/take-while + rx.lang.clojure/drop-while + clojure.core/split-with + " + [p xs] + (return [(take-while p xs) (drop-while p xs)])) + +(defn ^Observable take + "Returns an observable that emits the first n elements of xs. + + See: + clojure.core/take + " + [n ^Observable xs] + {:pre [(>= n 0)]} + (.take xs n)) + +(defn take-while + "Returns an Observable that emits xs until the first x such that + (p x) is falsey. + + See: + clojure.core/take-while + rx.Observable/takeWhile + " + [p ^Observable xs] + (.takeWhile xs (fn->predicate p))) + +;################################################################################; + +(defn throw + "Returns an Observable the simply emits the given exception with on-error + + See: + rx.Observable/error + " + [^Throwable e] + (Observable/error e)) + +(defn catch* + "Returns an observable that, when Observable o triggers an error, e, continues with + Observable returned by (f e) if (p e) is true. If (p e) returns a Throwable + that value is passed as e. + + If p is a class object, a normal instance? check is performed rather than calling it + as a function. If the value returned by (p e) is not true, the error is propagated. + + Examples: + + (->> my-observable + + ; On IllegalArgumentException, just emit 1 + (catch* IllegalArgumentException + (fn [e] (rx/return 1))) + + ; If exception message contains \"WAT\", emit [\\W \\A \\T] + (catch* (fn [e] (-> e .getMessage (.contains \"WAT\"))) + (fn [e] (rx/seq->o [\\W \\A \\T])))) + + See: + rx.Observable/onErrorResumeNext + http://netflix.github.io/RxJava/javadoc/rx/Observable.html#onErrorResumeNext(rx.util.functions.Func1) + " + [p f ^Observable o] + (let [p (if (class? p) + (fn [e] (.isInstance ^Class p e)) + p)] + (.onErrorResumeNext o + ^Func1 (iop/fn [e] + (if-let [maybe-e (p e)] + (f (if (instance? Throwable maybe-e) + maybe-e + e)) + (rx.lang.clojure.core/throw e)))))) + +(defmacro catch + "Macro version of catch*. + + The body of the catch is wrapped in an implicit (do). It must evaluate to an Observable. + + Note that the source observable is the last argument so this works with ->> but may look + slightly odd when used standalone. + + Example: + + (->> my-observable + ; just emit 0 on IllegalArgumentException + (catch IllegalArgumentException e + (rx/return 0)) + + (catch DependencyException e + (if (.isMinor e) + (rx/return 0) + (rx/throw (WebException. 503))))) + + See: + catch* + " + {:arglists '([p binding & body observable])} + [p binding & body] + (let [o (last body) + body (butlast body)] + `(catch* ~p + (fn [~binding] ~@body) + ~o))) + +(defn finally* + "Returns an Observable that, as a side-effect, executes (f) when the given + Observable completes regardless of success or failure. + + Example: + + (->> my-observable + (finally* (fn [] (println \"Done\")))) + + " + [f ^Observable o] + (.finallyDo o ^Action0 (iop/action* f))) + +(defmacro finally + "Macro version of finally*. + + Note that the source observable is the last argument so this works with ->> but may look + slightly odd when used standalone. + + Example: + + (->> my-observable + (finally (println \"Done\"))) + + See: + finally* + " + {:arglists '([& body observable])} + [& body] + (let [o (last body) + body (butlast body)] + `(finally* (fn [] ~@body) ~o))) + +;################################################################################; + +(defn generator* + "Creates an observable that calls (f observer & args) which should emit values + with (rx/on-next observer value). + + Automatically calls on-completed on return, or on-error if any exception is thrown. + + f should exit early if (rx/unsubscribed? observable) returns true + + Examples: + + ; An observable that emits just 99 + (rx/generator* on-next 99) + " + [f & args] + (observable* (-> #(apply f % args) + wrap-on-completed + wrap-on-error))) + +(defmacro generator + "Create an observable that executes body which should emit values with + (rx/on-next observer value) where observer comes from bindings. + + Automatically calls on-completed on return, or on-error if any exception is thrown. + + The body should exit early if (rx/unsubscribed? observable) returns true + + Examples: + + ; make an observer that emits [0 1 2 3 4] + (generator [observer] + (dotimes [i 5] + (on-next observer i))) + + " + [bindings & body] + `(generator* (fn ~bindings ~@body))) + +;################################################################################; + +; Import public graph symbols here. I want them in this namespace, but implementing +; them here with all the clojure.core symbols excluded is a pain. +(intern *ns* (with-meta 'let-o* (meta #'graph/let-o*)) @#'graph/let-o*) +(intern *ns* (with-meta 'let-o (meta #'graph/let-o)) @#'graph/let-o) + +;################################################################################; + +; Import some public realized symbols here. I want them in this namespace, but implementing +; them here with all the clojure.core symbols excluded is a pain. +(intern *ns* (with-meta 'let-realized (meta #'realized/let-realized)) @#'realized/let-realized) + diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj new file mode 100644 index 0000000000..83b56b27b6 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/future.clj @@ -0,0 +1,71 @@ +(ns rx.lang.clojure.future + "Functions and macros for making rx-ified futures. That is, run some code in some + other thread and return an Observable of its result. + " + (:require [rx.lang.clojure.interop :as iop] + [rx.lang.clojure.core :as rx])) + +(def ^:private -ns- *ns*) +(set! *warn-on-reflection* true) + +(defn future* + "Exerimental/Possibly a bad idea + + Execute (f & args) in a separate thread and pass the result to onNext. + If an exception is thrown, onError is called with the exception. + + runner is a function that takes a no-arg function argument and returns a future + representing the execution of that function. + + Returns an Observable. If the subscriber unsubscribes, the future will be canceled + with clojure.core/future-cancel + + Examples: + + (subscribe (rx/future future-call + #(slurp \"input.txt\")) + (fn [v] (println \"Got: \" v))) + ; eventually outputs content of input.txt + " + [runner f & args] + {:pre [(ifn? runner) (ifn? f)]} + (rx/observable* (fn [^rx.Subscriber observer] + (let [wrapped (-> #(rx/on-next % (apply f args)) + rx/wrap-on-completed + rx/wrap-on-error) + fu (runner #(wrapped observer))] + (.add observer + (rx/subscription #(future-cancel fu))))))) + +(defn future-generator* + "Exerimental/Possibly a bad idea + + Same as rx/generator* except f is invoked in a separate thread. + + runner is a function that takes a no-arg function argument and returns a future + representing the execution of that function. + + Returns an Observable. If the subscriber unsubscribes, the future will be canceled + with clojure.core/future-cancel + + Example: + + (future-generator* future-call + (fn [o] + (rx/on-next o 1) + (Thread/sleep 1000) + (rx/on-next o 2))) + + See: + rx.lang.clojure.core/generator* + " + [runner f & args] + {:pre [(ifn? runner) (ifn? f)]} + (rx/observable* (fn [^rx.Subscriber observer] + (let [wrapped (-> (fn [o] + (apply f o args)) + rx/wrap-on-completed + rx/wrap-on-error) + fu (runner #(wrapped observer))] + (.add observer + (rx/subscription #(future-cancel fu))))))) diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/graph.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/graph.clj new file mode 100644 index 0000000000..a67ebba47c --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/graph.clj @@ -0,0 +1,141 @@ +(ns rx.lang.clojure.graph + "This is an implementation namespace. Don't use it directly. Use the symbols + in rx.lang.clojure.core + " + (:require [clojure.set :as set])) + +(def ^:private -ns- *ns*) +(set! *warn-on-reflection* true) + +(defn ^:private ->let-o*-observable + [^rx.Observable o n name] + (if (= n 1) + o + ; TODO This is a shortcut. We know the expected number of subscriptions so + ; we only need to cache values until we get the nth subscription at which + ; point, it just becomes a pass through. I haven't found a cache/replay-ish + ; operator that gives this level of control over the cached values + (.cache o))) + +(defn let-o* + "EXTREMELY EXPERIMENTAL AND SUBJECT TO CHANGE OR DELETION + + Given a graph description, returns an observable that emits a single + map of observables all hooked up and ready for subscription. + + A graph is a map from name to a map with keys: + + :deps A vector of dependency names + :factory A function that takes a map from name to Observable + for the names in :deps and returns an Observable + + Returns a map from name to Observable. Additionally, there will be a + ::non-terminals key in the map with a vector of non-terminal names. + + See: + let-o + " + [description] + (let [in-dep-counts (->> description + vals + (mapcat :deps) + frequencies) + terminals (set/difference (set (keys description)) (set (keys in-dep-counts))) + non-terminals (vec (keys in-dep-counts)) + + resolve-node (fn resolve-node [state {:keys [id deps factory] :as node}] + (let [existing (state id)] + (cond + ; It's already resolving up the stack. We've hit a cycle. + (= ::resolving existing) (throw (IllegalArgumentException. (format "Cycle found at '%s'" id))) + + ; It's already resolved. Done. + (not (nil? existing)) state + + :else + ; recursively resolve dependencies + (let [new-state (reduce (fn [s dep] + (if-let [dep-node (description dep)] + (resolve-node s (assoc dep-node :id dep)) + (throw (IllegalArgumentException. (format "Unknown node '%s' referenced from '%s'" dep id))))) + (assoc state id ::resolving) + deps) + ; execute the factory function and wrap it in an observable that delays dependencies + o (-> (select-keys new-state deps) + factory + (->let-o*-observable (in-dep-counts id 1) id))] + ; return the updated state with the resolved node + (assoc new-state id o)))))] + ; resolve the graph and build the result map + (-> (reduce (fn [s [id node]] + (resolve-node s (assoc node :id id))) + {} + description) + (select-keys terminals) + (assoc ::non-terminals non-terminals)))) + +(defmacro let-o + "EXTREMELY EXPERIMENTAL AND SUBJECT TO CHANGE OR DELETION + + Similar to clojure.core/let, but bindings are Observables and the result of the body + must be an Observable. Binding names must start with ?. Binding order doesn't matter + and any binding is visible to all other expressions as long as no cycles are produced + in the resulting Observable expression. + + The key difference here is that the macro can identify the dependencies between Observables + and correctly connect them, protecting from variations in subscribe behavior as well as + the idiosyncracies of setting up multiple subscriptions to Observables. + + This is only very useful for constructing graphs of Observables where you'd usually have + to fiddle around with publish, connect, replay and all that stuff. If you have a linear + sequence of operators, just chain them together. + + Current limitations: + + * All Observables are cache()'d so watch out for large sequences. This will be + fixed eventually. + * let-o cannot be nested. Some deep-walking macro-magic will be required for this. + + Example: + + ; Note that both ?c and ?d depend on ?b and the result Observable depends on + ; ?c and ?d. + (let-o [?a (rx/return 99) + ?b (... some observable network request ...) + ?c (rx/map vector ?a ?b) + ?d (rx/map ... ?b)] + (rx/map vector ?c ?d)) + + See: + let-o* + " + [bindings & result-body] + (let [sym->dep-sym (fn [s] + (when (and (symbol? s) + (not (namespace s)) + (.startsWith (name s) "?")) + s)) + body->dep-syms (fn [body] + (->> body + flatten + (keep sym->dep-sym) + distinct + vec)) + ->node-map (fn [[id & body]] + (let [dep-syms (body->dep-syms body) + dep-keys (->> dep-syms (map (comp keyword name)) vec)] + [(keyword (name id)) {:deps dep-keys + :factory `(fn [{:keys ~dep-syms}] ~@body) }])) + node-map (let [base-map (->> bindings + (partition 2) + (map ->node-map) + (into {})) + result-dep-syms (body->dep-syms result-body)] + (assoc base-map + :rx.lang.clojure.core/result + {:deps (mapv keyword result-dep-syms) + :factory `(fn [{:keys ~result-dep-syms}] ~@result-body) }))] + `(->> ~node-map + let-o* + :rx.lang.clojure.core/result))) + diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/realized.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/realized.clj new file mode 100644 index 0000000000..2926633924 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/realized.clj @@ -0,0 +1,134 @@ +(ns rx.lang.clojure.realized + (:require [rx.lang.clojure.interop :as iop])) + +(def ^:private -ns- *ns*) +(set! *warn-on-reflection* true) + +(defrecord ^:private PostProc [o f]) + +(defn all + "Tell realized map to capture all output of the observable, not just the last one" + [o] + (->PostProc o identity)) + +(defn only + "Tell realized map to capture the only value emitted by the observable. + If there are 0 or more than one values, an IllegalStateException is thrown + which should propagate to onError. + + This is the default mode of realized-map and let-realized. + " + [o] + (->PostProc o (fn [values] + (condp = (count values) + 1 (first values) + (throw (IllegalStateException. "Observable did not produce exactly one value")))))) + +(defn ^:private ->post-proc + [v] + (cond + (instance? rx.Observable v) (only v) + (instance? PostProc v) v + (vector? v) (->PostProc (first v) + (apply comp (reverse (next v)))) + :else (->post-proc (rx.Observable/just v)))) + +(defn realized-map + "EXTREMELY EXPERIMENTAL AND SUBJECT TO CHANGE OR DELETION + + See let-realized. + + Given a map from key to observable, returns an observable that emits a single + map from the same keys to the values emitted by their corresponding observable. + + keyvals is a list of key/value pairs where key is a key in the emitted map and val + can be one of the following: + + rx.Observable The only value of the emitted sequence is bound to the key. This is the + default since this is often a singleton response from a request. If the + Observable produces 0 or more than 1 values, an IllegalStateException is + produced. + + vector The first element of the vector must be an Observable. Remaining elements + are functions applied in sequence to the list of values emitted by the + observable. For example [my-observable first] will result in a single + value in the emitted map rather than a vector of values. + + other The value is placed in the emitted map as is + + Note the observable can also be wrapped with realized/all to get the full list rather than + just the last value. + + The purpose of this is to simplify the messy pattern of mapping observables to + single key maps, merging and then folding all the separate maps together. So code + like this: + + ; TODO update + (->> (rx/merge (->> (user-info-o user-id) + (rx/map (fn [u] {:user u}))) + (->> (user-likes-o user-id) + (rx/map (fn [u] {:likes u})))) + (rx/reduce merge {})) + + becomes: + + (realized-map :user (user-info-o user-id) + :likes (user-likes-o user-id)) + + See: + let-realized + " + [& keyvals] + (let [o (->> keyvals + (partition 2) + ; generate a sequence of observables + (map (fn [[k v]] + (let [{:keys [^rx.Observable o f]} (->post-proc v)] + ; pour the observable into a single list and apply post-proc func to it + (-> o + .toList + (.map (iop/fn [list] {k (f list)})))))))] + + (-> ^Iterable o + (rx.Observable/merge) ; funnel all the observables into a single sequence + (.reduce {} (iop/fn* merge))))) ; do the map merge dance + +(defn ^rx.Observable realized-map* + "EXTREMELY EXPERIMENTAL AND SUBJECT TO CHANGE OR DELETION + + Same as realized-map, but takes a map argument rather than key-value pairs." + [map-description] + (apply realized-map (apply concat map-description))) + +(defmacro let-realized + "EXTREMELY EXPERIMENTAL AND SUBJECT TO CHANGE OR DELETION + + 'let' version of realized map. + + (let-realized [a (make-observable)] + (* 2 a)) + + is equivalent to: + + (->> (realized-map :a (make-observable)) + (map (fn [{:keys [a]}] (* 2 a)))) + + That is, it eliminates the repition of the map keys when you want to do something + with the final result. + + Evaluates to an Observable that emits the value of the let body. + + See: + rx.lang.clojure.realized/realized-map + rx.lang.clojure.realized/all + " + [bindings & body] + (let [b-parts (partition 2 bindings) + b-map (->> b-parts + (map (fn [[k v]] + [(keyword (name k)) v])) + (into {})) + b-names (mapv first b-parts)] + `(.map (realized-map* ~b-map) + (iop/fn [{:keys ~b-names}] ~@body)))) + diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/blocking_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/blocking_test.clj new file mode 100644 index 0000000000..df8e9fae1e --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/blocking_test.clj @@ -0,0 +1,65 @@ +(ns rx.lang.clojure.blocking-test + (:require [rx.lang.clojure.blocking :as b] + [rx.lang.clojure.core :as rx] + [clojure.test :refer [deftest testing is]])) + +(deftest test-->blocking + (testing "returns a BlockingObservable from an Observable" + (is (instance? rx.observables.BlockingObservable (b/->blocking (rx/return 0))))) + + (testing "is idempotent" + (is (instance? rx.observables.BlockingObservable (b/->blocking (b/->blocking (rx/return 0))))))) + + +(deftest test-o->seq + (is (= [1 2 3] (b/o->seq (rx/seq->o [1 2 3]))))) + +(deftest test-first + (testing "returns first element of observable" + (is (= 1 (b/first (rx/seq->o [1 2 3]))))) + (testing "returns nil for empty observable" + (is (nil? (b/first (rx/empty))))) + (testing "rethrows errors" + (is (thrown? java.io.FileNotFoundException + (b/first (rx/throw (java.io.FileNotFoundException. "boo"))))))) + +(deftest test-last + (testing "returns last element of observable" + (is (= 3 (b/last (rx/seq->o [1 2 3]))))) + (testing "returns nil for empty observable" + (is (nil? (b/last (rx/empty))))) + (testing "rethrows errors" + (is (thrown? java.io.FileNotFoundException + (b/last (rx/throw (java.io.FileNotFoundException. "boo"))))))) + +(deftest test-single + (testing "returns one element" + (is (= 1 (b/single (rx/return 1))))) + (testing "throw if empty" + (is (thrown? java.lang.IllegalArgumentException (b/single (rx/empty))))) + (testing "throw if many" + (is (thrown? java.lang.IllegalArgumentException (b/single (rx/seq->o [1 2]))))) + (testing "rethrows errors" + (is (thrown? java.io.FileNotFoundException + (b/single (rx/throw (java.io.FileNotFoundException. "boo"))))))) + +(deftest test-into + (is (= [1 2 3] + (b/into [1] (rx/seq->o [2 3])))) + (testing "rethrows errors" + (is (thrown? java.io.FileNotFoundException + (b/into #{} (rx/throw (java.io.FileNotFoundException. "boo"))))))) + +(deftest test-doseq + (is (= (range 3) + (let [capture (atom [])] + (b/doseq [{:keys [value]} (rx/seq->o (map #(hash-map :value %) (range 3)))] + (println value) + (swap! capture conj value)) + @capture))) + + (testing "rethrows errors" + (is (thrown? java.io.FileNotFoundException + (b/doseq [i (rx/seq->o (range 3))] + (throw (java.io.FileNotFoundException. "boo"))))))) + diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/chunk_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/chunk_test.clj new file mode 100644 index 0000000000..58ef044c9d --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/chunk_test.clj @@ -0,0 +1,65 @@ +(ns rx.lang.clojure.chunk-test + (:require [rx.lang.clojure.chunk :as rx-chunk] + [rx.lang.clojure.core :as rx] + [rx.lang.clojure.future :as rx-future] + [rx.lang.clojure.blocking :as rx-blocking] + [clojure.test :refer [deftest testing is]])) + + +(deftest test-chunk + (let [n 20 + chunk-size 10 + factory (rx-future/future-generator* + future-call + (fn[o] + (doseq [i (range n)] + (Thread/sleep (rand-int 50)) + (rx/on-next o (rx-future/future* + future-call + #(let [t (rand-int 500)] + (Thread/sleep t) + i))))))] + (is (= (range n) + (sort (rx-blocking/into [] + (rx-chunk/chunk chunk-size {:debug true} factory))))))) + +(deftest test-chunk-with-error + (testing "error from source is propagated" + (let [n 20 + chunk-size 4 + factory (rx-future/future-generator* + future-call + (fn [o] + (doseq [i (range n)] + (Thread/sleep (rand-int 50)) + (rx/on-next o (rx-future/future* + future-call + #(let [t (rand-int 1000)] + (Thread/sleep t) + i)))) + (throw (IllegalArgumentException. "hi"))))] + (is (thrown-with-msg? IllegalArgumentException #"hi" + (rx-blocking/into [] + (rx-chunk/chunk chunk-size {:debug true} factory)))))) + + (testing "error from single observable is propagated" + (let [n 20 + chunk-size 4 + factory (rx-future/future-generator* + future-call + (fn [o] + (doseq [i (range n)] + (Thread/sleep (rand-int 50)) + (rx/on-next o (rx-future/future* + future-call + #(let [t (rand-int 1000)] + (throw (IllegalArgumentException. "byebye")) + (Thread/sleep t) + i))))))] + (is (thrown? rx.exceptions.CompositeException + (rx-blocking/into [] + (rx-chunk/chunk chunk-size + {:debug true + :delay-error? true } + factory))))))) + diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj new file mode 100644 index 0000000000..d77b184bd8 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj @@ -0,0 +1,681 @@ +(ns rx.lang.clojure.core-test + (:require [rx.lang.clojure.core :as rx] + [rx.lang.clojure.blocking :as b] + [rx.lang.clojure.future :as f] + [clojure.test :refer [deftest is testing are]])) + +(deftest test-observable? + (is (rx/observable? (rx/return 99))) + (is (not (rx/observable? "I'm not an observable")))) + +(deftest test-on-next + (testing "calls onNext" + (let [called (atom []) + o (reify rx.Observer (onNext [this value] (swap! called conj value)))] + (is (identical? o (rx/on-next o 1))) + (is (= [1] @called))))) + +(deftest test-on-completed + (testing "calls onCompleted" + (let [called (atom 0) + o (reify rx.Observer (onCompleted [this] (swap! called inc)))] + (is (identical? o (rx/on-completed o))) + (is (= 1 @called))))) + +(deftest test-on-error + (testing "calls onError" + (let [called (atom []) + e (java.io.FileNotFoundException. "yum") + o (reify rx.Observer (onError [this e] (swap! called conj e)))] + (is (identical? o (rx/on-error o e))) + (is (= [e] @called))))) + +(deftest test-catch-error-value + (testing "if no exception, returns body" + (let [o (reify rx.Observer)] + (is (= 3 (rx/catch-error-value o 99 + (+ 1 2)))))) + + (testing "exceptions call onError on observable and inject value in exception" + (let [called (atom []) + e (java.io.FileNotFoundException. "boo") + o (reify rx.Observer + (onError [this e] + (swap! called conj e))) + result (rx/catch-error-value o 100 + (throw e)) + cause (.getCause e)] + (is (identical? e result)) + (is (= [e] @called)) + (when (is (instance? rx.exceptions.OnErrorThrowable$OnNextValue cause)) + (is (= 100 (.getValue cause))))))) + +(deftest test-subscribe + (testing "subscribe overload with only onNext" + (let [o (rx/return 1) + called (atom nil)] + (rx/subscribe o (fn [v] (swap! called (fn [_] v)))) + (is (= 1 @called))))) + +(deftest test-fn->predicate + (are [f arg result] (= result (.call (rx/fn->predicate f) arg)) + identity nil false + identity false false + identity 1 true + identity "true" true + identity true true)) + +(deftest test-subscription + (let [called (atom 0) + s (rx/subscription #(swap! called inc))] + (is (identical? s (rx/unsubscribe s))) + (is (= 1 @called)))) + +(deftest test-unsubscribed? + (let [s (rx/subscription #())] + (is (not (rx/unsubscribed? s))) + (rx/unsubscribe s) + (is (rx/unsubscribed? s)))) + + +(deftest test-observable* + (let [o (rx/observable* (fn [s] + (rx/on-next s 0) + (rx/on-next s 1) + (when-not (rx/unsubscribed? s) (rx/on-next s 2)) + (rx/on-completed s)))] + (is (= [0 1 2] (b/into [] o))))) + +(deftest test-operator* + (let [o (rx/operator* #(rx/subscriber % + (fn [o v] + (if (even? v) + (rx/on-next o v))))) + result (->> (rx/seq->o [1 2 3 4 5]) + (rx/lift o) + (b/into []))] + (is (= [2 4] result)))) + +(deftest test-syncrhonize + ; I'm going to believe synchronize works and just exercise it + ; here for sanity. + (is (= [1 2 3] + (->> [1 2 3] + (rx/seq->o) + (rx/synchronize) + (b/into [])))) + (let [lock (Object.)] + (is (= [1 2 3] + (->> [1 2 3] + (rx/seq->o) + (rx/synchronize lock) + (b/into [])))))) + +(let [expected-result [[1 3 5] [2 4 6]] + sleepy-o #(f/future-generator* + future-call + (fn [o] + (doseq [x %] + (Thread/sleep 10) + (rx/on-next o x)))) + make-inputs (fn [] (mapv sleepy-o expected-result)) + make-output (fn [r] [(keep #{1 3 5} r) + (keep #{2 4 6} r)])] + (deftest test-merge* + (is (= expected-result + (->> (make-inputs) + (rx/seq->o) + (rx/merge*) + (b/into []) + (make-output))))) + (deftest test-merge + (is (= expected-result + (->> (make-inputs) + (apply rx/merge) + (b/into []) + (make-output))))) + (deftest test-merge-delay-error* + (is (= expected-result + (->> (make-inputs) + (rx/seq->o) + (rx/merge-delay-error*) + (b/into []) + (make-output))))) + (deftest test-merge-delay-error + (is (= expected-result + (->> (make-inputs) + (apply rx/merge-delay-error) + (b/into []) + (make-output)))))) + +(deftest test-generator + (testing "calls on-completed automatically" + (let [o (rx/generator [o]) + called (atom nil)] + (rx/subscribe o (fn [v]) (fn [_]) #(reset! called "YES")) + (is (= "YES" @called)))) + + (testing "exceptions automatically go to on-error" + (let [expected (IllegalArgumentException. "hi") + actual (atom nil)] + (rx/subscribe (rx/generator [o] (throw expected)) + #() + #(reset! actual %)) + (is (identical? expected @actual))))) + +(deftest test-seq->o + (is (= [] (b/into [] (rx/seq->o [])))) + (is (= [] (b/into [] (rx/seq->o nil)))) + (is (= [\a \b \c] (b/into [] (rx/seq->o "abc")))) + (is (= [0 1 2 3] (b/first (rx/into [] (rx/seq->o (range 4)))))) + (is (= #{0 1 2 3} (b/first (rx/into #{} (rx/seq->o (range 4)))))) + (is (= {:a 1 :b 2 :c 3} (b/first (rx/into {} (rx/seq->o [[:a 1] [:b 2] [:c 3]])))))) + +(deftest test-return + (is (= [0] (b/into [] (rx/return 0))))) + +(deftest test-cache + (let [value (atom 0) + o (->> + (rx/return 0) + (rx/map (fn [x] (swap! value inc))) + (rx/cache))] + (is (= 1 (b/single o))) + (is (= 1 @value)) + (is (= 1 (b/single o))) + (is (= 1 @value)) + (is (= 1 (b/single o))))) + +(deftest test-cons + (is (= [1] (b/into [] (rx/cons 1 (rx/empty))))) + (is (= [1 2 3 4] (b/into [] (rx/cons 1 (rx/seq->o [2 3 4])))))) + +(deftest test-concat + (is (= [:q :r] + (b/into [] (rx/concat (rx/seq->o [:q :r]))))) + (is (= [:q :r 1 2 3] + (b/into [] (rx/concat (rx/seq->o [:q :r]) + (rx/seq->o [1 2 3])))))) + +(deftest test-concat* + (is (= [:q :r] + (b/into [] (rx/concat* (rx/return (rx/seq->o [:q :r])))))) + (is (= [:q :r 1 2 3] + (b/into [] (rx/concat* (rx/seq->o [(rx/seq->o [:q :r]) + (rx/seq->o [1 2 3])])))))) + +(deftest test-count + (are [xs] (= (count xs) (->> xs (rx/seq->o) (rx/count) (b/single))) + [] + [1] + [5 6 7] + (range 10000))) + +(deftest test-cycle + (is (= [1 2 3 1 2 3 1 2 3 1 2] + (->> [1 2 3] + (rx/seq->o) + (rx/cycle) + (rx/take 11) + (b/into []))))) + +(deftest test-distinct + (let [input [{:a 1} {:a 1} {:b 1} {"a" (int 1)} {:a (int 1)}]] + (is (= (distinct input) + (->> input + (rx/seq->o) + (rx/distinct) + (b/into []))))) + (let [input [{:name "Bob" :x 2} {:name "Jim" :x 99} {:name "Bob" :x 3}]] + (is (= [{:name "Bob" :x 2} {:name "Jim" :x 99}] + (->> input + (rx/seq->o) + (rx/distinct :name) + (b/into [])))))) + +(deftest test-do + (testing "calls a function with each element" + (let [collected (atom [])] + (is (= [1 2 3] + (->> (rx/seq->o [1 2 3]) + (rx/do (fn [v] + (swap! collected conj (* 2 v)))) + (rx/do (partial println "GOT")) + (b/into [])))) + (is (= [2 4 6] @collected)))) + (testing "ends sequence with onError if action code throws an exception" + (let [collected (atom []) + o (->> (rx/seq->o [1 2 3]) + (rx/do (fn [v] + (if (= v 2) + (throw (IllegalStateException. (str "blah" v))) + (swap! collected conj (* 99 v))))))] + (is (thrown-with-msg? IllegalStateException #"blah2" + (b/into [] o))) + (is (= [99] @collected))))) + +(deftest test-drop-while + (is (= (into [] (drop-while even? [2 4 6 8 1 2 3])) + (b/into [] (rx/drop-while even? (rx/seq->o [2 4 6 8 1 2 3]))))) + (is (= (into [] (drop-while even? [2 4 6 8 1 2 3])) + (b/into [] (rx/drop-while even? (rx/seq->o [2 4 6 8 1 2 3])))))) + +(deftest test-every? + (are [xs p result] (= result (->> xs (rx/seq->o) (rx/every? p) (b/single))) + [2 4 6 8] even? true + [2 4 3 8] even? false + [1 2 3 4] #{1 2 3 4} true + [1 2 3 4] #{1 3 4} false)) + +(deftest test-filter + (is (= (into [] (->> [:a :b :c :d :e :f :G :e] + (filter #{:b :e :G}))) + (b/into [] (->> (rx/seq->o [:a :b :c :d :e :f :G :e]) + (rx/filter #{:b :e :G})))))) + +(deftest test-first + (is (= [3] + (b/into [] (rx/first (rx/seq->o [3 4 5]))))) + (is (= [] + (b/into [] (rx/first (rx/empty)))))) + +(deftest test-group-by + (let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]] + (testing "with just a key-fn" + (is (= [[:a {:k :a :v 1}] + [:b {:k :b :v 2}] + [:a {:k :a :v 3}] + [:c {:k :c :v 4}]] + (->> xs + (rx/seq->o) + (rx/group-by :k) + (rx/mapcat (fn [[k vo :as me]] + (is (instance? clojure.lang.MapEntry me)) + (rx/map #(vector k %) vo))) + (b/into []))))) + + ; TODO reinstate once this is implemented + ; see https://github.com/Netflix/RxJava/commit/02ccc4d727a9297f14219549208757c6e0efce2a + #_(testing "with a val-fn" + (is (= [[:a 1] + [:b 2] + [:a 3] + [:c 4]] + (->> xs + (rx/seq->o) + (rx/group-by :k :v) + (rx/mapcat (fn [[k vo :as me]] + (is (instance? clojure.lang.MapEntry me)) + (rx/map #(vector k %) vo))) + (b/into []))))))) + +(deftest test-interleave + (are [inputs] (= (apply interleave inputs) + (->> (apply rx/interleave (map rx/seq->o inputs)) + (b/into []))) + [[] []] + [[] [1]] + [(range 5) (range 10) (range 10) (range 3)] + [(range 50) (range 10)] + [(range 5) (range 10 60) (range 10) (range 50)]) + + ; one-arg case, not supported by clojure.core/interleave + (is (= (range 10) + (->> (rx/interleave (rx/seq->o (range 10))) + (b/into []))))) + +(deftest test-interleave* + (are [inputs] (= (apply interleave inputs) + (->> (rx/interleave* (->> inputs + (map rx/seq->o) + (rx/seq->o))) + (b/into []))) + [[] []] + [[] [1]] + [(range 5) (range 10) (range 10) (range 3)] + [(range 50) (range 10)] + [(range 5) (range 10 60) (range 10) (range 50)])) + +(deftest test-interpose + (is (= (interpose \, [1 2 3]) + (b/into [] (rx/interpose \, (rx/seq->o [1 2 3])))))) + +(deftest test-into + (are [input to] (= (into to input) + (b/single (rx/into to (rx/seq->o input)))) + [6 7 8] [9 10 [11]] + #{} [1 2 3 2 4 5] + {} [[1 2] [3 2] [4 5]] + {} [] + '() (range 50))) + +(deftest test-iterate + (are [f x n] (= (->> (iterate f x) (take n)) + (->> (rx/iterate f x) (rx/take n) (b/into []))) + inc 0 10 + dec 20 100 + #(conj % (count %)) [] 5 + #(cons (count %) % ) nil 5)) + +(deftest test-keep + (is (= (into [] (keep identity [true true false])) + (b/into [] (rx/keep identity (rx/seq->o [true true false]))))) + + (is (= (into [] (keep #(if (even? %) (* 2 %)) (range 9))) + (b/into [] (rx/keep #(if (even? %) (* 2 %)) (rx/seq->o (range 9))))))) + +(deftest test-keep-indexed + (is (= (into [] (keep-indexed (fn [i v] + (if (even? i) v)) + [true true false])) + (b/into [] (rx/keep-indexed (fn [i v] + (if (even? i) v)) + (rx/seq->o [true true false])))))) + +(deftest test-map + (is (= (into {} (map (juxt identity name) + [:q :r :s :t :u])) + (b/into {} (rx/map (juxt identity name) + (rx/seq->o [:q :r :s :t :u]))))) + (is (= (into [] (map vector + [:q :r :s :t :u] + (range 10) + ["a" "b" "c" "d" "e"] )) + (b/into [] (rx/map vector + (rx/seq->o [:q :r :s :t :u]) + (rx/seq->o (range 10) ) + (rx/seq->o ["a" "b" "c" "d" "e"] ))))) + ; check > 4 arg case + (is (= (into [] (map vector + [:q :r :s :t :u] + [:q :r :s :t :u] + [:q :r :s :t :u] + (range 10) + (range 10) + (range 10) + ["a" "b" "c" "d" "e"] + ["a" "b" "c" "d" "e"] + ["a" "b" "c" "d" "e"])) + (b/into [] (rx/map vector + (rx/seq->o [:q :r :s :t :u]) + (rx/seq->o [:q :r :s :t :u]) + (rx/seq->o [:q :r :s :t :u]) + (rx/seq->o (range 10)) + (rx/seq->o (range 10)) + (rx/seq->o (range 10)) + (rx/seq->o ["a" "b" "c" "d" "e"]) + (rx/seq->o ["a" "b" "c" "d" "e"]) + (rx/seq->o ["a" "b" "c" "d" "e"])))))) + +(deftest test-map* + (is (= [[1 2 3 4 5 6 7 8]] + (b/into [] (rx/map* vector + (rx/seq->o [(rx/seq->o [1]) + (rx/seq->o [2]) + (rx/seq->o [3]) + (rx/seq->o [4]) + (rx/seq->o [5]) + (rx/seq->o [6]) + (rx/seq->o [7]) + (rx/seq->o [8])])))))) +(deftest test-map-indexed + (is (= (map-indexed vector [:a :b :c]) + (b/into [] (rx/map-indexed vector (rx/seq->o [:a :b :c]))))) + (testing "exceptions from fn have error value injected" + (try + (->> (rx/seq->o [:a :b :c]) + (rx/map-indexed (fn [i v] + (if (= 1 i) + (throw (java.io.FileNotFoundException. "blah"))) + v)) + (b/into [])) + (catch java.io.FileNotFoundException e + (is (= :b (-> e .getCause .getValue))))))) + +(deftest test-mapcat* + (let [f (fn [a b c d e] + [(+ a b) (+ c d) e])] + (is (= (->> (range 5) + (map (fn [_] (range 5))) + (apply mapcat f)) + (->> (range 5) + (map (fn [_] (rx/seq->o (range 5)))) + (rx/seq->o) + (rx/mapcat* (fn [& args] (rx/seq->o (apply f args)))) + (b/into [])))))) + +(deftest test-mapcat + (let [f (fn [v] [v (* v v)]) + xs (range 10)] + (is (= (mapcat f xs) + (b/into [] (rx/mapcat (comp rx/seq->o f) (rx/seq->o xs)))))) + + (let [f (fn [a b] [a b (* a b)]) + as (range 10) + bs (range 15)] + (is (= (mapcat f as bs) + (b/into [] (rx/mapcat (comp rx/seq->o f) + (rx/seq->o as) + (rx/seq->o bs))))))) + +(deftest test-next + (let [in [:q :r :s :t :u]] + (is (= (next in) (b/into [] (rx/next (rx/seq->o in))))))) + +(deftest test-nth + (is (= [:a] + (b/into [] (rx/nth (rx/seq->o [:s :b :a :c]) 2)))) + (is (= [:fallback] + (b/into [] (rx/nth (rx/seq->o [:s :b :a :c]) 25 :fallback))))) + +(deftest test-rest + (let [in [:q :r :s :t :u]] + (is (= (rest in) (b/into [] (rx/rest (rx/seq->o in))))))) + +(deftest test-partition-all + (are [input-size part-size step] (= (->> (range input-size) + (partition-all part-size step)) + (->> (range input-size) + (rx/seq->o) + (rx/partition-all part-size step) + (rx/map #(rx/into [] %)) + (rx/concat*) + (b/into []))) + 0 1 1 + 10 2 2 + 10 3 2 + 15 30 4) + + (are [input-size part-size] (= (->> (range input-size) + (partition-all part-size)) + (->> (range input-size) + (rx/seq->o) + (rx/partition-all part-size) + (rx/map #(rx/into [] %)) + (rx/concat*) + (b/into []))) + 0 1 + 10 2 + 10 3 + 15 30)) + +(deftest test-range + (are [start end step] (= (range start end step) + (->> (rx/range start end step) (b/into []))) + 0 10 2 + 0 -100 -1 + 5 100 9) + + (are [start end] (= (range start end) + (->> (rx/range start end) (b/into []))) + 0 10 + 0 -100 + 5 100) + + (are [start] (= (->> (range start) (take 100)) + (->> (rx/range start) (rx/take 100) (b/into []))) + 50 + 0 + 5 + -20) + (is (= (->> (range) (take 500)) + (->> (rx/range) (rx/take 500) (b/into []))))) + +(deftest test-reduce + (is (= (reduce + 0 (range 4)) + (b/first (rx/reduce + 0 (rx/seq->o (range 4))))))) + +(deftest test-reductions + (is (= (into [] (reductions + 0 (range 4))) + (b/into [] (rx/reductions + 0 (rx/seq->o (range 4))))))) + +(deftest test-some + (is (= [:r] (b/into [] (rx/some #{:r :s :t} (rx/seq->o [:q :v :r]))))) + (is (= [] (b/into [] (rx/some #{:r :s :t} (rx/seq->o [:q :v])))))) + +(deftest test-sort + (are [in cmp] (= (if cmp + (sort cmp in) + (sort in)) + (->> in + (rx/seq->o) + (#(if cmp (rx/sort cmp %) (rx/sort %))) + (b/into []))) + [] nil + [] (comp - compare) + [3 1 2] nil + [1 2 3] nil + [1 2 3] (comp - compare) + [2 1 3] (comp - compare))) + +(deftest test-sort-by + (are [rin cmp] (let [in (map #(hash-map :foo %) rin)] + (= (if cmp + (sort-by :foo cmp in) + (sort-by :foo in)) + (->> in + (rx/seq->o) + (#(if cmp (rx/sort-by :foo cmp %) (rx/sort-by :foo %))) + (b/into [])))) + [] nil + [] (comp - compare) + [3 1 2] nil + [1 2 3] nil + [1 2 3] (comp - compare) + [2 1 3] (comp - compare))) + + +(deftest test-split-with + (is (= (split-with (partial >= 3) (range 6)) + (->> (rx/seq->o (range 6)) + (rx/split-with (partial >= 3)) + b/first + (map (partial b/into [])))))) + +(deftest test-take-while + (is (= (into [] (take-while even? [2 4 6 8 1 2 3])) + (b/into [] (rx/take-while even? (rx/seq->o [2 4 6 8 1 2 3])))))) + +(deftest test-throw + (let [expected (IllegalArgumentException. "HI") + called (atom nil)] + (rx/subscribe (rx/throw expected) + (fn [_]) + (fn [e] (reset! called expected)) + (fn [_])) + (is (identical? expected @called)))) + +(deftest test-catch* + (testing "Is just a passthrough if there's no error" + (is (= [1 2 3] + (->> (rx/seq->o [1 2 3]) + (rx/catch* Exception (fn [e] (throw "OH NO"))) + (b/into []))))) + + (testing "Can catch a particular exception type and continue with an observable" + (is (= [1 2 4 5 6 "foo"] + (->> (rx/generator [o] + (rx/on-next o 1) + (rx/on-next o 2) + (rx/on-error o (IllegalStateException. "foo"))) + (rx/catch* IllegalStateException + (fn [e] + (rx/seq->o [4 5 6 (.getMessage e)]))) + (b/into []))))) + + (testing "if exception isn't matched, it's passed to on-error" + (let [expected (IllegalArgumentException. "HI") + called (atom nil)] + (rx/subscribe (->> (rx/generator [o] + (rx/on-next o 1) + (rx/on-next o 2) + (rx/on-error o expected)) + (rx/catch* IllegalStateException (fn [e] + (rx/return "WAT?")))) + (fn [_]) + (fn [e] (reset! called expected)) + (fn [_])) + (is (identical? expected @called)))) + + (testing "if p returns Throwable, that's passed as e" + (let [cause (IllegalArgumentException. "HI") + wrapper (java.util.concurrent.ExecutionException. cause)] + (is (= [cause] + (->> (rx/throw wrapper) + (rx/catch #(.getCause %) e + (rx/return e)) + (b/into []))))))) + + +(deftest test-finally + (testing "Supports a finally clause" + (testing "called on completed" + (let [completed (atom nil) + called (atom nil)] + (rx/subscribe (->> (rx/seq->o [1 2 3]) + (rx/finally* (fn [] (reset! called (str "got it"))))) + (fn [_]) + (fn [_] (throw (IllegalStateException. "WAT"))) + (fn [] (reset! completed "DONE"))) + (is (= "got it" @called)) + (is (= "DONE" @completed)))) + + (testing "called on error" + (let [expected (IllegalStateException. "expected") + completed (atom nil) + called (atom nil)] + (rx/subscribe (->> (rx/generator [o] + (rx/on-next o 1) + (rx/on-next o 2) + (rx/on-error o expected)) + (rx/finally + (reset! called "got it"))) + (fn [_]) + (fn [e] (reset! completed e)) + (fn [] (throw (IllegalStateException. "WAT")))) + (is (= "got it" @called)) + (is (identical? expected @completed)))))) + + +;################################################################################ + +(deftest test-graph-imports + (is (= 99 + (-> {:a {:deps [] :factory (fn [_] (rx/return 99))}} + rx/let-o* + :a + b/single))) + (is (= 100 + (b/single (rx/let-o [?a (rx/return 100)] + ?a))))) + +;################################################################################ + +(deftest test-realized-imports + (is (= {:a 1 :b 2} + (->> (rx/let-realized [a (rx/return 1) + b (rx/return 2)] + {:a a :b b}) + b/single)))) + + diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/future_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/future_test.clj new file mode 100644 index 0000000000..ba2344e4e2 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/future_test.clj @@ -0,0 +1,61 @@ +(ns rx.lang.clojure.future-test + (:require [rx.lang.clojure.core :as rx] + [rx.lang.clojure.blocking :as b] + [rx.lang.clojure.future :as f]) + (:require [clojure.test :refer [deftest testing is]])) + +(deftest test-future-generator + (is (not= [(.getId (Thread/currentThread))] + (b/into [] + (f/future-generator* future-call + #(rx/on-next % (.getId (Thread/currentThread)))))))) + +(deftest test-future + (is (= [15] (b/into [] (f/future* future-call + 1 2 3 4 5))))) + +(deftest test-future-exception + (is (= "Caught: boo" + (->> (f/future* future-call #(throw (java.io.FileNotFoundException. "boo"))) + (rx/catch java.io.FileNotFoundException e + (rx/return (str "Caught: " (.getMessage e)))) + (b/single))))) + +(deftest test-future-cancel + (let [exited? (atom nil) + o (f/future* future-call + (fn [] (Thread/sleep 1000) + (reset! exited? true) + "WAT")) + result (->> o + (rx/take 0) + (b/into []))] + (Thread/sleep 2000) + (is (= [nil []] + [@exited? result])))) + +(deftest test-future-generator-cancel + (let [exited? (atom nil) + o (f/future-generator* future-call + (fn [o] + (rx/on-next o "FIRST") + (Thread/sleep 1000) + (reset! exited? true))) + result (->> o + (rx/take 1) + (b/into []))] + (Thread/sleep 2000) + (is (= [nil ["FIRST"]] + [@exited? result])))) + +(deftest test-future-generator-exception + (let [e (java.io.FileNotFoundException. "snake")] + (is (= [1 2 e] + (->> (f/future-generator* + future-call + (fn [o] + (rx/on-next o 1) + (rx/on-next o 2) + (throw e))) + (rx/catch java.io.FileNotFoundException e + (rx/return e)) + (b/into [])))))) diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/graph_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/graph_test.clj new file mode 100644 index 0000000000..56ddfc9ff3 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/graph_test.clj @@ -0,0 +1,122 @@ +(ns rx.lang.clojure.graph-test + (:require [rx.lang.clojure.graph :as graph] + [rx.lang.clojure.core :as rx] + [rx.lang.clojure.future :as rx-future] + [rx.lang.clojure.blocking :as rx-blocking] + [clojure.test :refer [deftest testing is]])) + +(deftest test-let-o* + (testing "throws on cycle" + (is (thrown-with-msg? IllegalArgumentException #"Cycle found" + (graph/let-o* {:a {:deps [:a]}})))) + + (testing "throws on unknown" + (is (thrown-with-msg? IllegalArgumentException #"Unknown node" + (graph/let-o* {:a {:deps [:b]}})))) + + (testing "it works in a simple case" + (let [d {:a {:deps [] + :factory (fn [_] (rx/seq->o [1 2 3 4 5]))} + :b {:deps [:a] + :factory (fn [{:keys [a]}] (rx/map #(* % %) a)) } + :c {:deps [:a :b] + :factory (fn [{:keys [a b]}] (rx/map #(+ %1 %2) a b)) } + :d {:deps [:c :b] + :factory (fn [{:keys [c b]}] (rx/map #(+ %1 %2) c b)) } + } + f (graph/let-o* d) ] + (println f) + ; (n^2 + n) + n^2 + (is (= [3 10 21 36 55] + (rx-blocking/into [] (:d f))))))) + +(deftest test-let-o + (testing "it works" + (let [f (graph/let-o [?a (rx/seq->o [1 2 3]) + ?b (rx/seq->o [4 5 6])] + (rx/map + ?a ?b))] + (is (= [5 7 9] + (rx-blocking/into [] f))))) + + (testing "it still works" + (is (= {:a 99 :b 100 :z "hi"} + (rx-blocking/single + (-> (let [z (rx/return "hi")] ; an observable from "somewhere else" + (graph/let-o + [?a (rx-future/future* future-call #(do (Thread/sleep 50) 99)) + ?b (rx-future/future* future-call #(do (Thread/sleep 500) 100)) + ?c (rx/map #(hash-map :a %1 :b %2 :z %3) ?a ?b ?z) + ?z z] + (rx/reduce merge {} ?c))))))))) + +(deftest test-complicated-graph + ; These funcs model network requests for various stuff. They all return observable. + (let [request-vhs (fn [] + (rx-future/future-generator* + future-call + (fn [o] + (Thread/sleep 50) + (doseq [i (range 3)] + (rx/on-next o {:id i}))))) + request-user (fn [id] + (rx-future/future* + future-call + #(do (Thread/sleep (rand-int 250)) + {:id id + :name (str "friend" id) }))) + request-ab (fn [u] + (rx-future/future* + future-call + #(do (Thread/sleep (rand-int 250)) + {:user-id (:id u) + :cell (* 2 (:id u))}))) + + request-video-md (fn [v] + (rx/return {:video v + :title (str "title" (:id v)) })) + + ; Now we can stitch all these requests together into an rx graph to + ; produce a response. + o (graph/let-o [?user-info (rx-future/future* + future-call + #(do (Thread/sleep 20) + {:name "Bob" + :id 12345 + :friend-ids [1 2 3] })) + + ?friends (->> ?user-info + (rx/mapcat (fn [ui] + (rx/mapcat request-user + (rx/seq->o (:friend-ids ui)))))) + + ?ab (->> (rx/concat ?user-info ?friends) + (rx/mapcat request-ab)) + + ?ab-lookup (->> ?ab + (rx/map (juxt :user-id #(dissoc % :user-id))) + (rx/into {})) + + ?vhs (request-vhs) + + + ?metadata (->> ?vhs + (rx/mapcat request-video-md))] + (rx/map (fn [u m f ab-lookup] + {:user (dissoc u :friend-ids) + :videos m + :friends (sort-by :id f) + :ab ab-lookup}) + ?user-info + (rx/into [] ?metadata) + (rx/into [] ?friends) + ?ab-lookup))] + + (is (= {:user {:name "Bob" :id 12345} + :videos [{:video {:id 0} :title "title0"} + {:video {:id 1} :title "title1"} + {:video {:id 2} :title "title2"}] + :friends [{:name "friend1" :id 1}{:name "friend2" :id 2}{:name "friend3" :id 3}] + :ab {12345 {:cell 24690} 1 {:cell 2} 2 {:cell 4} 3 {:cell 6}} } + (rx-blocking/single o))))) + + diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/realized_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/realized_test.clj new file mode 100644 index 0000000000..3bf9b16872 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/realized_test.clj @@ -0,0 +1,130 @@ +(ns rx.lang.clojure.realized-test + (:require [rx.lang.clojure.realized :as r] + [rx.lang.clojure.core :as rx] + [rx.lang.clojure.future :as rx-future] + [rx.lang.clojure.blocking :as rx-blocking] + [clojure.test :refer [deftest testing is]])) + + + +(deftest test-realized-map + (testing "Turns map of observables into observable of map" + (let [o (r/realized-map :a (r/all (rx/seq->o [1 2 3])) + :a2 (rx/seq->o [99 100 101]) + :b (rx/return "hi") + :c [(->> [1 2 3] + rx/seq->o + (rx/map #(* % %))) + next] + :d (rx/return "just one") + :e "just a value") + result (rx-blocking/single o)] + (is (= {:a [1 2 3] + :a2 101 + :b "hi" + :c [4 9] + :d "just one" + :e "just a value" } + result))))) + +(deftest test-realized-map + (testing "works like realized-map, but takes a map instead of key/value pairs" + (is (= {:a [1 2] + :b 500 } + (->> {:a (r/all (rx/seq->o [1 2])) + :b 500 } + r/realized-map* + rx-blocking/single))))) + +(deftest test-let-realized + (is (= {:a* 2 + :b* 500 + :c* 1000 } + (->> (r/let-realized [a [(rx/seq->o [1 2]) last] + b 500 + c (rx/return 1000) ] + {:a* a + :b* b + :c* c }) + rx-blocking/single)))) + +(deftest test-only + (testing "raises IllegalStateException if sequence is empty" + (is (thrown-with-msg? IllegalStateException #"did not produce" + (->> (r/let-realized [a (rx/seq->o [1 2])] + {:a a}) + rx-blocking/single))) + ; Just to be sure, make sure it goes through onError. + (let [values (atom []) + errors (atom [])] + (rx/subscribe (r/let-realized [a (rx/seq->o [1 2])] + {:a a}) + #(swap! values conj %) + #(swap! errors conj %)) + (is (empty? @values)) + (is (= 1 (count @errors))) + (let [[e] @errors] + (is (instance? IllegalStateException e)))))) + +(deftest test-all + (testing "collects all values from an observable" + (is (= [1 2 3] + (->> (r/let-realized [a (r/all (rx/seq->o [1 2 3]))] + a) + rx-blocking/single))))) + +; Playing with some expressing some of the video stuff with this. +(comment + (->> (get-list-of-lists user-id) + (rx/mapcat (fn [list] + (->> (video-list->videos list) + (rx/take 10)))) + (rx/mapcat (fn [video] + (->> (r/let-realized [md (video->metadata video) + bm (video->bookmark video) + rt (video->rating video user-id)] + {:id (:id video) + :title (:title md) + :length (:duration md) + :bookmark bm + :rating {:actual (:actual-star-rating rt) + :average (:average-star-rating rt) + :predicted (:predicted-star-rating rt) } }))))) + + (->> (get-list-of-lists user-id) + (rx/mapcat (fn [list] + (->> (video-list->videos list) + (rx/take 10)))) + (rx/mapcat (fn [video] + (->> (r/realized-map :md (video->metadata video) + :bm (video->bookmark video) + :rt (video->rating video user-id)) + (rx/map (fn [{:keys [md bm rt]}] + {:id (:id video) + :title (:title md) + :length (:duration md) + :bookmark bm + :rating {:actual (:actual-star-rating rt) + :average (:average-star-rating rt) + :predicted (:predicted-star-rating rt) } })))))) + + (->> (get-list-of-lists user-id) + (rx/mapcat (fn [list] + (->> (video-list->videos list) + (rx/take 10)))) + (rx/mapcat (fn [video] + (->> (r/realized-map :id (:id video) + :md [(video->metadata video) + first + #(select-keys % [:title :duration])] + :bookmark (video->bookmark video) + :rating [(video->rating video user-id) + first + #(hash-map :actual (:actual-star-rating %) + :average (:average-star-rating %) + :predicted (:predicted-star-rating %))]) + (rx/map (fn [m] + (-> m + (merge (:md m)) + (dissoc :md))))))))) +