Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds interpolation syntax to Threads.@spawn, to evaluate arguments immediately #33119

Merged
merged 4 commits into from
Dec 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Language changes
Multi-threading changes
-----------------------

* Values can now be interpolated into `@async` and `@spawn` via `$`, which copies the value directly into the constructed
underlying closure. ([#33119])

Build system changes
--------------------
Expand Down
48 changes: 43 additions & 5 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -344,20 +344,58 @@ end
@async

Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
NHDaly marked this conversation as resolved.
Show resolved Hide resolved

Values can be interpolated into `@async` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the aysnchronous code from changes to the variable's value in the current task.

!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
"""
macro async(expr)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(sync_varname)
quote
local task = Task($thunk)
if $(Expr(:islocal, var))
push!($var, task)
let $(letargs...)
local task = Task($thunk)
if $(Expr(:islocal, var))
push!($var, task)
end
schedule(task)
task
end
end
end

# Capture interpolated variables in $() and move them to let-block
function _lift_one_interp!(e)
letargs = Any[] # store the new gensymed arguments
_lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
letargs
end
_lift_one_interp_helper(v, _, _) = v
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
if expr.head == :$
if in_quote_context # This $ is simply interpolating out of the quote
# Now, we're out of the quote, so any _further_ $ is ours.
in_quote_context = false
else
newarg = gensym()
push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
return newarg # Don't recurse into the lifted $() exprs
end
schedule(task)
task
elseif expr.head == :quote
in_quote_context = true # Don't try to lift $ directly out of quotes
end
for (i,e) in enumerate(expr.args)
expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
end
expr
end


# add a wait-able object to the sync pool
macro sync_add(expr)
var = esc(sync_varname)
Expand Down
23 changes: 17 additions & 6 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,33 @@ Create and run a [`Task`](@ref) on any available thread. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref)
to wait and then obtain its return value.

Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the aysnchronous code from changes to the variable's value in the current task.

!!! note
This feature is currently considered experimental.

!!! compat "Julia 1.3"
This macro is available as of Julia 1.3.

!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
"""
macro spawn(expr)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
local task = Task($thunk)
task.sticky = false
if $(Expr(:islocal, var))
push!($var, task)
let $(letargs...)
local task = Task($thunk)
task.sticky = false
if $(Expr(:islocal, var))
push!($var, task)
end
schedule(task)
task
end
schedule(task)
task
end
end
83 changes: 83 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,86 @@ catch ex
@test ex isa LoadError
@test ex.error isa ArgumentError
end

@testset "@spawn interpolation" begin
# Issue #30896: evaluating argumentss immediately
begin
outs = zeros(5)
@sync begin
local i = 1
while i <= 5
Threads.@spawn setindex!(outs, $i, $i)
i += 1
end
end
@test outs == 1:5
end

# Args
@test fetch(Threads.@spawn 2+$2) == 4
@test fetch(Threads.@spawn Int($(2.0))) == 2
a = 2
@test fetch(Threads.@spawn *($a,$a)) == a^2
# kwargs
@test fetch(Threads.@spawn sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1]
@test fetch(Threads.@spawn sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1]

# Supports multiple levels of interpolation
@test fetch(Threads.@spawn "$($a)") == "$a"
let a = 1
# Interpolate the current value of `a` vs the value of `a` in the closure
t = Threads.@spawn :(+($$a, $a, a))
a = 2 # update `a` after spawning
@test fetch(t) == Expr(:call, :+, 1, 2, :a)
end

# Test the difference between different levels of interpolation
let
oneinterp = Vector{Any}(undef, 5)
twointerps = Vector{Any}(undef, 5)
@sync begin
local i = 1
while i <= 5
Threads.@spawn setindex!(oneinterp, :($i), $i)
Threads.@spawn setindex!(twointerps, :($($i)), $i)
i += 1
end
end
# The first definition _didn't_ escape i
@test oneinterp == fill(6, 5)
# The second definition _did_ escape i
@test twointerps == 1:5
end
end

@testset "@async interpolation" begin
# Args
@test fetch(@async 2+$2) == 4
@test fetch(@async Int($(2.0))) == 2
a = 2
@test fetch(@async *($a,$a)) == a^2
# kwargs
@test fetch(@async sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1]
@test fetch(@async sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1]

# Supports multiple levels of interpolation
@test fetch(@async :($a)) == a
@test fetch(@async :($($a))) == a
@test fetch(@async "$($a)") == "$a"
end

# errors inside @threads
function _atthreads_with_error(a, err)
Threads.@threads for i in eachindex(a)
if err
error("failed")
end
a[i] = Threads.threadid()
end
a
end
@test_throws TaskFailedException _atthreads_with_error(zeros(nthreads()), true)
let a = zeros(nthreads())
_atthreads_with_error(a, false)
@test a == [1:nthreads();]
end