Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Distributed] Worker local race condition between put! and fetch for Futures #42339

Merged
merged 29 commits into from
Dec 3, 2021
Merged

[Distributed] Worker local race condition between put! and fetch for Futures #42339

merged 29 commits into from
Dec 3, 2021

Conversation

krynju
Copy link
Contributor

@krynju krynju commented Sep 22, 2021

This fixes the issues observed here JuliaParallel/Dagger.jl#282 for a one-worker multi threaded environment.

It fixes that, but I'm afraid it might break multi-worker behavior, so this is still kind of WIP if that's the case

  • I think serialization can be affected as the local_lock was added to the Future
  • Multi-worker seems to work ok in Dagger, but I'm not 100% confident there's no situation where this locks up for some unknown reason and I'm not sure yet how to confirm that

With these changes applied the worker-local fetches wait on the local_lock instead of the channel associated with the Future. The put! still uses the channel in case there are any remote references to the Future and when it's done caching the result it notifies the local_lock, so that the local fetches can get the cached value properly.

Let me know what you think

@krynju
Copy link
Contributor Author

krynju commented Sep 22, 2021

It's breaking Shared Arrays as it gets stuck there during the CI runs

@vchuravy
Copy link
Member

Is there an MWE without Dagger? And can you put the stacktrace or a short sequence of events you are seeing?

@krynju
Copy link
Contributor Author

krynju commented Sep 22, 2021

Is there an MWE without Dagger? And can you put the stacktrace or a short sequence of events you are seeing?

No; Yes:

quick explaination in order:

Thunk502 (id not visible in the logs) is fetching on the Future related to Thunk516.
From the start you can see:

  1. Future registration for Thunk516
  2. Thunk516 being scheduled
  3. Thunk516 finishing and releasing resources
  4. Thunk502 fetching on the future related to Thunk516
  5. Future of Thunk516 has put! called on it
  6. This is a printout after put! to confirm the put! actually happened

This is the end of the log, Thunk502 never resumes and it's stuck on fetching on the Thunk516's future

Additionally if i synchronize put! and fetch inside Dagger's code the race doesn't happen, but that uses a global lock which is suboptimal I think

┌ Debug: 1036287804087900 futures are put Dagger.ThunkFuture(Distributed.Future(1, 1, 3101, nothing)), Dagger.Sch.ThunkID(516, MemPool.DRef(1, 1542, 0x0000000000000240))
└ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\dynamic.jl:153
┌ Debug: (1) #5 (516) Using available Dagger.ThreadProc(1, 1): 28838 | 0/16000000000
└ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:949
┌ Debug: (1) #5 (516) 1036287804414100 Releasing Dagger.ThreadProc: 28838 | 0/16000000000
└ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\Sch.jl:989
┌ Debug: 1036287804467600 fetching objid=16455950807025121162 Dagger.ThunkFuture(Distributed.Future(1, 1, 3101, nothing))
└ @ Dagger C:\Users\krynjupc\.julia\dev\Dagger\src\thunk.jl:130
┌ Debug: filling objid=16455950807025121162 Dagger.ThunkFuture(Distributed.Future(1, 1, 3101, nothing))
└ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\util.jl:17
┌ Debug: future.future.v=Some((false, Dagger.Chunk{Int64, MemPool.DRef, Dagger.ThreadProc, AnyScope}(Int64, UnitDomain(), MemPool.DRef(1, 1543, 0x0000000000000008), Dagger.ThreadProc(1, 1), AnyScope(), false)))
└ @ Dagger.Sch C:\Users\krynjupc\.julia\dev\Dagger\src\sch\util.jl:19

@vchuravy
Copy link
Member

So something akin to:

using Distributed

fut = Distributed.Future()
barrier = Threads.Event()
T1 = Threads.@spawn begin
    notify($barrier)
    fetch($fut)
end
wait(barrier)
T2 = Threads.@spawn put!($fut, 1)

wait(T2)
wait(T1)

but that doesn't hang in my few tries.

Adding a condition to every future is I think not the right strategy. I would first like to understand what race is happening (and on which data)

@krynju
Copy link
Contributor Author

krynju commented Sep 22, 2021

So overall I considered two options:

  • race on the future.v, which is the locally cached thing put into the future
  • race on the Channel associated with the Future that eventually get's fetched on and comes from clientrefs (but all of that is synchronized with the client_refs lock, so I doubt it's the problem)

