From 5ddf4ba25f0bd5a66d4d1b8abcf4f723e0c20755 Mon Sep 17 00:00:00 2001 From: Dave Ray Date: Mon, 11 Aug 2014 23:00:06 -0700 Subject: [PATCH] Fix for mapcat, issue #1556 Corrected mapcat to actually concat its results rather than merging. Added warning to docstring thta this might not be ideal. Added flatmap to cover map+merge case. --- .../src/main/clojure/rx/lang/clojure/core.clj | 33 ++++++++++++++- .../clojure/rx/lang/clojure/core_test.clj | 40 ++++++++++++++++++- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj index 9af3891494..a5b9f440b8 100644 --- a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj @@ -2,7 +2,7 @@ (:refer-clojure :exclude [concat cons count cycle distinct do drop drop-while empty every? - filter first future + filter first future flatmap group-by interleave interpose into iterate keep keep-indexed @@ -616,17 +616,46 @@ return an Observable. The resulting observables are concatentated together into one observable. + WARNING: This operator, like clojure.core/mapcat, preserves ordering of the + generated Observables. In an asynchronous context, this may cause unintended + blocking. Try flatmap instead. + If multiple Observables are given, the arguments to f are the first item from each observable, then the second item, etc. See: clojure.core/mapcat - rx.Observable/flatMap + flatmap + rx.Observable/concatMap " [f & xs] (if (clojure.core/next xs) (mapcat* f (seq->o xs)) ; use built-in flatMap for single-arg case + (.concatMap ^Observable (clojure.core/first xs) (iop/fn* f)))) + +(defn ^Observable flatmap* + "Same as multi-arg flatmap, but input is an Observable of Observables. + + See: + flatmap + " + [f ^Observable xs] + (->> xs + (map* f) + (merge*))) + +(defn ^Observable flatmap + "Like mapcat, but the Observables produced by f are merged rather than concatenated. + This behavior is preferable in asynchronous contexts where order is not important. + + See: + mapcat + rx.Observable/flatMap + " + [f & xs] + (if (clojure.core/next xs) + (flatmap* f (seq->o xs)) (.flatMap ^Observable (clojure.core/first xs) (iop/fn* f)))) (defn map-indexed diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj index a4d842e32c..9c88667323 100644 --- a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj @@ -277,8 +277,8 @@ (let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]] (testing "with just a key-fn" (is (= [[:a {:k :a :v 1}] - [:b {:k :b :v 2}] [:a {:k :a :v 3}] + [:b {:k :b :v 2}] [:c {:k :c :v 4}]] (->> xs (rx/seq->o) @@ -452,6 +452,44 @@ (rx/seq->o as) (rx/seq->o bs))))))) +(deftest test-flatmap + (let [f (fn [v] [v (* v v)]) + xs (range 10)] + (is (= (mapcat f xs) + (b/into [] (rx/flatmap (comp rx/seq->o f) (rx/seq->o xs)))))) + + ; group-by is a good way to test merge behavior without truly async code + ; here the :a and :b observables are interleaved when merged + (let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]] + (is (= [[:a {:k :a :v 1}] + [:b {:k :b :v 2}] + [:a {:k :a :v 3}] + [:c {:k :c :v 4}]] + (->> xs + (rx/seq->o) + (rx/group-by :k) + (rx/flatmap (fn [[k vo :as me]] + (is (instance? clojure.lang.MapEntry me)) + (rx/map #(vector k %) vo))) + (b/into []))))) + + ; still looking for a simple demo of merging for the multi-arg case + ; Here, because ys is "inline", the interleaving is removed. sigh. + (let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}] + ys [:ay :by :cy]] + (is (= [[:a {:k :a :v 1} :ay] + [:a {:k :a :v 3} :ay] + [:b {:k :b :v 2} :by] + [:c {:k :c :v 4} :cy]] + (->> (rx/flatmap (fn [[k vo :as me] y] + (is (instance? clojure.lang.MapEntry me)) + (rx/map #(vector k % y) vo)) + (->> xs + rx/seq->o + (rx/group-by :k)) + (rx/seq->o ys)) + (b/into [])))))) + (deftest test-next (let [in [:q :r :s :t :u]] (is (= (next in) (b/into [] (rx/next (rx/seq->o in)))))))