Skip to content

Commit

Permalink
FIX: special type for cancellation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
leonoel committed Dec 23, 2021
1 parent f40d140 commit 0793060
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 67 deletions.
11 changes: 11 additions & 0 deletions java/missionary/Cancelled.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package missionary;

public class Cancelled extends Throwable {
public Cancelled() {
this(null);
}

public Cancelled(String message) {
super(message, null, false, false);
}
}
9 changes: 4 additions & 5 deletions java/missionary/impl/Ambiguous.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand Down Expand Up @@ -259,11 +260,9 @@ final class Gather implements Fiber {

@Override
public Object poll() {
if (choice == null ? token == null :
choice.token == null || (choice.type == Choice.SWITCH && choice.ready == null && !choice.done))
throw new ExceptionInfo("Process cancelled.", RT.map(
Keyword.intern(null, "cancelled"), Keyword.intern("missionary", "ap")));
return null;
return choice == null ? token == null :
choice.token == null || (choice.type == Choice.SWITCH && choice.ready == null && !choice.done) ?
clojure.lang.Util.sneakyThrow(new Cancelled("Process cancelled.")) : null;
}

@Override
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Dataflow.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand Down Expand Up @@ -53,9 +54,7 @@ public void cancel(Event e) {
IPersistentSet set = (IPersistentSet) s;
if (!(set.contains(e))) break;
if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) {
e.failure.invoke(new ExceptionInfo("Dataflow variable derefence cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "dfv-deref"))));
e.failure.invoke(new Cancelled("Dataflow variable derefence cancelled."));
break;
}
}
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Enumerate.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.Iterator;

Expand Down Expand Up @@ -38,9 +39,7 @@ public Object deref() {
Iterator i = iterator;
if (i == null) {
terminator.invoke();
throw new ExceptionInfo("Enumeration cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "enumerate")));
clojure.lang.Util.sneakyThrow(new Cancelled("Seed cancelled."));
}
Object x = i.next();
more(i);
Expand Down
6 changes: 3 additions & 3 deletions java/missionary/impl/Fiber.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;
Expand All @@ -10,9 +11,8 @@ public interface Fiber {
Fiber THREAD = new Fiber() {
@Override
public Object poll() {
if (Thread.currentThread().isInterrupted())
throw new ExceptionInfo("Thread interrupted.", PersistentArrayMap.EMPTY);
return null;
return Thread.currentThread().isInterrupted() ?
clojure.lang.Util.sneakyThrow(new Cancelled("Thread interrupted.")) : null;
}

@Override
Expand Down
7 changes: 2 additions & 5 deletions java/missionary/impl/GroupBy.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

public interface GroupBy {

Throwable CANCELLED = new ExceptionInfo("Group consumer cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "group-by")));

class Process extends AFn implements IDeref {

static {
Expand Down Expand Up @@ -136,7 +133,7 @@ static Object consume(Group g) {
Process p = g.process;
if (p == null) {
g.terminator.invoke();
return clojure.lang.Util.sneakyThrow(CANCELLED);
return clojure.lang.Util.sneakyThrow(new Cancelled("Group consumer cancelled."));
} else synchronized (p) {
Object v = p.value;
p.value = p;
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Mailbox.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -61,9 +62,7 @@ public void cancel(Event e) {
IPersistentSet set = (IPersistentSet) s;
if (!(set.contains(e))) break;
if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) {
e.failure.invoke(new ExceptionInfo("Mailbox fetch cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "mbx-fetch"))));
e.failure.invoke(new Cancelled("Mailbox fetch cancelled."));
break;
}
}
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Never.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand All @@ -18,9 +19,7 @@ public Never(IFn f) {
public Object invoke() {
IFn f = failure;
if (f != null && FAILURE.compareAndSet(this, f, null))
f.invoke(new ExceptionInfo("Never cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "never"))));
f.invoke(new Cancelled("Never cancelled."));
return null;
}
}
9 changes: 3 additions & 6 deletions java/missionary/impl/Rendezvous.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -47,9 +48,7 @@ public void cancel(Event e) {
IPersistentSet set = (IPersistentSet) s;
if (!(set.contains(e))) break;
if (STATE.compareAndSet(Rendezvous.this, s, set.count() == 1 ? null : set.disjoin(e))) {
e.failure.invoke(new ExceptionInfo("Rendez-vous give cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "rdv-give"))));
e.failure.invoke(new Cancelled("Rendez-vous give cancelled."));
break;
}
}
Expand Down Expand Up @@ -89,9 +88,7 @@ public void cancel(Event e) {
IPersistentSet set = (IPersistentSet) s;
if (!(set.contains(e))) break;
if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) {
e.failure.invoke(new ExceptionInfo("Rendez-vous take cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "rdv-take"))));
e.failure.invoke(new Cancelled("Rendez-vous take cancelled."));
break;
}
}
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Semaphore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -59,9 +60,7 @@ public void cancel(Event e) {
IPersistentSet set = (IPersistentSet) s;
if (!(set.contains(e))) break;
if (STATE.compareAndSet(this, s, set.count() == 1 ? null : set.disjoin(e))) {
e.failure.invoke(new ExceptionInfo("Semaphore acquire cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "sem-acquire"))));
e.failure.invoke(new Cancelled("Semaphore acquire cancelled."));
break;
}
}
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Sequential.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand Down Expand Up @@ -76,9 +77,7 @@ public Object invoke() {

@Override
public Object poll() {
if (token == null) throw new ExceptionInfo("Process cancelled.", RT.map(
Keyword.intern(null, "cancelled"), Keyword.intern("missionary", "sp")));
return null;
return token == null ? clojure.lang.Util.sneakyThrow(new Cancelled("Process cancelled.")) : null;
}

@Override
Expand Down
5 changes: 2 additions & 3 deletions java/missionary/impl/Sleep.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -49,9 +50,7 @@ void cancel(Sleep s) {
if (ss.equals(item)) break; else n = p.assoc(s.time, ss);
}
if (PENDING.compareAndSet(this, p, n)) {
s.failure.invoke(new ExceptionInfo("Sleep cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "sleep"))));
s.failure.invoke(new Cancelled("Sleep cancelled."));
break;
}
}
Expand Down
6 changes: 2 additions & 4 deletions java/missionary/impl/Sub.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package missionary.impl;

import clojure.lang.*;
import missionary.Cancelled;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -79,10 +80,7 @@ public Object deref() {
Object x = current;
if (x == null) {
terminator.invoke();
throw new ExceptionInfo("Subscription cancelled.", RT.map(
Keyword.intern(null, "cancelled"),
Keyword.intern("missionary", "subscribe")
));
clojure.lang.Util.sneakyThrow(new Cancelled("Subscription cancelled."));
} else {
current = null;
notifier.invoke();
Expand Down
16 changes: 16 additions & 0 deletions src/missionary/Cancelled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
goog.provide('missionary.Cancelled');

/**
* @param {string} message
* @constructor
*/
missionary.Cancelled = function(message) {
'use strict';

/**
* The error message.
* @const {string}
*/
this.message = message;

};
27 changes: 12 additions & 15 deletions src/missionary/impl.cljs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
(ns ^:no-doc missionary.impl)
(ns ^:no-doc missionary.impl
(:import missionary.Cancelled))

(defn nop [])

Expand Down Expand Up @@ -41,8 +42,7 @@
#(when-not bound
(when (contains? watch !)
(set! watch (disj! watch !))
(f! (ex-info "Dataflow variable dereference cancelled."
{:cancelled :missionary/dfv-deref}))))))))
(f! (Cancelled. "Dataflow variable dereference cancelled."))))))))

(defn dataflow []
(->Dataflow false nil (transient #{})))
Expand All @@ -64,8 +64,7 @@
(set! writers (assoc writers ! t))
#(when (contains? writers !)
(set! writers (dissoc writers !))
(f! (ex-info "Rendez-vous give cancelled."
{:cancelled :missionary/rdv-give})))))))
(f! (Cancelled. "Rendez-vous give cancelled.")))))))
(-invoke [_ s! f!]
(if-some [[[! t]] (seq writers)]
(do (set! writers (dissoc writers !))
Expand All @@ -74,8 +73,7 @@
(set! readers (conj readers !))
#(when (contains? readers !)
(set! readers (disj readers !))
(f! (ex-info "Rendez-vous take cancelled."
{:cancelled :missionary/rdv-take})))))))
(f! (Cancelled. "Rendez-vous take cancelled.")))))))

