From 79758bc2f3c6a77013ec47c6cd8ed7cc8c9ebd1b Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Thu, 29 Aug 2019 23:03:39 -0400 Subject: [PATCH 1/4] Adds `Threads.@spawncall`, which evaluates arguments immediately This acts more like a "parallel function call", where the arguments are evaluated immediately, in the current thread, and the function is then invoked in any available thread with the stored argument values. This prevents variables being "boxed" in order to capture them in the closure. --- base/threadingconstructs.jl | 38 +++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 9f0e4f8307418..72b24a5efe354 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -126,3 +126,41 @@ macro spawn(expr) task end end + +""" + @spawncall f(args...) + +Like [`Threads.@spawn`](@ref), creates a [`Task`](@ref) that will run the given function +call on any available thread. The arguments to the provided function, `args`, will be +evaluated immediately in the current task, like a normal function call. + +This is different from [`@spawn`](@ref), where `@spawn` runs an arbitrary provided +expression, wrapped in an empty closure, so that the entire expression is evaluated in the +new task. Instead, the argument to `@spawncall` must be a function call, and that function, +`f` is invoked with the values of the arguments evaluated in the current task. (This +prevents variables from being "boxed" to capture them in the closure.) + +# Examples: +- `@spawncall println(i)` +- `@spawncall peakflops()` + +!!! note + This feature is currently considered experimental. + +!!! compat "Julia 1.3" + This macro is available as of Julia 1.3. +""" +macro spawncall(expr) + @assert expr.head == :call "The argument to @spawncall must be a function call, e.g. " * + "`@spawncall f(x)``. Provided expr `$expr` is not a function call." + escf = esc(expr.args[1]) + args = expr.args[2:end] + + newargs = [gensym(string(a)) for a in args] + letargs = [:($a=$(esc(b))) for (a,b) in zip(newargs,args)] + quote + let $(letargs...) + @spawn $escf($(newargs...)) + end + end +end From 34219a1ef349aa29f09d58c67ad2a1ab87e3dc24 Mon Sep 17 00:00:00 2001 From: Nathan Date: Wed, 18 Dec 2019 10:05:03 +0530 Subject: [PATCH 2/4] Adds `Threads.@spawncall`, which evaluates arguments immediately This acts more like a "parallel function call", where the arguments are evaluated immediately, in the current thread, and the function is then invoked in any available thread with the stored argument values. This prevents variables being "boxed" in order to capture them in the closure. Fix at-spawncall to work w/ kwargs; add unit tests Rough implementation of `$`-interpolation in at-spawn --- base/expr.jl | 2 ++ base/threadingconstructs.jl | 71 +++++++++++++++---------------------- test/threads_exec.jl | 24 +++++++++++++ 3 files changed, 55 insertions(+), 42 deletions(-) diff --git a/base/expr.jl b/base/expr.jl index 45726a624d4bc..e05366bc9cd25 100644 --- a/base/expr.jl +++ b/base/expr.jl @@ -330,6 +330,8 @@ function is_short_function_def(ex) return false end + + function findmeta(ex::Expr) if ex.head === :function || is_short_function_def(ex) body::Expr = ex.args[2] diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 72b24a5efe354..3e7e970fd65d8 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -107,6 +107,8 @@ Create and run a [`Task`](@ref) on any available thread. To wait for the task to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to wait and then obtain its return value. +(Values can be interpolated into `@spawn` via `$` to evaluate them in the current task.) + !!! note This feature is currently considered experimental. @@ -114,53 +116,38 @@ to wait and then obtain its return value. This macro is available as of Julia 1.3. """ macro spawn(expr) - thunk = esc(:(()->($expr))) - var = esc(Base.sync_varname) - quote - local task = Task($thunk) - task.sticky = false - if $(Expr(:islocal, var)) - push!($var, task) + # Capture interpolated variables in $() and move them to let-block + letargs = Any[] # store the new gensymed arguments + lift_one_interp!(v) = v + function lift_one_interp!(expr::Expr) + if expr.head == :quote # Don't try to lift $ out of quotes + return expr end - schedule(task) - task + if expr.head == :$ + newarg = gensym() + push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) + return newarg # Don't recurse into the $() exprs + end + for (i,e) in enumerate(expr.args) + expr.args[i] = lift_one_interp(e) + end + expr end -end - -""" - @spawncall f(args...) - -Like [`Threads.@spawn`](@ref), creates a [`Task`](@ref) that will run the given function -call on any available thread. The arguments to the provided function, `args`, will be -evaluated immediately in the current task, like a normal function call. - -This is different from [`@spawn`](@ref), where `@spawn` runs an arbitrary provided -expression, wrapped in an empty closure, so that the entire expression is evaluated in the -new task. Instead, the argument to `@spawncall` must be a function call, and that function, -`f` is invoked with the values of the arguments evaluated in the current task. (This -prevents variables from being "boxed" to capture them in the closure.) - -# Examples: -- `@spawncall println(i)` -- `@spawncall peakflops()` + lifted_expr = lift_one_interp!(expr) -!!! note - This feature is currently considered experimental. - -!!! compat "Julia 1.3" - This macro is available as of Julia 1.3. -""" -macro spawncall(expr) - @assert expr.head == :call "The argument to @spawncall must be a function call, e.g. " * - "`@spawncall f(x)``. Provided expr `$expr` is not a function call." - escf = esc(expr.args[1]) - args = expr.args[2:end] - - newargs = [gensym(string(a)) for a in args] - letargs = [:($a=$(esc(b))) for (a,b) in zip(newargs,args)] + thunk = esc(:(()->($lifted_expr))) + var = esc(Base.sync_varname) quote let $(letargs...) - @spawn $escf($(newargs...)) + local task = Task($thunk) + task.sticky = false + if $(Expr(:islocal, var)) + push!($var, task) + end + schedule(task) + task end end end + +end diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 22f9a64569d2c..c24b6d23cd0ef 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -703,3 +703,27 @@ catch ex @test ex isa LoadError @test ex.error isa ArgumentError end + +@testset "@spawncall" begin + # Issue #30896: evaluating argumentss immediately + begin + outs = zeros(5) + @sync begin + local i = 1 + while i <= 5 + Threads.@spawncall setindex!(outs, i, i) + i += 1 + end + end + @test outs == 1:5 + end + + # Args + @test fetch(Threads.@spawncall 2+2) == 4 + @test fetch(Threads.@spawncall Int(2.0)) == 2 + a = 2 + @test fetch(Threads.@spawncall *(a,a)) == a^2 + # kwargs + @test fetch(Threads.@spawncall sort([3 2; 1 0], dims=2)) == [2 3; 0 1] + @test fetch(Threads.@spawncall sort([3 2; 1 0]; dims=2)) == [2 3; 0 1] +end From b9ad1de77b2ac2a24fb7be4a9c6222beec753bf4 Mon Sep 17 00:00:00 2001 From: Nathan Date: Wed, 18 Dec 2019 10:13:12 +0530 Subject: [PATCH 3/4] Adds `Threads.@spawncall`, which evaluates arguments immediately This acts more like a "parallel function call", where the arguments are evaluated immediately, in the current thread, and the function is then invoked in any available thread with the stored argument values. This prevents variables being "boxed" in order to capture them in the closure. Fix at-spawncall to work w/ kwargs; add unit tests Rough implementation of `$`-interpolation in at-spawn Update spawncall tests to exercise the interpolation Fix my basic "lift $" function for $ inside :() Add string-interp test Escape the `$` reference in at-spawn docstring Oops, remove spurious "end" at end of file Factor out `$`-lifting; share b/w `@async` & `@spawn` --- base/task.jl | 42 ++++++++++++++++++++++++++++++++----- base/threadingconstructs.jl | 25 +++------------------- test/threads_exec.jl | 37 +++++++++++++++++++++++++------- 3 files changed, 70 insertions(+), 34 deletions(-) diff --git a/base/task.jl b/base/task.jl index 28878e2dcaa0d..780ca82327fed 100644 --- a/base/task.jl +++ b/base/task.jl @@ -346,18 +346,50 @@ end Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. """ macro async(expr) + letargs = Base._lift_one_interp!(expr) + thunk = esc(:(()->($expr))) var = esc(sync_varname) quote - local task = Task($thunk) - if $(Expr(:islocal, var)) - push!($var, task) + let $(letargs...) + local task = Task($thunk) + if $(Expr(:islocal, var)) + push!($var, task) + end + schedule(task) + task + end + end +end + +# Capture interpolated variables in $() and move them to let-block +function _lift_one_interp!(e) + letargs = Any[] # store the new gensymed arguments + _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false) + letargs +end +_lift_one_interp_helper(v, _, _) = v +function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs) + if expr.head == :$ + if in_quote_context # This $ is simply interpolating out of the quote + # Now, we're out of the quote, so any _further_ $ is ours. + in_quote_context = false + else + newarg = gensym() + push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) + return newarg # Don't recurse into the lifted $() exprs +>>>>>>> fb29992... Factor out `$`-lifting; share b/w `@async` & `@spawn` end - schedule(task) - task + elseif expr.head == :quote + in_quote_context = true # Don't try to lift $ directly out of quotes end + for (i,e) in enumerate(expr.args) + expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs) + end + expr end + # add a wait-able object to the sync pool macro sync_add(expr) var = esc(sync_varname) diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 3e7e970fd65d8..97a36a012e0a3 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -107,7 +107,7 @@ Create and run a [`Task`](@ref) on any available thread. To wait for the task to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to wait and then obtain its return value. -(Values can be interpolated into `@spawn` via `$` to evaluate them in the current task.) +(Values can be interpolated into `@spawn` via `\$` to evaluate them in the current task.) !!! note This feature is currently considered experimental. @@ -116,26 +116,9 @@ to wait and then obtain its return value. This macro is available as of Julia 1.3. """ macro spawn(expr) - # Capture interpolated variables in $() and move them to let-block - letargs = Any[] # store the new gensymed arguments - lift_one_interp!(v) = v - function lift_one_interp!(expr::Expr) - if expr.head == :quote # Don't try to lift $ out of quotes - return expr - end - if expr.head == :$ - newarg = gensym() - push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) - return newarg # Don't recurse into the $() exprs - end - for (i,e) in enumerate(expr.args) - expr.args[i] = lift_one_interp(e) - end - expr - end - lifted_expr = lift_one_interp!(expr) + letargs = Base._lift_one_interp!(expr) - thunk = esc(:(()->($lifted_expr))) + thunk = esc(:(()->($expr))) var = esc(Base.sync_varname) quote let $(letargs...) @@ -149,5 +132,3 @@ macro spawn(expr) end end end - -end diff --git a/test/threads_exec.jl b/test/threads_exec.jl index c24b6d23cd0ef..8bf9af4c068a3 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -704,14 +704,14 @@ catch ex @test ex.error isa ArgumentError end -@testset "@spawncall" begin +@testset "@spawn interpolation" begin # Issue #30896: evaluating argumentss immediately begin outs = zeros(5) @sync begin local i = 1 while i <= 5 - Threads.@spawncall setindex!(outs, i, i) + Threads.@spawn setindex!(outs, $i, $i) i += 1 end end @@ -719,11 +719,34 @@ end end # Args - @test fetch(Threads.@spawncall 2+2) == 4 - @test fetch(Threads.@spawncall Int(2.0)) == 2 + @test fetch(Threads.@spawn 2+$2) == 4 + @test fetch(Threads.@spawn Int($(2.0))) == 2 a = 2 - @test fetch(Threads.@spawncall *(a,a)) == a^2 + @test fetch(Threads.@spawn *($a,$a)) == a^2 # kwargs - @test fetch(Threads.@spawncall sort([3 2; 1 0], dims=2)) == [2 3; 0 1] - @test fetch(Threads.@spawncall sort([3 2; 1 0]; dims=2)) == [2 3; 0 1] + @test fetch(Threads.@spawn sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1] + @test fetch(Threads.@spawn sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1] + + # Supports multiple levels of interpolation + @test fetch(Threads.@spawn :($a)) == a + @test fetch(Threads.@spawn :($($a))) == a + @test fetch(Threads.@spawn "$($a)") == "$a" + + # Test the difference between different levels of interpolation + let + oneinterp = Vector{Any}(undef, 5) + twointerps = Vector{Any}(undef, 5) + @sync begin + local i = 1 + while i <= 5 + Threads.@spawn setindex!(oneinterp, :($i), $i) + Threads.@spawn setindex!(twointerps, :($($i)), $i) + i += 1 + end + end + # The first definition _didn't_ escape i + @test oneinterp == fill(6, 5) + # The second definition _did_ escape i + @test twointerps == 1:5 + end end From f2752971ebfe2dd801b1960823193c07c073617e Mon Sep 17 00:00:00 2001 From: Nathan Date: Wed, 18 Dec 2019 10:14:26 +0530 Subject: [PATCH 4/4] Adds $-interpolation syntax to `@async` and `Threads.@spawn`, to evaluate arguments immediately. Add the ability to evaluate some parts of a `@spawn`/`@async` immediately, in the current thread context. This prevents variables being "boxed" in order to capture them in the closure, exactly the same as wrapping them in a let-block locally. For example, `$x` expands like this: ```julia julia> @macroexpand @async $x + 2 quote #= task.jl:361 =# let var"##454" = x #= task.jl:362 =# local var"#9#task" = Base.Task((()->begin #= task.jl:358 =# var"##454" + 2 end)) #= task.jl:363 =# if $(Expr(:islocal, Symbol("##sync#95"))) #= task.jl:364 =# Base.push!(var"##sync#95", var"#9#task") end #= task.jl:366 =# Base.schedule(var"#9#task") #= task.jl:367 =# var"#9#task" end end ``` --- NEWS.md | 2 ++ base/expr.jl | 2 -- base/task.jl | 8 +++++++- base/threadingconstructs.jl | 7 ++++++- test/threads_exec.jl | 40 +++++++++++++++++++++++++++++++++++-- 5 files changed, 53 insertions(+), 6 deletions(-) diff --git a/NEWS.md b/NEWS.md index 99f0e2a601b20..5a752d53c7ab0 100644 --- a/NEWS.md +++ b/NEWS.md @@ -29,6 +29,8 @@ Language changes Multi-threading changes ----------------------- +* Values can now be interpolated into `@async` and `@spawn` via `$`, which copies the value directly into the constructed +underlying closure. ([#33119]) Build system changes -------------------- diff --git a/base/expr.jl b/base/expr.jl index e05366bc9cd25..45726a624d4bc 100644 --- a/base/expr.jl +++ b/base/expr.jl @@ -330,8 +330,6 @@ function is_short_function_def(ex) return false end - - function findmeta(ex::Expr) if ex.head === :function || is_short_function_def(ex) body::Expr = ex.args[2] diff --git a/base/task.jl b/base/task.jl index 780ca82327fed..f376b168f84a8 100644 --- a/base/task.jl +++ b/base/task.jl @@ -344,6 +344,13 @@ end @async Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. + +Values can be interpolated into `@async` via `\$`, which copies the value directly into the +constructed underlying closure. This allows you to insert the _value_ of a variable, +isolating the aysnchronous code from changes to the variable's value in the current task. + +!!! compat "Julia 1.4" + Interpolating values via `\$` is available as of Julia 1.4. """ macro async(expr) letargs = Base._lift_one_interp!(expr) @@ -378,7 +385,6 @@ function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs) newarg = gensym() push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) return newarg # Don't recurse into the lifted $() exprs ->>>>>>> fb29992... Factor out `$`-lifting; share b/w `@async` & `@spawn` end elseif expr.head == :quote in_quote_context = true # Don't try to lift $ directly out of quotes diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 97a36a012e0a3..8ca8ef0517362 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -107,13 +107,18 @@ Create and run a [`Task`](@ref) on any available thread. To wait for the task to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to wait and then obtain its return value. -(Values can be interpolated into `@spawn` via `\$` to evaluate them in the current task.) +Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the +constructed underlying closure. This allows you to insert the _value_ of a variable, +isolating the aysnchronous code from changes to the variable's value in the current task. !!! note This feature is currently considered experimental. !!! compat "Julia 1.3" This macro is available as of Julia 1.3. + +!!! compat "Julia 1.4" + Interpolating values via `\$` is available as of Julia 1.4. """ macro spawn(expr) letargs = Base._lift_one_interp!(expr) diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 8bf9af4c068a3..d57667e3e0e8f 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -728,9 +728,13 @@ end @test fetch(Threads.@spawn sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1] # Supports multiple levels of interpolation - @test fetch(Threads.@spawn :($a)) == a - @test fetch(Threads.@spawn :($($a))) == a @test fetch(Threads.@spawn "$($a)") == "$a" + let a = 1 + # Interpolate the current value of `a` vs the value of `a` in the closure + t = Threads.@spawn :(+($$a, $a, a)) + a = 2 # update `a` after spawning + @test fetch(t) == Expr(:call, :+, 1, 2, :a) + end # Test the difference between different levels of interpolation let @@ -750,3 +754,35 @@ end @test twointerps == 1:5 end end + +@testset "@async interpolation" begin + # Args + @test fetch(@async 2+$2) == 4 + @test fetch(@async Int($(2.0))) == 2 + a = 2 + @test fetch(@async *($a,$a)) == a^2 + # kwargs + @test fetch(@async sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1] + @test fetch(@async sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1] + + # Supports multiple levels of interpolation + @test fetch(@async :($a)) == a + @test fetch(@async :($($a))) == a + @test fetch(@async "$($a)") == "$a" +end + +# errors inside @threads +function _atthreads_with_error(a, err) + Threads.@threads for i in eachindex(a) + if err + error("failed") + end + a[i] = Threads.threadid() + end + a +end +@test_throws TaskFailedException _atthreads_with_error(zeros(nthreads()), true) +let a = zeros(nthreads()) + _atthreads_with_error(a, false) + @test a == [1:nthreads();] +end