diff --git a/java/missionary/Cancelled.java b/java/missionary/Cancelled.java new file mode 100644 index 0000000..a9f14cd --- /dev/null +++ b/java/missionary/Cancelled.java @@ -0,0 +1,11 @@ +package missionary; + +public class Cancelled extends Throwable { + public Cancelled() { + this(null); + } + + public Cancelled(String message) { + super(message, null, false, false); + } +} diff --git a/java/missionary/impl/Ambiguous.java b/java/missionary/impl/Ambiguous.java index 0b5f1ed..ffef542 100644 --- a/java/missionary/impl/Ambiguous.java +++ b/java/missionary/impl/Ambiguous.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -259,11 +260,9 @@ final class Gather implements Fiber { @Override public Object poll() { - if (choice == null ? token == null : - choice.token == null || (choice.type == Choice.SWITCH && choice.ready == null && !choice.done)) - throw new ExceptionInfo("Process cancelled.", RT.map( - Keyword.intern(null, "cancelled"), Keyword.intern("missionary", "ap"))); - return null; + return choice == null ? token == null : + choice.token == null || (choice.type == Choice.SWITCH && choice.ready == null && !choice.done) ? + clojure.lang.Util.sneakyThrow(new Cancelled("Process cancelled.")) : null; } @Override diff --git a/java/missionary/impl/Dataflow.java b/java/missionary/impl/Dataflow.java index 49a797e..36cb781 100644 --- a/java/missionary/impl/Dataflow.java +++ b/java/missionary/impl/Dataflow.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.Iterator; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -53,9 +54,7 @@ public void cancel(Event e) { IPersistentSet set = (IPersistentSet) s; if (!(set.contains(e))) break; if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) { - e.failure.invoke(new ExceptionInfo("Dataflow variable derefence cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "dfv-deref")))); + e.failure.invoke(new Cancelled("Dataflow variable derefence cancelled.")); break; } } diff --git a/java/missionary/impl/Enumerate.java b/java/missionary/impl/Enumerate.java index 2fee44e..1bcb447 100644 --- a/java/missionary/impl/Enumerate.java +++ b/java/missionary/impl/Enumerate.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.Iterator; @@ -38,9 +39,7 @@ public Object deref() { Iterator i = iterator; if (i == null) { terminator.invoke(); - throw new ExceptionInfo("Enumeration cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "enumerate"))); + clojure.lang.Util.sneakyThrow(new Cancelled("Seed cancelled.")); } Object x = i.next(); more(i); diff --git a/java/missionary/impl/Fiber.java b/java/missionary/impl/Fiber.java index 00df5a7..cb2568a 100644 --- a/java/missionary/impl/Fiber.java +++ b/java/missionary/impl/Fiber.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; @@ -10,9 +11,8 @@ public interface Fiber { Fiber THREAD = new Fiber() { @Override public Object poll() { - if (Thread.currentThread().isInterrupted()) - throw new ExceptionInfo("Thread interrupted.", PersistentArrayMap.EMPTY); - return null; + return Thread.currentThread().isInterrupted() ? + clojure.lang.Util.sneakyThrow(new Cancelled("Thread interrupted.")) : null; } @Override diff --git a/java/missionary/impl/GroupBy.java b/java/missionary/impl/GroupBy.java index 87270a1..335f827 100644 --- a/java/missionary/impl/GroupBy.java +++ b/java/missionary/impl/GroupBy.java @@ -1,13 +1,10 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; public interface GroupBy { - Throwable CANCELLED = new ExceptionInfo("Group consumer cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "group-by"))); - class Process extends AFn implements IDeref { static { @@ -136,7 +133,7 @@ static Object consume(Group g) { Process p = g.process; if (p == null) { g.terminator.invoke(); - return clojure.lang.Util.sneakyThrow(CANCELLED); + return clojure.lang.Util.sneakyThrow(new Cancelled("Group consumer cancelled.")); } else synchronized (p) { Object v = p.value; p.value = p; diff --git a/java/missionary/impl/Mailbox.java b/java/missionary/impl/Mailbox.java index 1484d05..11bf8a0 100644 --- a/java/missionary/impl/Mailbox.java +++ b/java/missionary/impl/Mailbox.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -61,9 +62,7 @@ public void cancel(Event e) { IPersistentSet set = (IPersistentSet) s; if (!(set.contains(e))) break; if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) { - e.failure.invoke(new ExceptionInfo("Mailbox fetch cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "mbx-fetch")))); + e.failure.invoke(new Cancelled("Mailbox fetch cancelled.")); break; } } diff --git a/java/missionary/impl/Never.java b/java/missionary/impl/Never.java index aedfcfe..ba88076 100644 --- a/java/missionary/impl/Never.java +++ b/java/missionary/impl/Never.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -18,9 +19,7 @@ public Never(IFn f) { public Object invoke() { IFn f = failure; if (f != null && FAILURE.compareAndSet(this, f, null)) - f.invoke(new ExceptionInfo("Never cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "never")))); + f.invoke(new Cancelled("Never cancelled.")); return null; } } diff --git a/java/missionary/impl/Rendezvous.java b/java/missionary/impl/Rendezvous.java index 4a85e49..9ac5b1d 100644 --- a/java/missionary/impl/Rendezvous.java +++ b/java/missionary/impl/Rendezvous.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -47,9 +48,7 @@ public void cancel(Event e) { IPersistentSet set = (IPersistentSet) s; if (!(set.contains(e))) break; if (STATE.compareAndSet(Rendezvous.this, s, set.count() == 1 ? null : set.disjoin(e))) { - e.failure.invoke(new ExceptionInfo("Rendez-vous give cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "rdv-give")))); + e.failure.invoke(new Cancelled("Rendez-vous give cancelled.")); break; } } @@ -89,9 +88,7 @@ public void cancel(Event e) { IPersistentSet set = (IPersistentSet) s; if (!(set.contains(e))) break; if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) { - e.failure.invoke(new ExceptionInfo("Rendez-vous take cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "rdv-take")))); + e.failure.invoke(new Cancelled("Rendez-vous take cancelled.")); break; } } diff --git a/java/missionary/impl/Semaphore.java b/java/missionary/impl/Semaphore.java index 5b9dd93..8e787b1 100644 --- a/java/missionary/impl/Semaphore.java +++ b/java/missionary/impl/Semaphore.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -59,9 +60,7 @@ public void cancel(Event e) { IPersistentSet set = (IPersistentSet) s; if (!(set.contains(e))) break; if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) { - e.failure.invoke(new ExceptionInfo("Semaphore acquire cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "sem-acquire")))); + e.failure.invoke(new Cancelled("Semaphore acquire cancelled.")); break; } } diff --git a/java/missionary/impl/Sequential.java b/java/missionary/impl/Sequential.java index db59fb0..cf3be97 100644 --- a/java/missionary/impl/Sequential.java +++ b/java/missionary/impl/Sequential.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -76,9 +77,7 @@ public Object invoke() { @Override public Object poll() { - if (token == null) throw new ExceptionInfo("Process cancelled.", RT.map( - Keyword.intern(null, "cancelled"), Keyword.intern("missionary", "sp"))); - return null; + return token == null ? clojure.lang.Util.sneakyThrow(new Cancelled("Process cancelled.")) : null; } @Override diff --git a/java/missionary/impl/Sleep.java b/java/missionary/impl/Sleep.java index b59e63b..13e66c8 100644 --- a/java/missionary/impl/Sleep.java +++ b/java/missionary/impl/Sleep.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -49,9 +50,7 @@ void cancel(Sleep s) { if (ss.equals(item)) break; else n = p.assoc(s.time, ss); } if (PENDING.compareAndSet(this, p, n)) { - s.failure.invoke(new ExceptionInfo("Sleep cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "sleep")))); + s.failure.invoke(new Cancelled("Sleep cancelled.")); break; } } diff --git a/java/missionary/impl/Sub.java b/java/missionary/impl/Sub.java index 75c137b..bc38de2 100644 --- a/java/missionary/impl/Sub.java +++ b/java/missionary/impl/Sub.java @@ -1,6 +1,7 @@ package missionary.impl; import clojure.lang.*; +import missionary.Cancelled; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -79,10 +80,7 @@ public Object deref() { Object x = current; if (x == null) { terminator.invoke(); - throw new ExceptionInfo("Subscription cancelled.", RT.map( - Keyword.intern(null, "cancelled"), - Keyword.intern("missionary", "subscribe") - )); + clojure.lang.Util.sneakyThrow(new Cancelled("Subscription cancelled.")); } else { current = null; notifier.invoke(); diff --git a/src/missionary/Cancelled.js b/src/missionary/Cancelled.js new file mode 100644 index 0000000..9dc53b9 --- /dev/null +++ b/src/missionary/Cancelled.js @@ -0,0 +1,16 @@ +goog.provide('missionary.Cancelled'); + +/** + * @param {string} message + * @constructor + */ +missionary.Cancelled = function(message) { + 'use strict'; + + /** + * The error message. + * @const {string} + */ + this.message = message; + +}; diff --git a/src/missionary/impl.cljs b/src/missionary/impl.cljs index ffc7c22..189e1e1 100644 --- a/src/missionary/impl.cljs +++ b/src/missionary/impl.cljs @@ -1,4 +1,5 @@ -(ns ^:no-doc missionary.impl) +(ns ^:no-doc missionary.impl + (:import missionary.Cancelled)) (defn nop []) @@ -41,8 +42,7 @@ #(when-not bound (when (contains? watch !) (set! watch (disj! watch !)) - (f! (ex-info "Dataflow variable dereference cancelled." - {:cancelled :missionary/dfv-deref})))))))) + (f! (Cancelled. "Dataflow variable dereference cancelled.")))))))) (defn dataflow [] (->Dataflow false nil (transient #{}))) @@ -64,8 +64,7 @@ (set! writers (assoc writers ! t)) #(when (contains? writers !) (set! writers (dissoc writers !)) - (f! (ex-info "Rendez-vous give cancelled." - {:cancelled :missionary/rdv-give}))))))) + (f! (Cancelled. "Rendez-vous give cancelled."))))))) (-invoke [_ s! f!] (if-some [[[! t]] (seq writers)] (do (set! writers (dissoc writers !)) @@ -74,8 +73,7 @@ (set! readers (conj readers !)) #(when (contains? readers !) (set! readers (disj readers !)) - (f! (ex-info "Rendez-vous take cancelled." - {:cancelled :missionary/rdv-take}))))))) + (f! (Cancelled. "Rendez-vous take cancelled."))))))) (defn rendezvous [] (->Rendezvous #{} {})) @@ -100,7 +98,7 @@ (set! readers (conj readers !)) #(when (contains? readers !) (set! readers (disj readers !)) - (f! (ex-info "Mailbox fetch cancelled." {:cancelled :missionary/mbx-fetch})))) + (f! (Cancelled. "Mailbox fetch cancelled.")))) (let [tmp enqueue] (set! enqueue dequeue) (set! dequeue (.reverse tmp)) @@ -128,7 +126,7 @@ (set! readers (conj readers !)) #(when (contains? readers !) (set! readers (disj readers !)) - (f! (ex-info "Semaphore acquire cancelled." {:cancelled :missionary/sem-acquire})))) + (f! (Cancelled. "Semaphore acquire cancelled.")))) (do (set! available (dec available)) (s! nil) nop)))) @@ -200,7 +198,7 @@ (when (.-pending s) (set! (.-pending s) false) (js/clearTimeout (.-handler s)) - ((.-failure s) (ex-info "Sleep cancelled." {:cancelled :missionary/sleep})))) + ((.-failure s) (Cancelled. "Sleep cancelled.")))) (defn sleep [d x s f] (let [slp (->Sleep f nil true)] @@ -225,7 +223,7 @@ (-invoke [_] (when alive (set! alive false) - (f (ex-info "Never cancelled." {:cancelled :missionary/never}))))) + (f (Cancelled. "Never cancelled."))))) (defn never [f] (->Never f true)) @@ -260,7 +258,7 @@ Fiber (poll [_] (when (nil? token) - (throw (ex-info "Process cancelled." {:cancelled :missionary/sp})))) + (throw (Cancelled. "Process cancelled.")))) (task [_ t] (let [c (t resume rethrow)] (if (nil? token) @@ -329,7 +327,7 @@ (nil? (.-ready c)) (not (.-done c)))) (nil? token)) - (throw (ex-info "Process cancelled." {:cancelled :missionary/ap})))) + (throw (Cancelled. "Process cancelled.")))) (task [_ t] (ap-swap _ (t resume rethrow)) nop) (flow-concat [_ f] @@ -527,8 +525,7 @@ (let [x (.next i)] (enumerate-pull e) x) (do ((.-terminator e)) - (throw (ex-info "Enumeration cancelled" - {:cancelled :missionary/enumerate}))))) + (throw (Cancelled. "Seed cancelled"))))) (defn enumerate [coll n t] (doto (->Enumerate (iter coll) n t) (enumerate-pull))) diff --git a/src/missionary/impl/GroupBy.cljs b/src/missionary/impl/GroupBy.cljs index 810c182..c26e920 100644 --- a/src/missionary/impl/GroupBy.cljs +++ b/src/missionary/impl/GroupBy.cljs @@ -1,6 +1,5 @@ -(ns ^:no-doc missionary.impl.GroupBy) - -(def cancelled (ex-info "Group consumer cancelled." {:cancelled :missionary/group-by})) +(ns ^:no-doc missionary.impl.GroupBy + (:import missionary.Cancelled)) (declare sample cancel consume) @@ -120,7 +119,7 @@ (set! (.-key p) p) (transfer p) x) (do ((.-terminator g)) - (throw cancelled)))) + (throw (Cancelled. "Group consumer cancelled."))))) (defn run [k f n t] (let [p (->Process k n t nil nil nil (object-array 8) 0 true false)] diff --git a/test/missionary/core_test.cljc b/test/missionary/core_test.cljc index 560a313..d21e843 100644 --- a/test/missionary/core_test.cljc +++ b/test/missionary/core_test.cljc @@ -1,7 +1,8 @@ (ns missionary.core-test (:require [missionary.core :as m :include-macros true] - [missionary.tck :refer [if-try deftask defflow] :include-macros true])) + [missionary.tck :refer [if-try deftask defflow] :include-macros true]) + (:import missionary.Cancelled)) (def =? (partial partial =)) (def fine! #(throw (ex-info "this is fine." {:fine true}))) @@ -18,7 +19,7 @@ https://stackoverflow.com/questions/12925988/how-to-generate-strings-that-share- (deftask sleep-failure {:cancel 0 :timeout 10 - :failure (comp :cancelled ex-data)} + :failure (partial instance? Cancelled)} (m/sleep 100)) (deftask semaphore @@ -36,7 +37,7 @@ https://stackoverflow.com/questions/12925988/how-to-generate-strings-that-share- (deftask rendezvous {:cancel 150 :timeout 200 - :failure (comp :cancelled ex-data)} + :failure (partial instance? Cancelled)} (m/sp (let [rdv (m/rdv)] (m/? (->> (m/sp (while true @@ -51,7 +52,7 @@ https://stackoverflow.com/questions/12925988/how-to-generate-strings-that-share- (deftask mailbox {:cancel 150 :timeout 200 - :failure (comp :cancelled ex-data)} + :failure (partial instance? Cancelled)} (m/sp (let [mbx (m/mbx)] (m/? (->> (m/sp (while true