[Distributed] Worker local race condition between put! and fetch for Futures #42339
It's breaking Shared Arrays as it gets stuck there during the CI runs |
Is there an MWE without Dagger? And can you put the stacktrace or a short sequence of events you are seeing? |
No; yes. A quick explanation, in order: Thunk502 (id not visible in the logs) is fetching on the Future related to Thunk516.
This is the end of the log: Thunk502 never resumes and stays stuck fetching on Thunk516's Future.
Additionally, if I synchronize put! and fetch inside Dagger's code, the race doesn't happen, but that uses a global lock, which I think is suboptimal.
|
So something akin to:
but that doesn't hang in my few tries. Adding a condition to every future is I think not the right strategy. I would first like to understand what race is happening (and on which data) |
So overall I considered two options:
The fix I proposed addresses the first one in the case of Future usage in a one-worker multi-threaded environment, but it also ditches the channel usage on local fetches altogether, which I'm not sure is a good move. I'll give it some more thought later. |
@vchuravy
example (stuck on iter2):
|
Testing the above with >1 thread hangs for me, but works fine with only 1 thread. |
Ok, I found exactly where the problem is.
function put_future(rid, v, caller)
    rv = lookup_ref(rid)
    @debug "put; rid=$(objectid(rid)); rv=$(objectid(rv))"
    ...
fetch_ref(rid, args...) = begin
    rv = lookup_ref(rid)
    @debug "fet; rid=$(objectid(rid)); rv=$(objectid(rv))"
    fetch(rv.c, args...)
end
What should be happening is that, for the put and fetch on a single Future, the rid and rv object ids should be exactly the same, as this is a local environment. For many Futures the condition passes, example:
For some Futures the
|
Ok, the latest commit passes tests and fixes the synthetic stress test which used to fail (multiple fetches were another thing that wasn't thread-safe). It fixes the Dagger hangs that were due to this not being thread-safe, but there's still a reproducible hang in my heavy Dagger example, and it seems to be related to Dagger specifically (this one doesn't leave the task in state.running - new hang identified 😢 @jpsamaroo). I'll write another comment later explaining the solution - I don't think this one should affect performance that much.
Synthetic example that passes now:
using Distributed
create_future = () -> Future()
put_future = (f) -> put!(f, sum(1:10_000))
fetch_future = (f) -> fetch(f)
for x in 1:1000
    println("starting $x")
    _f = [create_future() for i in 1:1000]
    # fetchers are spawned both before and after the putter
    t1 = Threads.@spawn fetch_future.(_f)
    t2 = Threads.@spawn fetch_future.(_f)
    Threads.@spawn put_future.(_f)
    t3 = Threads.@spawn fetch_future.(_f)
    t4 = Threads.@spawn fetch_future.(_f)
    # every fetcher must finish; a hang here reproduces the race
    wait(t1);
    wait(t2);
    wait(t3);
    wait(t4);
end
|
stdlib/Distributed/src/remotecall.jl
Outdated
@@ -72,7 +72,7 @@ function test_existing_ref(r::AbstractRemoteRef) | |||
if isa(r, Future) && found.v === nothing && r.v !== nothing |
It looks like this function may require the client_refs lock, since there are multiple uses of that dict.
getkey locks client_refs, but that function still looks fishy.
In the end I didn't find any issues with it, but I'll keep an eye on it.
It is generally insufficient to lock each use independently; all uses generally must share the same lock.
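A minimal sketch of the point about sharing one lock across all uses, under stated assumptions: the table `refs`, its lock `refs_lock`, and both helper functions are hypothetical names for illustration and are not the actual `client_refs` code in Distributed. The racy variant locks every dict access, yet the lookup and the insert are two separate critical sections.

```julia
# Hypothetical stand-ins for a shared table and its lock (not Distributed's client_refs).
const refs = Dict{Int,Vector{Int}}()
const refs_lock = ReentrantLock()

# Racy: each dict access is locked, but the lookup and the insert are separate
# critical sections, so two tasks can both miss and both insert.
function get_or_create_racy!(key::Int)
    found = lock(() -> get(refs, key, nothing), refs_lock)
    found !== nothing && return found
    fresh = Int[]
    lock(() -> (refs[key] = fresh), refs_lock)
    return fresh
end

# Safe: the lookup and the insert happen under one acquisition of the same lock.
function get_or_create!(key::Int)
    lock(refs_lock) do
        get!(() -> Int[], refs, key)
    end
end
```

With `get_or_create!`, every concurrent caller for the same key receives the identical vector; with the racy variant two callers may each receive a different one.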
Issue description:
Solutions:
|
Ok, so I've been daily driving this and it's looking good for the issues I mentioned above. However, I noticed one more hang, which is very weird, and I'll just create an issue for it. In summary, when running the stress test with >12 threads on PC1 (Linux 2c/4t) or >16 threads on PC2 (Windows 8c/16t), the hang will occur.
Additionally, I managed to gdb into the stuck thread (it's at 100% usage) and it seems to be stuck in gcutils.jl and in gf.c, constantly going through
Another time it was stuck in rehash!/promotion
Stress test:
using Distributed
create_future = () -> Future()
put_future = (f) -> put!(f, sum(1:10_000))
fetch_future = (f) -> fetch(f)
for x in 1:1000
    println("starting $x")
    _f = [create_future() for i in 1:1000]
    t1 = Threads.@spawn fetch_future.(_f)
    t2 = Threads.@spawn fetch_future.(_f)
    Threads.@spawn put_future.(_f)
    t3 = Threads.@spawn fetch_future.(_f)
    t4 = Threads.@spawn fetch_future.(_f)
    wait(t1);
    wait(t2);
    wait(t3);
    wait(t4);
end
|
stdlib/Distributed/src/remotecall.jl
Outdated
rv = lock(client_refs) do | ||
r.v === nothing ? lookup_ref(remoteref_id(r)) : nothing | ||
end | ||
if r.v !== nothing |
I don't understand the logic here. Can you add some comments here explaining what this is guarding against and why we can't just call call_on_owner(fetch_ref, r)?
So this guards against creating unnecessary RemoteValues - more details here in issue 1 #42339 (comment)
If we just call call_on_owner(fetch_ref, r), then we get a RemoteValue's channel back, but we can get a freshly created one if the original doesn't exist anymore. Now we would have to check r.v again to see whether that RemoteValue is the correct one (r.v == nothing) or a freshly created one (r.v != nothing) and then remove it, so this just doesn't create it in the first place.
stdlib/Distributed/src/remotecall.jl
Outdated
rr.v = Some(v) | ||
_, ok = @atomicreplace rr.v nothing => Some(v) | ||
ok || error("Future can be set only once") | ||
call_on_owner(put_future, rr, something(rr.v), myid()) |
Why not v instead of something(rr.v)?
Also, this changes the semantics, doesn't it? If I am not the owner of rr, I get to set it, and potentially I and the actual owner disagree?
Think about two clients calling put! on the same Future. Only one of them is allowed to win that race. The race needs to be resolved on the owner.
I just changed all possible references to r.v / rr.v, so that getproperty is called on them, which possibly helps with the atomic ordering - no idea if it actually translates well.
On the conflict - I had the impression that two put!'s is an error.
So the issue is that, in case the call_on_owner fails, the rogue put! has already cached the value and allowed local tasks to consume that cache even though it's wrong.
So if we move these around - call on owner first, then caching - then we're hitting issue 2 from here #42339 (comment)
Basically fetch will cache faster than put. We'd have to remove caching from fetch, I guess?
I would remove caching from put! instead. The option is to attempt an atomicreplace and, if it fails, check whether or not it was equivalent.
So removing caching from put! has an additional implication: once the put is done, it removes the local channel with the value, so the next time someone fetches they will be stuck waiting on a newly created channel, because the original one was removed and the cache is empty.
What about this:
function put!(rr::Future, v)
    rr.where == myid() && set_future_cache(rr, v)
    call_on_owner(put_future, rr, v, myid())
    rr.where != myid() && set_future_cache(rr, v)
    rr
end
function set_future_cache(rr::Future, v)
    _, ok = @atomicreplace rr.v nothing => Some(v)
    ok || error("Future can be set only once")
end
I suppose caching when the Future is local is fine before its channel gets put! on.
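The set-once behavior of `set_future_cache` can be tried in isolation. The following is a sketch under assumptions: `OnceCell` and `set_once!` are hypothetical names that stand in for the cached-value field of a Future, and per-field `@atomic` requires Julia 1.7 or newer.

```julia
# Hypothetical cell type standing in for the cached-value field of a Future.
mutable struct OnceCell
    @atomic v::Union{Nothing,Some{Any}}
end
OnceCell() = OnceCell(nothing)

# Try to cache `x`; only one caller can win the nothing => Some(x) transition.
function set_once!(c::OnceCell, x)
    _, ok = @atomicreplace c.v nothing => Some{Any}(x)
    ok || error("cell can be set only once")
    return c
end
```

A second `set_once!` on the same cell throws and leaves the first value in place, which is the property the two-clients discussion above relies on.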
stdlib/Distributed/src/remotecall.jl
Outdated
@@ -549,10 +549,21 @@ is an exception, throws a [`RemoteException`](@ref) which captures the remote ex | |||
""" | |||
function fetch(r::Future) | |||
r.v !== nothing && return something(r.v) |
Relaxed load doesn't work here. Since the field has an indirection (Any), we need at least acquire.
Suggested change:
- r.v !== nothing && return something(r.v)
+ v = @atomic r.v
+ v !== nothing && return something(v)
Basically, if you put @atomic on the field declaration, you have to manually make sure that you put @atomic on every read and write (unless relaxed/monotonic load is really what you want).
(Edit: Stop mentioning Dekker; not sure how to construct an example where store-load reordering matters.)
So I'm a little confused now, @vtjnash reviewed this earlier and suggested that for these reads the atomic sequencing isn't required, because the normal read just counts as a consume load and is good enough. (#42339 (comment))
I've had all reads marked with atomic at some point, I can revert to that if needed, but I don't really have any strong argument behind either approach - both work in case of Futures.
We need to make sure that all invariants (which cannot be enforced at the type level) that the caller of put! set up are present by the time the fetch call returns. We need release/acquire for this. Nothing in the Julia runtime or compiler helps here, since such an invariant can only exist in the value domain.
Consider:
mutable struct SumTo1
    a::Int
    b::Int
    c::Int
end
invariance(n) = n.a + n.b + n.c == 1
function add_to_a!(n, x)
    n.a += x
    n.c = 1 - n.a - n.b
end
Task 1:
add_to_a!(n, 5)
put!(future, n)
Task 2:
n = fetch(future)
@assert invariance(n)
Without release and acquire, the assertion at the last line may fail. Furthermore, since Some{Any} introduces one extra indirection, it is a data race and hence UB without release and acquire.
@vtjnash's point is that, with the currently supported architectures and current compiler implementations, the difference is very likely not observable. But UB is UB. A future compiler can break this. It's also possible that UB like this gets in the way of using tooling to detect concurrency bugs. We should respect the memory model and write correct code.
But maybe a safer route is to switch to a lock-based solution, which is probably easier for many people to deal with. Or, if you are still curious, I think a nice resource for understanding the issue here is this talk on atomics by Herb Sutter:
- C++ and Beyond 2012: Herb Sutter - atomic Weapons 1 of 2 - YouTube
- C++ and Beyond 2012: Herb Sutter - atomic Weapons 2 of 2 - YouTube
(or maybe his other, higher-level tutorial talk on lock-free programming).
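The release/acquire pairing described above could be sketched as follows. This is an illustration under assumptions, not the code from the PR: `Slot`, `publish!`, and `poll` are hypothetical names, and `SumTo1` is redefined so the block is self-contained.

```julia
mutable struct SumTo1
    a::Int
    b::Int
    c::Int
end

# Hypothetical one-slot mailbox; `v` plays the role of the Future's cached value.
mutable struct Slot
    @atomic v::Union{Nothing,SumTo1}
end

# Publisher: finish all writes to `n` first, then store it with release ordering.
function publish!(s::Slot, n::SumTo1)
    @atomic :release s.v = n
    return s
end

# Consumer: load with acquire ordering. If the result is not `nothing`, the
# writes made before the matching release store are visible to this task.
function poll(s::Slot)
    return @atomic :acquire s.v
end
```

The consumer only reads the fields of the object after `poll` returns it, so the invariant set up by the publisher holds for whatever it observes.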
Hey, thanks for the links to the talks! Very helpful for understanding this.
I watched the last link and will get to the longer one later.
I've pushed a commit that reverts to @atomic on loads and also marks the accesses with :acquire and :release, since I believe this is enough for this scenario (basically one publisher (put!), multiple consumers (fetch)).
Partial review comments from a joint review meeting.
The main takeaway is that it appears we would solve many issues by adding a ReentrantLock inside Future, so that fetch and put can ensure that, among those participating in the race to get/set the Future, only one is inside at a time.
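As a rough sketch of that takeaway, assuming a toy type rather than the real Future: `LockedCell`, `put_once!`, and `try_get` are hypothetical names, and the actual field layout in the PR may differ. Writers are serialized by the lock, while readers keep a lock-free fast path on the atomic cache field.

```julia
# Hypothetical miniature of a Future-like cell; names are illustrative only.
mutable struct LockedCell
    lock::ReentrantLock
    @atomic v::Union{Nothing,Some{Any}}
end
LockedCell() = LockedCell(ReentrantLock(), nothing)

# Set the value once; the lock serializes competing writers.
function put_once!(c::LockedCell, x)
    lock(c.lock) do
        (@atomic c.v) === nothing || error("cell can be set only once")
        @atomic c.v = Some{Any}(x)
    end
    return c
end

# Fast path: lock-free read of the cache; returns `nothing` if unset,
# otherwise the stored `Some` wrapper.
function try_get(c::LockedCell)
    return @atomic c.v
end
```

Because the check and the store happen inside one critical section, at most one of several concurrent `put_once!` calls succeeds.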
stdlib/Distributed/src/remotecall.jl
Outdated
r.v !== nothing && return something(r.v) | ||
v = call_on_owner(fetch_ref, r) | ||
r.v = Some(v) | ||
v_cache = @atomic :acquire r.v |
Suggested change:
- v_cache = @atomic :acquire r.v
+ v_cache, = @atomicreplace r.v nothing => nothing
just made it a default @atomic which is sequentially consistent
stdlib/Distributed/src/remotecall.jl
Outdated
v_local = call_on_owner(fetch_ref, r) | ||
end | ||
|
||
@atomic :release r.v = Some(v_local) |
This might have been already set (by put! on myid != owner).
r.v = Some(v) | ||
v_cache = @atomic :acquire r.v | ||
v_cache !== nothing && return something(v_cache) | ||
|
Can we lock r::Future here?
didn't add a lock here since it's supposed to be a quick cache check
I meant here exactly, as in after the quick lookup.
For (my own) reference on performance:
julia> mutable struct AtomicCounter
           @atomic x::Int
       end
julia> const ac = AtomicCounter(0);
julia> const r = Some(0);
julia> const r2 = Ref(0);
# measure operation overhead
julia> @btime (r2[] = something(r))
1.696 ns (0 allocations: 0 bytes)
# measure atomic load overhead
julia> @btime @atomic ac.x
1.696 ns (0 allocations: 0 bytes)
julia> @btime @atomicreplace ac.x 0 => 0
8.372 ns (0 allocations: 0 bytes)
julia> @btime @atomicreplace :monotonic ac.x 0 => 0
8.372 ns (0 allocations: 0 bytes)
julia> const lk2 = Base.ThreadSynchronizer();
julia> @btime (lock(lk2); unlock(lk2))
20.072 ns (0 allocations: 0 bytes)
julia> @btime (Base.iolock_begin(); Base.iolock_end())
18.390 ns (0 allocations: 0 bytes)
julia> Base.iolock_begin(); @btime (Base.iolock_begin(); Base.iolock_end()); Base.iolock_end()
12.467 ns (0 allocations: 0 bytes)
julia> const lk3 = Libc.malloc(40)
Ptr{Nothing} @0x0000555e99ac70c0
julia> ccall(:uv_mutex_init, Cint, (Ptr{Cvoid},), lk3)
0
julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
20.407 ns (0 allocations: 0 bytes)
julia> ccall(:uv_mutex_init_recursive, Cint, (Ptr{Cvoid},), lk3)
0
julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
23.412 ns (0 allocations: 0 bytes)
0
julia> ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3)
0
julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
12.392 ns (0 allocations: 0 bytes)
0
julia> const lk = ReentrantLock()
julia> @btime (lock(lk); unlock(lk))
56.520 ns (0 allocations: 0 bytes)
julia> lock(lk)
julia> @btime (lock(lk); unlock(lk))
12.405 ns (0 allocations: 0 bytes)
(note: that last lock lk needs to be optimized to be on par with the other lks, but we didn't used to have the ability to express atomic operations in julia)
If I put the whole thing in a lock(r.lock), then a local put! will never be able to enter the lock, because the fetch will be stuck on the fetch(rv.c). And in put!, if I put into the channel outside of the lock, I don't have control over who caches the local value first.
I'm not sure what else I could improve here.
The local cache is always set once. The paths from fetch(rv.c) and call_on_owner will rightfully try to cache the value and only fail if it was already cached or the put! was local.
Okay, yeah, I see how it is complicated by the fact that we might be the owner, so we are waiting for someone else to set the value before we can return it here. But it seems like if this fails in the remote case, it is because there was a thread-synchronization error, which could happen because this lock was not being held across here.
The call_on_owner case won't fail due to this lock, because it fetches on a remote channel; this fetch function isn't used in the call_on_owner scenario.
This looks safe to me as it is right now because of that, but I haven't delved that deep into the remote scenarios.
We also discussed that we want this to provide sequential consistency for |
# why? put! performs caching and putting into channel under r.lock | ||
|
||
# for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v | ||
v_cache = status ? v_local : v_old |
@vtjnash Just not sure here. When a fetch gets the value from call_on_owner at line 573, we cache the value in r.v. Should fetch then return the v_local obtained from call_on_owner, or should I load r.v and get that cached value?
Right now it's just returning v_local.
I am not sure here. Since fetch(rv.c) succeeded, it seems like that set the value for everyone, but now we might decide to copy it locally too. In that case we must have strictly maintained that rv.v === r.v, since r.v is just a locally cached copy of the same pointer from rv.v (which has the canonical copy). Is that right?
Yes, that's correct. I think rv.v === r.v is properly maintained right now.
@vtjnash Just pushed changes that address what we talked about this week.
|
Yeah, we can worry about that some other day |
Ok, so it's good to go then. CI fails due to Downloads |
ah, wait, I looked closer at that error message, and realized it is being caused by this PR |
stdlib/Distributed/src/remotecall.jl
Outdated
function serialize(s::ClusterSerializer, f::Future) | ||
v_cache = @atomic f.v | ||
serialize(s, f, v_cache === nothing) | ||
end | ||
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) | ||
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) | ||
if addclient |
We need to change the Future serializer to exclude the new fields:
Suggested change:
- function serialize(s::ClusterSerializer, f::Future)
-     v_cache = @atomic f.v
-     serialize(s, f, v_cache === nothing)
- end
- serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
- function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
-     if addclient
+ function serialize(s::ClusterSerializer, f::Future)
+     serialize_type(s, typeof(f))
+     serialize(s, f.where)
+     serialize(s, remoteref_id(f))
+     value = @atomic f.v
+     if value === nothing
+         p = worker_id_from_socket(s.io)
+         (p !== f.where) && send_add_client(f, p)
+     end
+     serialize(s, value)
+ end
+ function serialize(s::ClusterSerializer, f::RemoteChannel)
+     p = worker_id_from_socket(s.io)
+     (p !== f.where) && send_add_client(f, p)
+     invoke(serialize, Tuple{AbstractSerializer, Any}, s, f)
+ end
+ serialize(s::AbstractSerializer, ::AbstractLock) = error("Locks cannot be serialized")
and corresponding deserialize methods:
function deserialize(s::ClusterSerializer, T::Type{<:Future})
    where = deserialize(s)::Int
    rid = deserialize(s)::RRID
    value = deserialize(s)
    # 1) send_add_client() is not executed when the ref is being serialized
    #    to where it exists, hence do it here.
    # 2) If we have received a 'fetch'ed Future or if the Future ctor found an
    #    already 'fetch'ed instance in client_refs (Issue #25847), we should not
    #    track it in the backing RemoteValue store.
    if where == myid() && value === nothing
        add_client(rid, where)
    end
    return T(where, rid, value) # ctor adds to client_refs table
end
function deserialize(s::ClusterSerializer, T::Type{<:RemoteChannel})
    rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, T)::T # deserialize a placeholder object
    rid = remoteref_id(rr)
    if rr.where == myid()
        # send_add_client() is not executed when the ref is being
        # serialized to where it exists
        add_client(rid, myid())
    end
    # call ctor to make sure this rr gets added to the client_refs table
    return T(rr.where, rid)
end
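The general idea of leaving a lock out of the serialized form can be tried on a toy type with the Serialization standard library. This is only a sketch under assumptions: `Cell` is a hypothetical type, and the block does not reproduce the `ClusterSerializer` or client-tracking logic discussed in this thread.

```julia
using Serialization

# Hypothetical type with a lock that should not travel over the wire.
mutable struct Cell
    lock::ReentrantLock
    value::Any
end
Cell(value) = Cell(ReentrantLock(), value)

# Write only the type tag and the payload; the lock field is skipped.
function Serialization.serialize(s::Serialization.AbstractSerializer, c::Cell)
    Serialization.serialize_type(s, Cell)
    serialize(s, c.value)
    return nothing
end

# Read the payload back and rebuild the object with a fresh, unlocked lock.
function Serialization.deserialize(s::Serialization.AbstractSerializer, ::Type{Cell})
    return Cell(deserialize(s))
end
```

With these two methods, a `Cell` that is locked at the time of serialization still round-trips into an object whose lock is new and unlocked.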
I've been trying to make it work, but without any success.
If I serialize the fields manually as you proposed, at some point the code expects a Future structure, and I'm not really sure how to handle that.
Anyway, I was thinking about the issue, and as of right now:
- Serialization takes the full Future with the new lock and serializes it
- Deserialization takes the serialized data and constructs a new Future based on it, but I'm unsure whether it takes the serialized lock data or creates a new one (constructors only ever create a new one, apart from the default)
I made the serialization now serialize a copy of the Future with a new unused lock, but I'm not sure if it's necessary. It used to work fine before any serialization change anyway.
@vtjnash I rebased and it's finally passing CI.
What do you think of the above changes?
I couldn't get the serialization of only the "safe" fields working, so I just added a full copy in the serializer (so that the serialization always works on a copy with an unlocked lock).
I'm not sure it's necessary though as serializing a locked lock doesn't matter in the end as during deserialization a new lock is being constructed.
Or is there some reason not to serialize a locked lock?
I am not sure why you were trying to use invoke, since there is definitely no hope of that approach working at all, while I had written the code just above here that I think should be roughly correct. Anyways, this approach now seems suitable enough so I have merged it.
Co-authored-by: Jameson Nash <[email protected]>
This reverts commit 67da4d5.
I probably should have asked you to write the commit message for the change also, but I have written a bad one instead and merged. |
Great to see this merged! |
Echoing Valentin, awesome work, thanks for seeing it through @krynju! :) |
…Futures (#42339) * add local_lock to Future, use it in fetch and put! * add corrections to the remote/clientref logic * add memory ordering guarantees * serialize a (unlocked) copy of the future to avoid problems with the lock Co-authored-by: Jameson Nash <[email protected]> Co-authored-by: Takafumi Arakaki <[email protected]> (cherry picked from commit 728cba3)
PkgEval would have been good here. On 1.7.1 backport branch I see package errors that seem to originate from this: https://s3.amazonaws.com/julialang-reports/nanosoldier/pkgeval/by_hash/a1dd801_vs_3bf9d17/COBREXA.1.7.0-380e812253f.log etc. Full log at https://github.com/JuliaCI/NanosoldierReports/blob/master/pkgeval/by_hash/a1dd801_vs_3bf9d17/report.md. Therefore, dropping this for backporting until that has been investigated. |
…Futures (JuliaLang#42339) * add local_lock to Future, use it in fetch and put! * add corrections to the remote/clientref logic * add memory ordering guarantees * serialize a (unlocked) copy of the future to avoid problems with the lock Co-authored-by: Jameson Nash <[email protected]> Co-authored-by: Takafumi Arakaki <[email protected]>
…Futures (JuliaLang/julia#42339) * add local_lock to Future, use it in fetch and put! * add corrections to the remote/clientref logic * add memory ordering guarantees * serialize a (unlocked) copy of the future to avoid problems with the lock Co-authored-by: Jameson Nash <[email protected]> Co-authored-by: Takafumi Arakaki <[email protected]> (cherry picked from commit 4d17719)
…Futures (#42339) * add local_lock to Future, use it in fetch and put! * add corrections to the remote/clientref logic * add memory ordering guarantees * serialize a (unlocked) copy of the future to avoid problems with the lock Co-authored-by: Jameson Nash <[email protected]> Co-authored-by: Takafumi Arakaki <[email protected]>
This fixes the issues observed here JuliaParallel/Dagger.jl#282 for a one-worker multi threaded environment.
It fixes that, but I'm afraid it might break multi-worker behavior, so this is still kind of WIP if that's the case
With these changes applied the worker-local fetches wait on the local_lock instead of the channel associated with the Future. The put! still uses the channel in case there are any remote references to the Future and when it's done caching the result it notifies the local_lock, so that the local fetches can get the cached value properly.
Let me know what you think