Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

missionary.Cancelled not thrown #83

Closed
den1k opened this issue Mar 27, 2023 · 6 comments
Closed

missionary.Cancelled not thrown #83

den1k opened this issue Mar 27, 2023 · 6 comments

Comments

@den1k
Copy link

den1k commented Mar 27, 2023

this code properly throws Cancelled

(time
   (let [delays [{:id    1
                  :delay 1000}
                 {:id    2
                  :delay 2000}
                 {:id    1
                  :delay 500}
                 {:id    1
                  :delay 200}]]
     (m/?
       (->>
         (m/ap
           (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                 m (m/?< vs)]
             (try
               (m/? (m/sleep (:delay m) (assoc m :slept true)))
               (catch missionary.Cancelled c
                 (println :cancelled m)
                 (m/amb)))))
         (m/reduce
           (fn [out v]
             (println v)
             (conj out v))
           [])))))

this one does not

(time
  (let [delays [{:id    1
                 :delay 1000}
                {:id    2
                 :delay 2000}
                {:id    1
                 :delay 500}
                {:id    1
                 :delay 200}]]
    (m/?
      (->>
        (m/ap
          (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                m (m/?< vs)]
            (try
              (m/? 
                (m/via m/blk (Thread/sleep (:delay m)) (assoc m :slept true))) ; <---- difference
              (catch missionary.Cancelled c
                (println :cancelled m)
                (m/amb))
              (catch Throwable t ; <----- added to catch exception
                (println :thrown)
                (m/amb))
              )))
        (m/reduce
          (fn [out v]
            (println v)
            (conj out v))
          [])))))

How can cancellation be detected in the latter case?

@den1k
Copy link
Author

den1k commented Mar 27, 2023

Fixed!

(time
  (let [delays [{:id    1
                 :delay 1000}
                {:id    2
                 :delay 2000}
                {:id    1
                 :delay 500}
                {:id    1
                 :delay 200}]]
    (m/?
      (->>
        (m/ap
          (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                m (m/?< vs)]
            (try
              (m/!) ; <---- fixed
              (m/?
                (m/via m/blk (Thread/sleep (:delay m)) (assoc m :slept true)))
              (catch missionary.Cancelled c
                (println :cancelled m)
                (m/amb)))))
        (m/reduce
          (fn [out v]
            (println v)
            (conj out v))
          [])))))

@den1k den1k closed this as completed Mar 27, 2023
@den1k
Copy link
Author

den1k commented Mar 27, 2023

Reopening as Cancelled still does not throw when run downstream from m/observe

(declare put)
(def c
  (let [observe-flow (m/observe (fn [!]
                                  (defn put [x]
                                    (! x))
                                  #(do)))
        task         (->>
                       (m/ap
                         (let [[k vs] (m/?> ##Inf (m/group-by :id observe-flow))
                               m (m/?< vs)]
                           (try
                             (m/!)                          ; <---- fixed
                             (m/?
                               (m/via m/blk (Thread/sleep (:delay m)) (assoc m :slept true)))
                             (catch missionary.Cancelled c
                               (println :cancelled m)
                               (m/amb))
                             (catch Throwable t
                               (println :throwing-instead-of-cancelling)
                               (m/amb)
                               ))))
                       (m/reduce
                         (fn [out v]
                           (println v)
                           (conj out v))
                         []))
        cancel       (task (fn [s] (println :success s))
                           (fn [f] (println :fail f)))]
    cancel

    ))

(put {:id    1
      :delay 200})
=> nil
{:id 1, :delay 200, :slept true}

(do (put {:id    1
          :delay 200})
    (put {:id    1
          :delay 500}))
:throwing-instead-of-cancelling
=> nil
{:id 1, :delay 500, :slept true}

@den1k den1k reopened this Mar 27, 2023
@leonoel
Copy link
Owner

leonoel commented Mar 27, 2023

On cancellation, m/via interrupts the thread. The final result depends how the body reacts to thread interruption, in this case Thread/sleep will stop and throw InterruptedException. The right fix is to catch InterruptedException.
In the first example, m/! works by accident because the switch happens immediately, before the sleep even starts.

@den1k
Copy link
Author

den1k commented Mar 27, 2023

hmm, the issue is that various exceptions can be thrown as arbitrary sandboxed code is run by the user. Is there any other way to detect Cancellation by m/?<?

@dustingetz
Copy link
Collaborator

Hey can you switch to the slack for support please, this emails 22 people

@leonoel
Copy link
Owner

leonoel commented Mar 27, 2023

@den1k more details here #51

@den1k den1k closed this as completed Mar 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants