Skip to content

Commit

Permalink
[Distributed] Fix finalizer_ref usage of lock/wait resulting in faile…
Browse files Browse the repository at this point in the history
…d task switches (#41846)
  • Loading branch information
krynju authored Aug 16, 2021
1 parent cd43dda commit 3306a8b
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions stdlib/Distributed/src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,24 @@ end

function finalize_ref(r::AbstractRemoteRef)
if r.where > 0 # Handle the case of the finalizer having been called manually
if islocked(client_refs)
# delay finalizer for later, when it's not already locked
if trylock(client_refs.lock) # trylock doesn't call wait which causes yields
try
delete!(client_refs.ht, r) # direct removal avoiding locks
if isa(r, RemoteChannel)
send_del_client_no_lock(r)
else
# send_del_client only if the reference has not been set
r.v === nothing && send_del_client_no_lock(r)
r.v = nothing
end
r.where = 0
finally
unlock(client_refs.lock)
end
else
finalizer(finalize_ref, r)
return nothing
end
delete!(client_refs, r)
if isa(r, RemoteChannel)
send_del_client(r)
else
# send_del_client only if the reference has not been set
r.v === nothing && send_del_client(r)
r.v = nothing
end
r.where = 0
end
nothing
end
Expand Down Expand Up @@ -229,13 +233,18 @@ del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())
del_client(id, client) = del_client(PGRP, id, client)
function del_client(pg, id, client)
lock(client_refs) do
rv = get(pg.refs, id, false)
if rv !== false
delete!(rv.clientset, client)
if isempty(rv.clientset)
delete!(pg.refs, id)
#print("$(myid()) collected $id\n")
end
_del_client(pg, id, client)
end
nothing
end

function _del_client(pg, id, client)
rv = get(pg.refs, id, false)
if rv !== false
delete!(rv.clientset, client)
if isempty(rv.clientset)
delete!(pg.refs, id)
#print("$(myid()) collected $id\n")
end
end
nothing
Expand All @@ -259,13 +268,26 @@ function send_del_client(rr)
if rr.where == myid()
del_client(rr)
elseif id_in_procs(rr.where) # process only if a valid worker
w = worker_from_id(rr.where)::Worker
push!(w.del_msgs, (remoteref_id(rr), myid()))
w.gcflag = true
notify(any_gc_flag)
process_worker(rr)
end
end

function send_del_client_no_lock(rr)
# for gc context to avoid yields
if rr.where == myid()
_del_client(PGRP, remoteref_id(rr), myid())
elseif id_in_procs(rr.where) # process only if a valid worker
process_worker(rr)
end
end

function process_worker(rr)
w = worker_from_id(rr.where)::Worker
push!(w.del_msgs, (remoteref_id(rr), myid()))
w.gcflag = true
notify(any_gc_flag)
end

function add_client(id, client)
lock(client_refs) do
rv = lookup_ref(id)
Expand Down

0 comments on commit 3306a8b

Please sign in to comment.