The fix I proposed addresses the first one in case of Future usage in a one-worker multi-threaded environment, but also ditches the channel usage on local fetches alltogether, which I'm not sure is a good move, but I'll give it some more thought later

@krynju
Copy link
Contributor Author

krynju commented Sep 22, 2021

@vchuravy
Can you please try this? This locks up for me - it's more similar to what is actually happening in my examples in Dagger

using Distributed
create_future = () -> Future()
put_future = (f) -> put!(f, Threads.threadid())
fetch_future = (f) -> fetch(f)

for x in 1:10
    _f = [create_future() for i in 1:10]
    Threads.@spawn put_future.(_f)
    t = Threads.@spawn fetch_future.(_f)
    wait(t)
end

example (stuck on iter2):

julia> for x in 1:10
           println("starting $x")
           _f = [create_future() for i in 1:10]
           Threads.@spawn put_future.(_f)
           t = Threads.@spawn fetch_future.(_f)
           wait(t)
       end
starting 1
starting 2

@jpsamaroo
Copy link
Member

Testing the above with >1 thread hangs for me, but works fine with only 1 thread.

@krynju
Copy link
Contributor Author

krynju commented Sep 28, 2021

Ok i found exactly where the problem is.
I've put debug statements inside fetch_ref and put_future to see what lookup_ref is doing.

function put_future(rid, v, caller)
    rv = lookup_ref(rid)
    @debug "put; rid=$(objectid(rid)); rv=$(objectid(rv))"
...
fetch_ref(rid, args...) = begin
    rv= lookup_ref(rid)
    @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))"
    fetch(rv.c, args...)
end

What should be happening is that for the put and fetch on a single Future the rid and rv object ids should be exactly the same as this is a local environment.
What is actually happening is that:

For many Futures the condition passes, example:

┌ Debug: put; rid=4746684393088201827; rv=4325974976783221916
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=4746684393088201827; rv=4325974976783221916
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578

For some Futures the lookup_ref function returns a different RemoteValue on which wait is eventually called and never returned from, because during put the value was put in a different RemoteValue. Example:

┌ Debug: put; rid=622726897413959939; rv=3734176968996073142
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=622726897413959939; rv=7331542461171591063
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578

The 2nd observation is where the hangs actually happen.

I'll look for a solution that wouldn't be very intrusive and work with the local caching as well, because that's a race-prone area as well

Full log that hangs:

starting 3
┌ Debug: put; rid=4746684393088201827; rv=4325974976783221916
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=4746684393088201827; rv=4325974976783221916
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=17723865940758269499; rv=6135573713805705389
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=17723865940758269499; rv=6135573713805705389
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=14525136180728428860; rv=479170697237490132
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=14525136180728428860; rv=479170697237490132
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=8278212479482857028; rv=7065234581043630017
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=1551521008739866053; rv=6329425338363095610
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=1551521008739866053; rv=6329425338363095610
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: put; rid=11548680288885168406; rv=10243852243780688353
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=11548680288885168406; rv=10243852243780688353
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=240326915445701795; rv=15783991434042387415
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=240326915445701795; rv=15783991434042387415
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=622726897413959939; rv=3734176968996073142
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: put; rid=14194915939145007482; rv=12395807295187781480
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624
┌ Debug: fet; rid=622726897413959939; rv=7331542461171591063
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:578
┌ Debug: put; rid=13560115101190643827; rv=6840043069730518484
└ @ Distributed C:\cygwin64\home\krynjupc\julia\stdlib\Distributed\src\remotecall.jl:624

@krynju
Copy link
Contributor Author

krynju commented Sep 30, 2021

