Skip to content

Commit

Permalink
make @sync lexically scoped and merge @schedule with @async
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson committed May 19, 2018
1 parent b5c0cb0 commit e70e825
Show file tree
Hide file tree
Showing 27 changed files with 167 additions and 179 deletions.
2 changes: 1 addition & 1 deletion base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ function setup_chnl_and_tasks(exec_func, ntasks, batch_size=nothing)
end

function start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
t = @schedule begin
t = @async begin
retval = nothing

try
Expand Down
4 changes: 2 additions & 2 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ termination of the task will close all of the bound channels.
```jldoctest
julia> c = Channel(0);
julia> task = @schedule foreach(i->put!(c, i), 1:4);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
Expand All @@ -174,7 +174,7 @@ false
```jldoctest
julia> c = Channel(0);
julia> task = @schedule (put!(c,1);error("foo"));
julia> task = @async (put!(c,1);error("foo"));
julia> bind(c,task);
Expand Down
2 changes: 2 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,8 @@ end
@deprecate next(s::AbstractString, i::Integer) iterate(s, i)
@deprecate done(s::AbstractString, i::Integer) i > ncodeunits(s)

@eval @deprecate $(Symbol("@schedule")) $(Symbol("@async"))

# END 0.7 deprecations

# BEGIN 1.0 deprecations
Expand Down
13 changes: 0 additions & 13 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,6 @@ notify_error(c::Condition, err) = notify(c, err, true, true)

n_waiters(c::Condition) = length(c.waitq)

# schedule an expression to run asynchronously, with minimal ceremony
"""
@schedule
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
Similar to [`@async`](@ref) except that an enclosing `@sync` does NOT wait for tasks
started with an `@schedule`.
"""
macro schedule(expr)
thunk = esc(:(()->($expr)))
:(enq_work(Task($thunk)))
end

## scheduler and work queue

global const Workqueue = Task[]
Expand Down
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,6 @@ export
@allocated,

# tasks
@schedule,
@sync,
@async,
@task,
Expand Down
149 changes: 67 additions & 82 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,72 @@ function fetch(t::Task)
task_result(t)
end


## lexically-scoped waiting for multiple items

function sync_end(refs)
c_ex = CompositeException()
for r in refs
try
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
end
end
end

if !isempty(c_ex)
throw(c_ex)
end
nothing
end

const sync_varname = gensym(:sync)

"""
@sync
Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
var = esc(sync_varname)
quote
let $var = Any[]
v = $(esc(block))
sync_end($var)
v
end
end
end

# schedule an expression to run asynchronously

"""
@async
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
"""
macro async(expr)
thunk = esc(:(()->($expr)))
var = esc(sync_varname)
quote
local task = Task($thunk)
if $(Expr(:isdefined, var))
push!($var, task)
get_task_tls(task)[:SUPPRESS_EXCEPTION_PRINTING] = true
end
schedule(task)
end
end


suppress_excp_printing(t::Task) = isa(t.storage, IdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false

function register_taskdone_hook(t::Task, hook)
Expand Down Expand Up @@ -237,7 +303,7 @@ function task_done_hook(t::Task)
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(Base.error_color(), stderr) do io
@async with_output_color(Base.error_color(), stderr) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
Expand All @@ -263,87 +329,6 @@ function task_done_hook(t::Task)
end
end


## dynamically-scoped waiting for multiple items
sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ())))

function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if spawns === ()
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])

c_ex = CompositeException()
for r in refs
try
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
end
end
end

if !isempty(c_ex)
throw(c_ex)
end
nothing
end

"""
@sync
Wait until all dynamically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@parallel`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end

function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if spawns !== ()
push!(spawns[1], r)
if isa(r, Task)
tls_r = get_task_tls(r)
tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true
end
end
r
end

function async_run_thunk(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end

"""
@async
Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local
machine's scheduler queue. Additionally it adds the task to the set of items that the
nearest enclosing `@sync` waits for.
"""
macro async(expr)
thunk = esc(:(()->($expr)))
:(async_run_thunk($thunk))
end


"""
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
Expand Down
1 change: 0 additions & 1 deletion doc/src/base/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Base.task_local_storage(::Function, ::Any, ::Any)
Base.Condition
Base.notify
Base.schedule
Base.@schedule
Base.@task
Base.sleep
Base.Channel
Expand Down
4 changes: 2 additions & 2 deletions doc/src/manual/control-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,8 @@ A task created explicitly by calling [`Task`](@ref) is initially not known to th
allows you to manage tasks manually using [`yieldto`](@ref) if you wish. However, when such
a task waits for an event, it still gets restarted automatically when the event happens, as you
would expect. It is also possible to make the scheduler run a task whenever it can, without necessarily
waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@schedule`](@ref)
or [`@async`](@ref) macros (see [Parallel Computing](@ref) for more details).
waiting for any events. This is done by calling [`schedule`](@ref), or using the [`@async`](@ref)
macro (see [Parallel Computing](@ref) for more details).

### Task states

Expand Down
8 changes: 4 additions & 4 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ A channel can be visualized as a pipe, i.e., it has a write end and read end.

# we can schedule `n` instances of `foo` to be active concurrently.
for _ in 1:n
@schedule foo()
@async foo()
end
```
* Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects
Expand Down Expand Up @@ -672,10 +672,10 @@ julia> function make_jobs(n)
julia> n = 12;
julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
@schedule do_work()
@async do_work()
end
julia> @elapsed while n > 0 # print out results
Expand Down Expand Up @@ -780,7 +780,7 @@ julia> function make_jobs(n)
julia> n = 12;
julia> @schedule make_jobs(n); # feed the jobs channel with "n" jobs
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
Expand Down
5 changes: 2 additions & 3 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, acquire, release, invokelatest,
VERSION_STRING, binding_module, notify_error, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, coalesce, notnothing

using Serialization, Sockets
Expand Down
24 changes: 12 additions & 12 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
@schedule exec_conn_func(w)
@async exec_conn_func(w)
else
# route request via node 1
@schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
@async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
wait_for_conn(w)
end
Expand All @@ -144,7 +144,7 @@ function wait_for_conn(w)
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@schedule (sleep(timeout); notify(w.c_state; all=true))
@async (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
Expand Down Expand Up @@ -200,7 +200,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin))
else
sock = listen(interface, LPROC.bind_port)
end
@schedule while isopen(sock)
@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end
Expand Down Expand Up @@ -231,7 +231,7 @@ end


function redirect_worker_output(ident, stream)
@schedule while !eof(stream)
@async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand Down Expand Up @@ -265,7 +265,7 @@ function read_worker_host_port(io::IO)
leader = String[]
try
while ntries > 0
readtask = @schedule readline(io)
readtask = @async readline(io)
yield()
while !istaskdone(readtask) && ((time() - t0) < timeout)
sleep(0.05)
Expand Down Expand Up @@ -396,13 +396,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# call manager's `launch` is a separate task. This allows the master
# process initiate the connection setup process as and when workers come
# online
t_launch = @schedule launch(manager, params, launched, launch_ntfy)
t_launch = @async launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
@schedule (sleep(1); notify(launch_ntfy))
@async (sleep(1); notify(launch_ntfy))
wait(launch_ntfy)
end

Expand Down Expand Up @@ -574,7 +574,7 @@ function create_worker(manager, wconfig)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

@schedule manage(w.manager, w.id, w.config, :register)
@async manage(w.manager, w.id, w.config, :register)
wait(rr_ntfy_join)
lock(client_refs) do
delete!(PGRP.refs, ntfy_oid)
Expand Down Expand Up @@ -621,7 +621,7 @@ function check_master_connect()
if ccall(:jl_running_on_valgrind,Cint,()) != 0
return
end
@schedule begin
@async begin
start = time()
while !haskey(map_pid_wrkr, 1) && (time() - start) < timeout
sleep(1.0)
Expand Down Expand Up @@ -844,13 +844,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = @schedule _rmprocs(pids, typemax(Int))
t = @async _rmprocs(pids, typemax(Int))
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return @schedule nothing
return @async nothing
end
end

Expand Down
Loading

0 comments on commit e70e825

Please sign in to comment.