(defn rendezvous []
(->Rendezvous #{} {}))
Expand All @@ -100,7 +98,7 @@
(set! readers (conj readers !))
#(when (contains? readers !)
(set! readers (disj readers !))
(f! (ex-info "Mailbox fetch cancelled." {:cancelled :missionary/mbx-fetch}))))
(f! (Cancelled. "Mailbox fetch cancelled."))))
(let [tmp enqueue]
(set! enqueue dequeue)
(set! dequeue (.reverse tmp))
Expand Down Expand Up @@ -128,7 +126,7 @@
(set! readers (conj readers !))
#(when (contains? readers !)
(set! readers (disj readers !))
(f! (ex-info "Semaphore acquire cancelled." {:cancelled :missionary/sem-acquire}))))
(f! (Cancelled. "Semaphore acquire cancelled."))))
(do (set! available (dec available))
(s! nil) nop))))

Expand Down Expand Up @@ -200,7 +198,7 @@
(when (.-pending s)
(set! (.-pending s) false)
(js/clearTimeout (.-handler s))
((.-failure s) (ex-info "Sleep cancelled." {:cancelled :missionary/sleep}))))
((.-failure s) (Cancelled. "Sleep cancelled."))))

(defn sleep [d x s f]
(let [slp (->Sleep f nil true)]
Expand All @@ -225,7 +223,7 @@
(-invoke [_]
(when alive
(set! alive false)
(f (ex-info "Never cancelled." {:cancelled :missionary/never})))))
(f (Cancelled. "Never cancelled.")))))