Ok latest commit passes tests and fixes the synthethic stress test which used to fail (multiple fetches were another thing that wasn't that threadsafe).

It fixes the Dagger hangs that were due to this not being thread safe, but there's still a reproducible hang in my heavy Dagger example and it seems like it's related to Dagger specifically (this one doesn't leave the task in state.running - new hang identified 😢 @jpsamaroo).

I'll write another comment later explaining the solution - i dont think this one should affect the performance that much.
Also I need to do a cleanup of the code

  • explaination
  • cleanup

synthethic example that passes now:

using Distributed
create_future = () -> Future()
put_future = (f) -> put!(f, sum(1:10_000))
fetch_future = (f) -> fetch(f)

for x in 1:1000
    println("starting $x")
    _f = [create_future() for i in 1:1000]
    
    t1 = Threads.@spawn fetch_future.(_f)
    t2 = Threads.@spawn fetch_future.(_f)
    Threads.@spawn put_future.(_f)

    t3 = Threads.@spawn fetch_future.(_f)
    t4 = Threads.@spawn fetch_future.(_f)

    wait(t1); 
    wait(t2);
    wait(t3);
    wait(t4);
end

@@ -72,7 +72,7 @@ function test_existing_ref(r::AbstractRemoteRef)
if isa(r, Future) && found.v === nothing && r.v !== nothing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like this function may require the client_refs lock, since their are multiple uses of that dict

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getkey is locking on client_refs, but still that function looks fishy
In the end i didn't find any issues with it, but I'll keep an eye on it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is generally insufficient to lock each use independently, but all uses generally must share the same lock

stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
@vchuravy vchuravy added backport 1.7 multithreading Base.Threads and related functionality stdlib Julia's standard library labels Sep 30, 2021
@krynju
Copy link
Contributor Author

krynju commented Oct 2, 2021

Issue description:
All situations considered in a one worker-multiple threads scenario.

  1. Issue 1: Task fetching on a wrong channel. Let's consider a Future that is fetched twice and put! once. First fetch happens first, task locks on the RemoteValue's channel. Now put! happens and it puts the value into the channel, removes the reference from client_refs and then puts v into the cache. First fetch will return from the fetch, cache and send a req to remove the ref from client_refs. If the 2nd fetch happens between the removal of the reference and caching the value during put! it will create a new reference using lookup_ref and wait on its channel - which will never be put! into, because that already happened on a different channel.
  2. Issue 2: Fetch caching first. Same scenario, but let's consider a situation where put! is called, value is put into channel and between this moment and caching the value locally a complete fetch happens. This fetch will cache the value of the Future earlier than put!, so the check we have in put! whether the value was cached will fail (which implies that the Future has already been set, but in reality it was just cached by a fetch).

Solutions:

  1. Solution to Issue 1: To make it work caching of the value had to be adjusted in put!, so that it happens before putting the value into the channel. With that guarantee in place another adjustment was needed in fetch. lookup_ref couldn't be used directly for local operations. The new idea is that within the client_refs lock the future.v is checked and the lookup_ref is only performed if the future isn't already set. Due to the guarantee that a correct local channel exists as long as the cache isn't set (the local ref only ever gets removed after the local cache is set) no unnecessary/additional RemoteValues will be created and waited on.
  2. Solution to Issue 2: Reordering in put!, so that caching always happens before channel put! ensures the put! call is always the first one to cache the value locally, so that issue never appears.

@krynju
Copy link
Contributor Author

krynju commented Oct 8, 2021

Ok, so I've been daily driving this and it's looking good for the issues that were mentioned by me above.

However, I noticed one more hang, which is very weird and I'll just create an issue for it.
But this PR is good to go IMO, the hang described below - I sometimes observed it before the PR as well

In summary when running the stress test with >12 threads on PC1 (linux 2c/4t) or >16 threads on PC2 (windows 8c/16t) the hang will occur
Important: every time at the same iteration!
I have a moderate suspicion it's related to GC somehow.
The only thing which makes sense would pop up at exactly the same iteration is the GC

Additionally I managed to gdb into the stuck thread (it's at 100% usage) and it seems to be stuck in gcutils.jl and in gf.c constantly going through sig_match_fast in there

[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
julia_==_26805 () at gcutils.jl:5
5	gcutils.jl: No such file or directory.
(gdb) s
0x00007f5af26a06b1 in jl_apply_generic () at /opt/julia/cli/trampolines/trampolines_x86_64.S:19
19	JL_EXPORTED_FUNCS(XX)
(gdb) s
jl_apply_generic (F=0x7f5adf0c5480 <jl_system_image_data+8909632>, args=0x7ffc084da9d0, nargs=2) at /opt/julia/src/gf.c:2423
2423	{
(gdb)
2425	    jl_method_instance_t *mfunc = jl_lookup_generic_(F, args, nargs,
(gdb)
0x00007f5af1f28a3b in jl_lookup_generic_ (world=<optimized out>, callsite=<optimized out>,
    nargs=<optimized out>, args=<optimized out>, F=<optimized out>)
    at /opt/julia/src/gf.c:2346
2346	    LOOP_BODY(0);
(gdb)
etc.

Other time it was stuck in rehash!/promotion

stress test:

using Distributed
create_future = () -> Future()
put_future = (f) -> put!(f, sum(1:10_000))
fetch_future = (f) -> fetch(f)

for x in 1:1000
    println("starting $x")
    _f = [create_future() for i in 1:1000]
    
    t1 = Threads.@spawn fetch_future.(_f)
    t2 = Threads.@spawn fetch_future.(_f)
    Threads.@spawn put_future.(_f)

    t3 = Threads.@spawn fetch_future.(_f)
    t4 = Threads.@spawn fetch_future.(_f)

    wait(t1); 
    wait(t2);
    wait(t3);
    wait(t4);
end

rv = lock(client_refs) do
r.v === nothing ? lookup_ref(remoteref_id(r)) : nothing
end
if r.v !== nothing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the logic here. Can you add some comments here what this is guarding against and why we can't just call call_on_owner(fetch_ref, r)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this guards against creating unnecessary RemoteValues - more details here in issue 1 #42339 (comment)

If we just call_on_owner(fetch_ref, r) then we get a RemoteValue's channel back, but we can get a freshly created one if the original doesn't exist anymore. Now we would have to check the r.v again to make sure that RemoteValue is correct (r.v == nothing) or if it's a freshly created one (r != nothing) and then remove it, so this just doesn't create it in the first place

rr.v = Some(v)
_, ok = @atomicreplace rr.v nothing => Some(v)
ok || error("Future can be set only once")
call_on_owner(put_future, rr, something(rr.v), myid())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not v instead of something(rr.v)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this changes the semantics doesn't it?

if I am not the owner of rr I get to set it, and potentially I and the actual owner disagree?
Think about two clients call put! on the same future. Only one of them is allowed to win that race. The race needs to be resolved on the owner.

Copy link
Contributor Author

@krynju krynju Oct 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just changed all possible references to r.v / rr.v, so that the getproperty is called on them and possibly helps with the atomic ordering - no idea if it actually translates well.

On the conflict - I had the impression that two put!'s is an error.
So the issue is that in case the call_on_owner fails the rogue put! has already cached and allowed local tasks to consume that cache even though it's wrong

So if we move these around - call on owner first then caching then we're hitting issue 2 from here #42339 (comment)

Basically fetch will cache faster than put. We'd have to remove caching from fetch i guess?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove caching from put! instead. The option is to attempt an atomicreplace and if it fails, check whether or not it was equivalent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So removing caching from put! has an additional implication - once put is done it removes the local channel with the value, so next time someone fetches they will be stuck on waiting on a newly created channel, because the original one was removed and cache is empty

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about this:

function put!(rr::Future, v)
    rr.where == myid() && set_future_cache(rr, v)
    call_on_owner(put_future, rr, v, myid())
    rr.where != myid() && set_future_cache(rr, v)
    rr
end

function set_future_cache(rr::Future, v)
    _, ok = @atomicreplace rr.v nothing => Some(v)
    ok || error("Future can be set only once")
end

I suppose caching when future is local is fine before its channel get's put! on

@@ -549,10 +549,21 @@ is an exception, throws a [`RemoteException`](@ref) which captures the remote ex
"""
function fetch(r::Future)
r.v !== nothing && return something(r.v)
Copy link
Member

@tkf tkf Oct 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relaxed load doesn't work here. Since the field has an indirection (Any), we need at least acquire.

Suggested change
r.v !== nothing && return something(r.v)
v = @atomic r.v
v !== nothing && return something(v)

Basically, if you put @atomic on the field declaration, you have to manually make sure that you put @atomic on every read and write (unless relaxed/monotonic load is really what you want).

(Edit: Stop mentioning Dekker; not sure how to construct an example where store-load reordering matters.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm a little confused now, @vtjnash reviewed this earlier and suggested that for these reads the atomic sequencing isn't required, because the normal read just counts as a consume load and is good enough. (#42339 (comment))

I've had all reads marked with atomic at some point, I can revert to that if needed, but I don't really have any strong argument behind either approach - both work in case of Futures.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make sure that all invariance (which cannot be enforced by a type level) that the caller of put! set up to be present by the time fetch call returns. We need release/acquire for this. Nothing in the Julia runtime or compiler helps here since such an invariance can only exist in the value domain.

Consider:

mutable struct SumTo1
    a::Int
    b::Int
    c::Int
end

invariance(n) = a + b + c == 1

function add_to_a!(n, x)
    n.a += x
    n.c = 1 - n.a - n.b
end

Task 1:

add_to_a!(n, 5)
put!(future, n)

Task 2:

n = fetch(future)
@assert invariance(n)

Without release and acquire, the assertion at the last line may fail. Furthermore, since Some{Any} introduces one extra indirection, it is a data race and hence a UB without release and acquire.

@vtjnash's point is that, with the current supported architectures and current implementations of the compiler, the difference is very likely not observable. But UB is UB. A future compiler can break this. It's also possible that a UB like this gets in a way to use a tooling to detect concurrency bugs. We should respect the memory model and write correct code.

But maybe a safer route is to switch a lock-based solution which is probably easier to deal with for many people. Or, if you are still curious, I think a nice material for help understanding the issue here is this talk on atomics by Herb Sutter:

(or maybe his another higher-level tutorial talk on lock-free programming).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, thanks for the links to the talks! Very helpful in understanding that
watched the last link, will go at the longer one later
I've pushed a commit with @atomic reverted on loads and also marked them with :acquire and :release, since I believe for this scenario this is enough (basically a 1 publisher (put!), multiple consumer (fetch) scenario)

Copy link
Member

@vtjnash vtjnash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partial review comments from a joint review meeting

Main takeaway is that it appears we would solve many issues by adding a ReentrantLock inside Future, so that fetch and put can ensure only one process is inside those participating in the race to get/set the future

r.v !== nothing && return something(r.v)
v = call_on_owner(fetch_ref, r)
r.v = Some(v)
v_cache = @atomic :acquire r.v
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
v_cache = @atomic :acquire r.v
v_cache, = @atomicreplace! r.v nothing => nothing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just made it a default @atomic which is sequentially consistent

stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Outdated Show resolved Hide resolved
stdlib/Distributed/src/remotecall.jl Show resolved Hide resolved
v_local = call_on_owner(fetch_ref, r)
end

@atomic :release r.v = Some(v_local)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might have been already set (by put! on myid != owner)

r.v = Some(v)
v_cache = @atomic :acquire r.v
v_cache !== nothing && return something(v_cache)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we lock r::Future here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't add a lock here since it's supposed to be a quick cache check

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant here exactly, as in after the quick lookup.

For (my own) reference on performance:

julia> mutable struct AtomicCounter
           @atomic x::Int
       end

julia> const ac = AtomicCounter(0);

julia> const r = Some(0);

julia> const r2 = Ref(0);

# measure operation overhead
julia> @btime (r2[] = something(r))
  1.696 ns (0 allocations: 0 bytes)

# measure atomic load overhead
julia> @btime @atomic ac.x
  1.696 ns (0 allocations: 0 bytes)

julia> @btime @atomicreplace ac.x 0 => 0
  8.372 ns (0 allocations: 0 bytes)

julia> @btime @atomicreplace :monotonic ac.x 0 => 0
  8.372 ns (0 allocations: 0 bytes)

julia> const lk2 = Base.ThreadSynchronizer();

julia> @btime (lock(lk2); unlock(lk2))
  20.072 ns (0 allocations: 0 bytes)

julia> @btime (Base.iolock_begin(); Base.iolock_end())
  18.390 ns (0 allocations: 0 bytes)

julia> Base.iolock_begin(); @btime (Base.iolock_begin(); Base.iolock_end()); Base.iolock_end()
  12.467 ns (0 allocations: 0 bytes)

julia> const lk3 = Libc.malloc(40)
Ptr{Nothing} @0x0000555e99ac70c0

julia> ccall(:uv_mutex_init, Cint, (Ptr{Cvoid},), lk3)
0

julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
  20.407 ns (0 allocations: 0 bytes)

julia> ccall(:uv_mutex_init_recursive, Cint, (Ptr{Cvoid},), lk3)
0

julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
  23.412 ns (0 allocations: 0 bytes)
0

julia> ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3)
0

julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
  12.392 ns (0 allocations: 0 bytes)
0

julia> const lk = ReentrantLock()

julia> @btime (lock(lk); unlock(lk))
  56.520 ns (0 allocations: 0 bytes)

julia> lock(lk)

julia> @btime (lock(lk); unlock(lk))
  12.405 ns (0 allocations: 0 bytes)

(note: that last lock lk needs to be optimized, to be on-par with the other lks, but we didn't used to have the ability to express atomic operations in julia)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I put the whole thing in a lock(r.lock) then a local put! will never be able to enter the lock, because the fetch will be stuck on the fetch(rv.c). And in put! if i put into the channel outside of the lock I don't have control over who caches the local value first

I'm not sure what else could I improve here.
The local cache is always set once. The paths from fetch(rv.c) and call_on_owner will rightfully try to cache the value and only fail if it was already cached or put! was locally.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, yeah, I see how it is complicated by the fact that we might be the owner, so we are waiting for someone else to set the value before we can return it here. But it seems like if this fails in the remote case, it is because there was a thread-synchronization error, which could happen because this lock was not being held across here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the call on owner case won't fail due to this lock, because it fetches on a remote channel

this fetch function isn't used in the call_on_owner scenario

This looks safe to me as it is right now because of that, but I haven't dwelled that deep into the remote scenarios

@vtjnash
Copy link
Member

vtjnash commented Oct 20, 2021

We also discussed that we want this to provide sequential consistency for put and fetch. While slow, this is probably essential for predicability and reliability to users.

# why? put! performs caching and putting into channel under r.lock

# for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
v_cache = status ? v_local : v_old
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vtjnash Just not sure here. So when a fetch gets the value from call_on_owner at line 573 we cache the value in r.v and then should fetch return the v_local obtained from the call_on_owner or should I load r.v and get that cached value?
Right now it's just returning v_local

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure here. Since fetch(rv.c) succeeded, it seems like that set the value for everyone, but now we might decide to copy it locally too, but that we must have strictly maintained that rv.v === r.v, since r.v is just a locally cached copy of the same pointer from rv.v (which has the canonical copy) in this case. Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct. The rv.v === r.v is maintained right now properly I think

@krynju
Copy link
Contributor Author

krynju commented Oct 22, 2021

@vtjnash Just pushed changes that address what we talked about this week.

  • Added the Future local lock
  • Right now a local put! will always get to cache first due to usage of a Future lock.
  • Some critical atomic reads were changed to default (so sequentially consistent).
  • The cache is always set once (fetch doesn't overwrite it anymore with the "same" value)

@vtjnash
Copy link
Member

vtjnash commented Nov 19, 2021

Yeah, we can worry about that some other day

@krynju
Copy link
Contributor Author

krynju commented Nov 19, 2021

Ok, so it's good to go then. CI fails due to Downloads

@vtjnash
Copy link
Member

vtjnash commented Nov 19, 2021

ah, wait, I looked closer at that error message, and realized it is being caused by this PR

Comment on lines 364 to 370
function serialize(s::ClusterSerializer, f::Future)
v_cache = @atomic f.v
serialize(s, f, v_cache === nothing)
end
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to change the Future serializer to exclude the new fields:

Suggested change
function serialize(s::ClusterSerializer, f::Future)
v_cache = @atomic f.v
serialize(s, f, v_cache === nothing)
end
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
function serialize(s::ClusterSerializer, f::Future)
serialize_type(s, typeof(f))
serialize(s, f.where)
serialize(s, remoteref_id(f))
value = @atomic f.v
if value === nothing
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
end
serialize(s, value)
end
function serialize(s::ClusterSerializer, f::RemoteChannel)
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
invoke(serialize, Tuple{AbstractSerializer, Any}, s, f)
end
serialize(s::AbstractSerializer, ::AbstractLock) = error("Locks cannot be serialized")

and corresponding deserialize methods

function deserialize(s::ClusterSerializer, T::Type{<:Future})
    where = deserialize(s, f.where)::Int
    rid = deserialize(s, remoteref_id(f))::RRID
    value = deserialize(s, f.value)
    # 1) send_add_client() is not executed when the ref is being serialized
    #    to where it exists, hence do it here.
    # 2) If we have received a 'fetch'ed Future or if the Future ctor found an
    #    already 'fetch'ed instance in client_refs (Issue #25847), we should not
    #    track it in the backing RemoteValue store.
    if where == myid() && value === nothing
        add_client(rid, where)
    end
    return T(where, rid, value) # ctor adds to client_refs table
end
function deserialize(s::ClusterSerializer, T::Type{<:RemoteChannel})
    rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)::T # deserialize a placeholder object
    rid = remoteref_id(rr)
    if rr.where == myid()
        # send_add_client() is not executed when the ref is being
        # serialized to where it exists
        add_client(rid, myid())
    end
    # call ctor to make sure this rr gets added to the client_refs table
    return T(rr.where, rid)
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to make it work, but without any success.
If I serialize the fields manually as proposed by you at some point the code expects a Future structure and I'm not really sure how to handle that.

Anyway i was thinking about the issue and as of right now:

  1. Serialization takes the full Future with the new lock and serializes it
  2. Deserialization takes the serialized data and constructs a new Future based on it, but I'm unsure whether it takes the serialized lock data or creates a new one (constructors only ever create a new one, apart from the default)

I made the serialization now serialize a copy of the Future with a new unused lock, but I'm not sure if it's necessary. It used to work fine before any serialization change anyway

Copy link
Contributor Author

@krynju krynju Dec 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vtjnash I rebased and it's finally passing CI.
What do you think of the above changes?
I couldn't get the serialization of only the "safe" fields working, so I just added a full copy in the serializer (so that the serialization always works on a copy with an unlocked lock).
I'm not sure it's necessary though as serializing a locked lock doesn't matter in the end as during deserialization a new lock is being constructed.
Or is there some reason not to serialize a locked lock?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why you were trying to use invoke, since there is definitely no hope of that approach working at all, while I had written the code just above here that I think should be roughly correct. Anyways, this approach now seems suitable enough so I have merged it.

@vtjnash vtjnash added the parallelism Parallel or distributed computation label Dec 1, 2021
@KristofferC KristofferC mentioned this pull request Dec 2, 2021
15 tasks
@vtjnash vtjnash merged commit 728cba3 into JuliaLang:master Dec 3, 2021
@vtjnash
Copy link
Member

vtjnash commented Dec 3, 2021

I probably should have asked you to write the commit message for the change also, but I have written a bad one instead and merged.

@vchuravy
Copy link
Member

vchuravy commented Dec 5, 2021

Great to see this merged!

@Sacha0
Copy link
Member

Sacha0 commented Dec 6, 2021

Echoing Valentin, awesome work, thanks for seeing it through @krynju! :)

KristofferC pushed a commit that referenced this pull request Dec 7, 2021
…Futures (#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <[email protected]>
Co-authored-by: Takafumi Arakaki <[email protected]>
(cherry picked from commit 728cba3)
@Sacha0
Copy link
Member

Sacha0 commented Dec 11, 2021

With #43398 in, I've re-tagged this for backport (and tagged #43398 for backport alongside it).

KristofferC pushed a commit that referenced this pull request Dec 11, 2021
…Futures (#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <[email protected]>
Co-authored-by: Takafumi Arakaki <[email protected]>
(cherry picked from commit 728cba3)
LilithHafner pushed a commit to LilithHafner/julia that referenced this pull request Feb 22, 2022
…Futures (JuliaLang#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <[email protected]>
Co-authored-by: Takafumi Arakaki <[email protected]>
LilithHafner pushed a commit to LilithHafner/julia that referenced this pull request Mar 8, 2022
…Futures (JuliaLang#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <[email protected]>
Co-authored-by: Takafumi Arakaki <[email protected]>
vchuravy pushed a commit to JuliaLang/Distributed.jl that referenced this pull request Oct 6, 2023
…Futures (JuliaLang/julia#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <[email protected]>
Co-authored-by: Takafumi Arakaki <[email protected]>
(cherry picked from commit 4d17719)
Keno pushed a commit that referenced this pull request Jun 5, 2024
…Futures (#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <[email protected]>
Co-authored-by: Takafumi Arakaki <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
multithreading Base.Threads and related functionality parallelism Parallel or distributed computation stdlib Julia's standard library
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants