race condition on via - interruption flag may not be reset #53

Closed
leonoel opened this issue Dec 17, 2021 · 2 comments
Labels
bug Something isn't working

Comments

@leonoel
Owner

leonoel commented Dec 17, 2021

In Thunk.java: if the run method terminates concurrently with a cancellation, the interruption flag may be reset before the canceller thread interrupts the runner thread. The interrupt then arrives after the reset, so the flag is still set when the runner thread starts its next task.
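
A minimal sketch of the interleaving described above, in plain Java. It is not the code of Thunk.java: the class name, the latches, and the single runner thread that plays two consecutive tasks are all assumptions made for illustration. The latches force the unlucky ordering (flag reset first, interrupt second) so the outcome is reproducible rather than racy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration, not missionary's Thunk.java.
final class StaleInterruptSketch {

    // Returns true if the second "task" starts with the interruption flag set.
    static boolean secondTaskSeesInterrupt() {
        CountDownLatch firstTaskDone = new CountDownLatch(1);
        CountDownLatch interruptSent = new CountDownLatch(1);
        AtomicBoolean leaked = new AtomicBoolean();

        Thread runner = new Thread(() -> {
            // End of task 1: the runner resets its own interruption flag.
            Thread.interrupted();
            firstTaskDone.countDown();

            // Busy-wait (no interruptible blocking call) until the late
            // interrupt from the canceller has been delivered.
            while (interruptSent.getCount() > 0) {
                Thread.yield();
            }

            // Start of task 2: nothing has reset the flag in between.
            leaked.set(Thread.currentThread().isInterrupted());
        });
        runner.start();

        try {
            // Canceller: interrupts only after the runner has reset its flag.
            firstTaskDone.await();
            runner.interrupt();
            interruptSent.countDown();
            runner.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return leaked.get();
    }

    public static void main(String[] args) {
        System.out.println("second task starts interrupted: " + secondTaskSeesInterrupt());
    }
}
```

In this forced ordering the second task observes a stale flag, so any interruptible call it makes (for example a sleep or a blocking read) would fail immediately even though that task was never cancelled.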

This is not a problem with the executors exposed by missionary, because j.u.c.ThreadPoolExecutor resets the flag between successive tasks anyway, but it could be problematic if via is used with another executor implementation.
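
For an executor that does not reset the flag itself, one possible workaround is to clear the flag right before each task runs. The sketch below is an assumption-laden illustration, not the fix that was applied in missionary: `clearingInterrupt`, `taskSeesStaleFlag`, and the direct executor `Runnable::run` are hypothetical names chosen for the example. The direct executor runs tasks on the calling thread and never touches the interruption flag, which stands in for "another executor implementation".

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration, not part of missionary.
final class ClearInterruptSketch {

    // Wraps a task so that any leftover interruption flag is cleared first.
    static Runnable clearingInterrupt(Runnable task) {
        return () -> {
            Thread.interrupted(); // reads and clears the current thread's flag
            task.run();
        };
    }

    // Runs one task after a simulated late interrupt and reports whether
    // the task saw the flag set when it started.
    static boolean taskSeesStaleFlag(boolean wrap) {
        // Assumed executor: runs on the caller's thread, never resets the flag.
        Executor direct = Runnable::run;
        AtomicBoolean seen = new AtomicBoolean();
        Runnable task = () -> seen.set(Thread.currentThread().isInterrupted());

        // Simulate an interrupt left over from a previous cancellation.
        Thread.currentThread().interrupt();
        try {
            direct.execute(wrap ? clearingInterrupt(task) : task);
        } finally {
            Thread.interrupted(); // leave the calling thread clean
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("unwrapped task sees stale flag: " + taskSeesStaleFlag(false));
        System.out.println("wrapped task sees stale flag:   " + taskSeesStaleFlag(true));
    }
}
```

Clearing the flag at task start trades one risk for another: an interrupt that legitimately targets the new task but arrives before the wrapper runs would be discarded, so this is only reasonable when cancellation is also tracked through some other channel.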

@leonoel leonoel added the bug Something isn't working label Dec 17, 2021
@PEZ
Contributor

PEZ commented Aug 27, 2023

Could it be the reason why this locks up for me?

(require '[missionary.core :as m])

(let [begin  (System/currentTimeMillis)
      ;; create a flow of values generated by asynchronous tasks
      inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
      values (m/ap
               (let [flow (m/seed inputs)     ;; create a flow of tasks to execute
                     task (m/?> ##Inf flow)]  ;; from here, fork on every task in **parallel**
                 (m/? task)))                 ;; get a forked task value when available

      ;; drain the flow of values and count them
      n (m/? ;; tasks are executed, and the flow is consumed here!
          (m/reduce (fn [acc v]
                      (assert (= "hi" v))
                      (inc acc))
                    0 values))]
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

It's from the Tasks & Flows Walkthrough, but with 100K inputs instead of 1K.

@leonoel
Owner Author

leonoel commented Apr 17, 2024

Race condition fixed in b.37
ap issue moved to #108

@leonoel leonoel closed this as completed Apr 17, 2024