RFC: Work-stealing scheduler #43366
Conversation
src/partr.c (outdated)
jl_task_t *task = jl_atomic_load_relaxed(
    (_Atomic(jl_task_t **))&wsdeques[tid].tasks[t % tasks_per_heap]);
if (!atomic_compare_exchange_strong_explicit(
        &wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
Doesn't this shrink the workqueue from the top? And thus every time we steal we lose space on top?
It uses the `.tasks` buffer as a circular array, so it's OK that `.top` is never decremented. (In fact, that's the main cleverness of the Chase-Lev deque.)
Is this a known error, or could it be related to this PR?
(... many similar lines ...)
Codecov Report
```diff
@@            Coverage Diff             @@
##           master   #43366      +/-   ##
==========================================
- Coverage   89.55%   89.52%   -0.03%
==========================================
  Files         342      342
  Lines       80387    80387
==========================================
- Hits        71992    71970      -22
- Misses       8395     8417      +22
```

Continue to review the full report at Codecov.
This reverts commit f2b47d2.
It's good to go on my end.
I ran additional benchmarks to help decide if we want this PR. A short summary is that workstealing works better than multiqueue at relatively low thread counts (< 10). On the other hand, the current multiqueue-based scheduler can outperform workstealing with many cores in some cases. This relative advantage of the multiqueue is a bit surprising to me, since we are essentially using it only as a concurrent bag ATM and no special properties of the multiqueue are exploited. But I haven't had time to investigate this further yet. That said, I think it is already worth adding the workstealing scheduler because:
Benchmarks

fib(30)

This benchmark compares the run-time of `fib` (lines 696 to 702 in b6bca19).

The above plot shows the speedup; i.e., the run-time with the multiqueue scheduler relative to the run-time using workstealing [1]. The left plot is with ...

producer-consumer

This is a benchmark with the following function:

```julia
function producer_consumer(;
    niters = 1000,
    nreplicas = 10 * Threads.nthreads(),
)
    @sync begin
        for _ in 1:nreplicas
            ch = Channel{Int}()
            Threads.@spawn try
                for i in 1:niters
                    put!(ch, i)
                end
            finally
                close(ch)
            end
            Threads.@spawn foreach(identity, ch)
        end
    end
end
```

The result is similar to fib(30), but the multiqueue performs better even with ... This is rather puzzling, since I thought the above program strongly favors workstealing. Since the channel ...

Machine information

Machine 1

Machine 2
Footnotes
Shouldn't it be the other way around? A ratio < 1 means the workstealing scheduler takes less time to complete, i.e., workstealing outperforms multiqueue. For the fib(30) benchmark this is only the case for ...
Ah, thanks for noticing it. The equation in the figure label was the other way around. The numbers and the conclusions are correct. I fixed it.
With 531e479, which simplifies the task lock handling, workstealing now outperforms multiqueue more consistently. The only case where it looks like multiqueue is working better is fib(30) with ... It's noisier, but it looks like the trend is a speedup with workstealing. It may reflect that multiqueue is a more RNG-driven algorithm and it can get lucky sometimes.
This reverts commit f2b47d2.
As a more practical example, I ran some benchmarks of quicksort and mergesort implemented in ThreadsX for sorting ... You can see that workstealing improves the performance of quicksort by up to 20%. There's no clear performance improvement in mergesort.
It could be interesting to see if Gaius.jl gets faster or slower with this new scheduler.
Haha, I just commented on this at https://julialang.zulipchat.com/#narrow/stream/137791-general/topic/Gaius.2Ejl's.20divide.20strategy/near/264786592, but I couldn't find any clear change in the performance of Gaius.jl. I think it makes sense, since you don't need load-balancing for GEMM (and load-balancing is what workstealing is good at). OTOH, algorithms like quicksort have the task DAG determined by the input data, and so load-balancing is beneficial. That said, it's conceivable that improving task scheduling performance can improve it.
    _Atomic(int64_t) top;
    _Atomic(int64_t) bottom;
};
uint8_t padding[JL_CACHE_BYTE_ALIGNMENT];
this has no alignment whatsoever
wsdeques = (wsdeque_t *)(((uintptr_t)calloc(1, sizeof(wsdeque_t) * jl_n_threads +
                                                JL_CACHE_BYTE_ALIGNMENT - 1) +
                           JL_CACHE_BYTE_ALIGNMENT - 1) &
                          (-JL_CACHE_BYTE_ALIGNMENT));
Suggested change:

    wsdeques = (wsdeque_t *)malloc_cache_align(sizeof(wsdeque_t) * jl_n_threads);
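For context, here is a minimal sketch of the two allocation strategies being contrasted (illustrative only; `CACHE_ALIGN` stands in for `JL_CACHE_BYTE_ALIGNMENT`, and the helper names are made up rather than being whatever `malloc_cache_align` actually does):

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define CACHE_ALIGN 64  /* stand-in for JL_CACHE_BYTE_ALIGNMENT (a power of two) */

/* Manual trick used in the original line: over-allocate by (align - 1) bytes,
 * then round the pointer up to the next multiple of the alignment. */
static void *calloc_cache_align_manual(size_t nbytes)
{
    uintptr_t raw = (uintptr_t)calloc(1, nbytes + CACHE_ALIGN - 1);
    if (!raw)
        return NULL;
    return (void *)((raw + CACHE_ALIGN - 1) & ~(uintptr_t)(CACHE_ALIGN - 1));
}

/* C11 alternative: let the allocator do the alignment. aligned_alloc requires
 * the size to be a multiple of the alignment, hence the rounding. */
static void *calloc_cache_align_c11(size_t nbytes)
{
    size_t rounded = (nbytes + CACHE_ALIGN - 1) & ~(size_t)(CACHE_ALIGN - 1);
    void *p = aligned_alloc(CACHE_ALIGN, rounded);
    if (p)
        memset(p, 0, rounded);
    return p;
}
```

One caveat of the manual version is that the rounded pointer can no longer be passed to `free()`; that may be acceptable for a once-per-process allocation, but it is one argument for using a dedicated aligned-allocation helper.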
{
    int16_t tid = jl_threadid();
    int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom);
    int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
What is being load-acquired? There are no stores in wsdeque_steal
Suggested change:

    int64_t t = jl_atomic_load(&wsdeques[tid].top); // ensures that `tasks` is successfully stolen before we try to reuse the slot below
I'm trying to digest this and I realized that I still don't get it. As we discussed, there is a CAS on `.top` in `wsdeque_steal` (lines 316 to 318 in 0e50d01):

    jl_task_t *task = jl_atomic_load_relaxed(
        (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
    if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
The loaded task only matters in the success path, where we have a seq_cst write that subsumes a release write. So, we have:

- a sequenced-before edge from the load of `.tasks[t % tasks_per_heap]` to the store to `.top` via the CAS on the next line (as they both happen in the same thread)
- a synchronizes-with edge from the store to `.top` to the load of `.top` in `wsdeque_push` you quoted above

So they establish happens-before, and it looks like we know that the task is loaded by the time we load `.top`? IIRC, your concern was that loading `.tasks[t % tasks_per_heap]` and storing `.top` can be reordered, but doesn't release forbid that?

> A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store.

https://en.cppreference.com/w/cpp/atomic/memory_order
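To make the claimed edges concrete, here is a toy sketch of the same pattern (illustrative only, not the PR's code): the thief's relaxed load of the slot is sequenced-before its CAS on `top` (whose success ordering is at least release), and the owner's acquire load of `top` synchronizes-with that CAS, so once the owner observes the incremented `top`, the slot has already been read and can be reused.

```c
#include <stdatomic.h>

#define SLOTS 8

static _Atomic(void *) slots[SLOTS];   /* plays the role of wsdeques[tid].tasks */
static _Atomic(long) top = 0;          /* plays the role of wsdeques[tid].top */

/* Thief: read the slot, then publish the fact that it was taken by advancing
 * `top` with a CAS whose success ordering includes release. */
static void *thief_like(void)
{
    long t = atomic_load_explicit(&top, memory_order_acquire);
    void *task = atomic_load_explicit(&slots[t % SLOTS], memory_order_relaxed);
    if (!atomic_compare_exchange_strong_explicit(
            &top, &t, t + 1,
            memory_order_acq_rel,              /* success: at least release */
            memory_order_relaxed))
        return 0;                              /* lost the race */
    return task;
}

/* Owner: once an acquire load of `top` observes a value past `old_t`, the
 * thief's read of slots[old_t % SLOTS] happened-before this point, so the
 * owner may safely overwrite that slot. */
static int owner_can_reuse(long old_t)
{
    return atomic_load_explicit(&top, memory_order_acquire) > old_t;
}
```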
I am not sure. But edges are not transitive, unless all of them are seq-cst, IIUC.
But yes, looks like we have a proper release/acquire pair here on top to ensure the ops on tasks are okay.
Hmm.... Lahav et al.'s definition of happens-before `hb` is the transitive closure of the union of sequenced-before `sb` and synchronizes-with `sw` (`R^+` is the transitive closure of `R`):
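In symbols (my reconstruction from the description above):

$$\mathit{hb} \triangleq (\mathit{sb} \cup \mathit{sw})^{+}$$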
But I don't know if that's a mismatch between their definition and the C11 semantics (though they use the same notions for discussing C11 so probably not) and/or some difference to the actual definition in the standard.
    (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
    return NULL;
return task;
Suggested change:

    if (jl_set_task_tid(task))
        return task;
    wsdequeue_push(task, 1); // FIXME: the sticky queue would be a better place for this, but we need to handle that inside Julia instead of handling these conditions here
    return NULL;
if (jl_atomic_load_acquire(&task->tid) != -1)
    // If the `task` still hasn't finished the context switch at this point, abort push
    // and put it in the sticky queue.
    return -1;
Suggested change: remove this check (the four lines above).
The `task->tid` is fairly likely to be our own `tid` here. I think we can handle this when we pop the value (putting it into the sticky queue when we pop it, rather than when we pushed it).
    // If the `task` still hasn't finished the context switch at this point, abort push
    // and put it in the sticky queue.
    return -1;
jl_fence_release();
The only store was the relaxed one to `tasks`, so maybe just make that a release store?
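For illustration, a minimal sketch of the two alternatives under discussion (toy code, not the PR's; the globals and names here are made up). Which acquire operation this release needs to pair with on the thief's side is exactly the open question in this thread.

```c
#include <stdatomic.h>

#define SLOTS 8

static _Atomic(void *) tasks[SLOTS];   /* stand-in for wsdeques[tid].tasks */

/* What the comment says the PR currently does: a standalone release fence
 * followed by a relaxed store (fence + relaxed store is at least as strong
 * as a single release store). */
static void publish_with_fence(void *task, long b)
{
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&tasks[b % SLOTS], task, memory_order_relaxed);
}

/* The suggestion: fold the ordering into the store itself. This only
 * synchronizes with acquire operations that read this particular store,
 * which is sufficient if the thief acquire-loads (or seq_cst-loads) the
 * same slot. */
static void publish_with_release_store(void *task, long b)
{
    atomic_store_explicit(&tasks[b % SLOTS], task, memory_order_release);
}
```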
    (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap]);
if (size > 0)
    return task;
if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
this might need a fence, since it is trying to order the relaxed load above with the relaxed store below it?
    return NULL;
jl_task_t *task = jl_atomic_load_relaxed(
    (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))
possibly also need an explicit fence? I am not certain if a seq-cst cmpswap is sufficient to enforce an order on relaxed ops nearby.
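For concreteness, this is roughly what adding such an explicit fence could look like (illustrative toy code only; whether the fence is actually needed is the open question raised in these two comments):

```c
#include <stdatomic.h>

#define SLOTS 8

static _Atomic(void *) tasks[SLOTS];
static _Atomic(long) top;

/* Sketch of a steal path with an explicit seq_cst fence inserted between the
 * relaxed load of the slot and the CAS on `top`, so the ordering no longer
 * relies on the seq_cst CAS alone. */
static void *steal_with_explicit_fence(long t)
{
    void *task = atomic_load_explicit(&tasks[t % SLOTS], memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);      /* explicit ordering point */
    if (!atomic_compare_exchange_strong(&top, &t, t + 1))
        return 0;                                   /* lost the race */
    return task;
}
```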
I am surprised this doesn't try to steal size*fraction items, instead of a constant (1), but that does not need to be changed for this PR.
There's the Stealing Multi-Queue, which does something similar to what you said: https://arxiv.org/abs/2109.00657

But stealing a single task like this works quite well for fork-join use cases, in which you typically only enqueue/"materialize" O(log n) tasks in the queue, while a task itself can "reveal" O(n) child tasks upon execution (n = e.g., the length of the array for a parallel map).
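For reference, here is a rough sketch of the steal-a-fraction idea mentioned a few comments above (toy code, not the PR's; real implementations such as the one in the linked paper batch this far more carefully):

```c
#include <stdatomic.h>

#define SLOTS 1024

static _Atomic(void *) tasks[SLOTS];
static _Atomic(long) top, bottom;

/* Steal roughly half of the currently visible items by repeatedly advancing
 * `top` with CAS. Stops early if another thief (or the owner) races with us. */
static long steal_half(void *out[], long max_out)
{
    long t = atomic_load_explicit(&top, memory_order_acquire);
    long b = atomic_load_explicit(&bottom, memory_order_acquire);
    long avail = b - t;
    long want = avail - avail / 2;                  /* ceil(avail / 2) */
    if (want > max_out)
        want = max_out;
    long got = 0;
    while (got < want) {
        void *task = atomic_load_explicit(&tasks[t % SLOTS], memory_order_relaxed);
        if (!atomic_compare_exchange_strong(&top, &t, t + 1))
            break;                                  /* lost a race; keep what we have */
        out[got++] = task;
        t += 1;
    }
    return got;
}
```

The trade-off noted above still applies: for fork-join workloads the queue rarely holds more than O(log n) tasks, so stealing a fraction instead of a single item buys relatively little.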
bump?
Since Taka is no longer active, I rejuvenated this work in #50221.
This PR implements a work-stealing scheduler for Julia's Parallel Task Runtime. It is based on the lock-free [1] work-stealing deque by Chase and Lev (2005). The scheduler can be specified upon startup using the environment variable `$JULIA_THREAD_SCHEDULER`. Starting Julia with `JULIA_THREAD_SCHEDULER=workstealing julia` will use the work-stealing scheduler; otherwise, it uses the default multi-queue [2].

A quick benchmark for computing `fib(30)` using tasks shows that this implementation can be up to 2x faster than the current scheduler, especially at relatively low thread counts. I think there are a few more optimization opportunities.

@vtjnash From our discussion in the threading BoF a couple of months ago, I think you were in favor of adding alternative schedulers to Julia? What do you think about adding an alternative scheduler like this?

A bit more context: I'm playing with the depth-first scheduler now and I need some well-described scheduler, like work-stealing, as a baseline. I then thought to package it up in a way useful for everyone.

Before merge:

- `JULIA_THREAD_SCHEDULER` in the documentation

Footnotes

1. Due to the way "task locking" is handled ATM, I don't think the current enqueue and dequeue implementation is lock-free. I haven't found a better way to do this so far. ↩
2. For testing purposes, this branch currently defaults to work-stealing. This should be reverted before merging. ↩