(defn never [f] (->Never f true))

Expand Down Expand Up @@ -260,7 +258,7 @@
Fiber
(poll [_]
(when (nil? token)
(throw (ex-info "Process cancelled." {:cancelled :missionary/sp}))))
(throw (Cancelled. "Process cancelled."))))
(task [_ t]
(let [c (t resume rethrow)]
(if (nil? token)
Expand Down Expand Up @@ -329,7 +327,7 @@
(nil? (.-ready c))
(not (.-done c))))
(nil? token))
(throw (ex-info "Process cancelled." {:cancelled :missionary/ap}))))
(throw (Cancelled. "Process cancelled."))))
(task [_ t]
(ap-swap _ (t resume rethrow)) nop)
(flow-concat [_ f]
Expand Down Expand Up @@ -527,8 +525,7 @@
(let [x (.next i)]
(enumerate-pull e) x)
(do ((.-terminator e))
(throw (ex-info "Enumeration cancelled"
{:cancelled :missionary/enumerate})))))
(throw (Cancelled. "Seed cancelled")))))

(defn enumerate [coll n t]
(doto (->Enumerate (iter coll) n t) (enumerate-pull)))
Expand Down
7 changes: 3 additions & 4 deletions src/missionary/impl/GroupBy.cljs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ^:no-doc missionary.impl.GroupBy)

(def cancelled (ex-info "Group consumer cancelled." {:cancelled :missionary/group-by}))
(ns ^:no-doc missionary.impl.GroupBy
(:import missionary.Cancelled))

(declare sample cancel consume)

Expand Down Expand Up @@ -120,7 +119,7 @@
(set! (.-key p) p)
(transfer p) x)
(do ((.-terminator g))
(throw cancelled))))
(throw (Cancelled. "Group consumer cancelled.")))))

(defn run [k f n t]
(let [p (->Process k n t nil nil nil (object-array 8) 0 true false)]
Expand Down
Loading

0 comments on commit 0793060

Please sign in to comment.