Skip to content

Commit

Permalink
Merge pull request #925 from daveray/rxjava-clojure-bindings-final
Browse files Browse the repository at this point in the history
Rxjava clojure bindings final
  • Loading branch information
benjchristensen committed Feb 25, 2014
2 parents 4a93b4a + d19bc42 commit 886caeb
Show file tree
Hide file tree
Showing 13 changed files with 2,767 additions and 32 deletions.
125 changes: 93 additions & 32 deletions language-adaptors/rxjava-clojure/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,96 @@
# Clojure Adaptor for RxJava
Clojure bindings for RxJava.

# Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22).

Example for Leiningen:

```clojure
[com.netflix.rxjava/rxjava-clojure "x.y.z"]
```

and for Gradle:

```groovy
compile 'com.netflix.rxjava:rxjava-clojure:x.y.z'
```

and for Maven:

```xml
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-clojure</artifactId>
<version>x.y.z</version>
</dependency>
```

and for Ivy:

```xml
<dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
```

# Clojure Bindings
This library provides convenient, idiomatic Clojure bindings for RxJava.

The bindings try to present an API that will be comfortable and familiar to a Clojure programmer that's familiar with the sequence operations in `clojure.core`. It "fixes" several issues with using RxJava with raw Java interop, for example:

* Argument lists are in the "right" order. So in RxJava, the function applied in `Observable.map` is the second argument, while here it's the first argument with one or more Observables as trailing arguments
* Operators take normal Clojure functions as arguments, bypassing need for the interop described below
* Predicates accomodate Clojure's notion of truth
* Operators are generally names as they would be in `clojure.core` rather than the Rx names

There is no object wrapping going on. That is, all functions return normal `rx.Observable` objects, so you can always drop back to Java interop for anything that's missing in this wrapper.

## Basic Usage
Most functionality resides in the `rx.lang.clojure.core` namespace and for the most part looks like normal Clojure sequence manipulation:

```clojure
(require '[rx.lang.clojure.core :as rx])

(->> my-observable
(rx/map (comp clojure.string/lower-case :first-name))
(rx/map clojure.string/lower-case)
(rx/filter #{"bob"})
(rx/distinct)
(rx/into []))
;=> An Observable that emits a single vector of names
```

Blocking operators, which are useful for testing, but should otherwise be avoided, reside in `rx.lang.clojure.blocking`. For example:

```clojure
(require '[rx.lang.clojure.blocking :as rxb])

(rxb/doseq [{:keys [first-name]} users-observable]
(println "Hey," first-name))
;=> nil
```

## Open Issues

* The missing stuff mentioned below
* `group-by` val-fn variant isn't implemented in RxJava
* There are some functions for defining customer Observables and Operators (`subscriber`, `operator*`, `observable*`). I don't think these are really enough for serious operator implementation, but I'm hesitant to guess at an abstraction at this point. These will probably change dramatically.

## What's Missing
This library is an ongoing work in progress driven primarily by the needs of one team at Netflix. As such some things are currently missing:

* Highly-specific operators that we felt cluttered the API and were easily composed from existing operators, especially since we're in not-Java land. For example, `Observable.sumLong()`.
* Most everything involving schedulers
* Most everything involving time
* `Observable.window` and `Observable.buffer`. Who knows which parts of these beasts to wrap?

Of course, contributions that cover these cases are welcome.

# Low-level Interop
This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces.

# Basic Usage
## Basic Usage

## Requiring the interop namespace
### Requiring the interop namespace
The first thing to do is to require the namespace:

```clojure
Expand All @@ -19,7 +105,7 @@ or, at the REPL:
(require '[rx.lang.clojure.interop :as rx])
```

## Using rx/fn
### Using rx/fn
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:

```clojure
Expand All @@ -34,7 +120,7 @@ If you already have a plain old Clojure function you'd like to use, you can pass
(.reduce (rx/fn* +)))
```

## Using rx/action
### Using rx/action
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:

```clojure
Expand All @@ -46,7 +132,7 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im
(rx/action [] (println "Sequence complete"))))
```

## Using Observable/create
### Using Observable/create
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:

```clojure
Expand All @@ -59,35 +145,10 @@ As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnS
(.onCompleted s)))
```

# Gotchas
## Gotchas
Here are a few things to keep in mind when using this interop:

* Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts
* If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose.
* Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`.

# Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22).

Example for Maven:

```xml
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-clojure</artifactId>
<version>x.y.z</version>
</dependency>
```

and for Ivy:

```xml
<dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
```

and for Leiningen:

```clojure
[com.netflix.rxjava/rxjava-clojure "x.y.z"]
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
(ns rx.lang.clojure.blocking
"Blocking operators and functions. These should never be used in
production code except at the end of an async chain to convert from
rx land back to sync land. For example, to produce a servlet response.
If you use these, you're a bad person.
"
(:refer-clojure :exclude [first into doseq last])
(:require [rx.lang.clojure.interop :as iop] [rx.lang.clojure.core :as rx])
(:import [rx Observable]
[rx.observables BlockingObservable]))

(def ^:private -ns- *ns*)
(set! *warn-on-reflection* true)

(defmacro ^:private with-ex-unwrap
"The blocking ops wrap errors stuff in RuntimeException because of stupid Java.
This tries to unwrap them so callers get the exceptions they expect."
[& body]
`(try
~@body
(catch RuntimeException e#
(throw (or
(and (identical? RuntimeException (class e#))
(.getCause e#))
e#)))))

(defn ^BlockingObservable ->blocking
"Convert an Observable to a BlockingObservable.
If o is already a BlockingObservable it's returned unchanged.
"
[o]
(if (instance? BlockingObservable o)
o
(.toBlockingObservable ^Observable o)))

(defn o->seq
"Returns a lazy sequence of the items emitted by o
See:
rx.observables.BlockingObservable/getIterator
rx.lang.clojure.core/seq->o
"
[o]
(-> (->blocking o)
(.getIterator)
(iterator-seq)))

(defn first
"*Blocks* and waits for the first value emitted by the given observable.
If the Observable is empty, returns nil
If an error is produced it is thrown.
See:
clojure.core/first
rx/first
rx.observables.BlockingObservable/first
"
[observable]
(with-ex-unwrap
(.firstOrDefault (->blocking observable) nil)))

(defn last
"*Blocks* and waits for the last value emitted by the given observable.
If the Observable is empty, returns nil
If an error is produced it is thrown.
See:
clojure.core/last
rx/last
rx.observable.BlockingObservable/last
"
[observable]
(with-ex-unwrap
(.lastOrDefault (->blocking observable) nil)))

(defn single
"*Blocks* and waits for the first value emitted by the given observable.
An error is thrown if zero or more then one value is produced.
"
[observable]
(with-ex-unwrap
(.single (->blocking observable))))

(defn into
"*Blocks* and pours the elements emitted by the given observables into
to.
If an error is produced it is thrown.
See:
clojure.core/into
rx/into
"
[to from-observable]
(with-ex-unwrap
(clojure.core/into to (o->seq from-observable))))

(defn doseq*
"*Blocks* and executes (f x) for each x emitted by xs
Returns nil.
See:
doseq
clojure.core/doseq
"
[xs f]
(with-ex-unwrap
(-> (->blocking xs)
(.forEach (rx.lang.clojure.interop/action* f)))))

(defmacro doseq
"Like clojure.core/doseq except iterates over an observable in a blocking manner.
Unlike clojure.core/doseq, only supports a single binding
Returns nil.
Example:
(rx-blocking/doseq [{:keys [name]} users-observable]
(println \"User:\" name))
See:
doseq*
clojure.core/doseq
"
[bindings & body]
(when (not= (count bindings) 2)
(throw (IllegalArgumentException. (str "sorry, rx/doseq only supports one binding"))))
(let [[k v] bindings]
`(doseq* ~v (fn [~k] ~@body))))

Loading

0 comments on commit 886caeb

Please sign in to comment.