Skip to content

Commit

Permalink
Fix and re-enable dynamic scheduler scaling
Browse files Browse the repository at this point in the history
Change dynamic scheduler scaling implementation in order to resolve
the hangs encountered in #2451.

The previous implementation assumed that signalling to wake a thread
was a reliable operation. Apparently, that's not necessarily true
(see https://en.wikipedia.org/wiki/Spurious_wakeup and
https://askldjd.com/2010/04/24/the-lost-wakeup-problem/). Seeing
as we couldn't find any other explanation for why the previous
implementation was experiencing hangs, I've assumed it is either
because of lost wake ups or spurious wake ups and redesigned the
logic accordingly.

Now, when a thread is about to suspend, it will decrement the
`active_scheduler_count` and then suspend. When it wakes up, it will
check to see if the `active_scheduler_count` is greater than its
`index`. If the `active_scheduler_count` isn't big enough, the
thread will suspend itself again immediately. If it is big enough,
it will resume. Threads no longer modify `active_scheduler_count`
when they wake up.

`active_scheduler_count` must now be modified by the thread that is
waking up another thread prior to sending the wake up notification.
Additionally, since we're now assuming that wake up signals can be
lost, we now send multiple wake up notifications just in case. While
this is somewhat wasteful, it is better than being in a situation
where some threads aren't woken up at all (i.e. a hang).

This commit also includes a change inspired by #2474. Now, *all*
scheduler threads can suspend as long as there is at least one
noisy actor registered with the ASIO subsystem. If there are no
noisy actors registered with the ASIO subsystem then scheduler 0
is not allowed to suspend itself.
  • Loading branch information
dipinhora committed Jan 10, 2018
1 parent 615081b commit 9d2150e
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 104 deletions.
2 changes: 1 addition & 1 deletion src/libponyc/options/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ static void usage(void)
" --ponythreads Use N scheduler threads. Defaults to the number of\n"
" cores (not hyperthreads) available.\n"
" --ponyminthreads Minimum number of active scheduler threads allowed.\n"
" Defaults to the number of '--ponythreads'.\n"
" Defaults to 0.\n"
" --ponycdmin Defer cycle detection until 2^N actors have blocked.\n"
" Defaults to 2^4.\n"
" --ponycdmax Always cycle detect when 2^N actors have blocked.\n"
Expand Down
4 changes: 4 additions & 0 deletions src/libponyrt/asio/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "asio.h"
#include "../actor/actor.h"
#include "../mem/pool.h"
#include "../sched/scheduler.h"
#include "ponyassert.h"
#include <string.h>

Expand Down Expand Up @@ -122,4 +123,7 @@ PONY_API void pony_asio_event_send(asio_event_t* ev, uint32_t flags,
// sender they aren't covered by backpressure. We pass false for an early
// bailout in the backpressure code.
pony_sendv(pony_ctx(), ev->owner, &m->msg, &m->msg, false);

// maybe wake up a scheduler thread if they've all fallen asleep
ponyint_sched_maybe_wakeup_if_all_asleep(-1);
}
233 changes: 131 additions & 102 deletions src/libponyrt/sched/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,32 +112,46 @@ static void send_msg_all(uint32_t from, sched_msg_t msg, intptr_t arg)
send_msg(from, i, msg, arg);
}

static void wake_suspended_threads()
static void wake_suspended_threads(int32_t current_scheduler_id)
{
uint32_t current_active_scheduler_count = get_active_scheduler_count();

// wake up any sleeping threads
while (get_active_scheduler_count() < scheduler_count)
while ((current_active_scheduler_count = get_active_scheduler_count()) < scheduler_count)
{
if(!atomic_exchange_explicit(&scheduler_count_changing, true,
memory_order_acquire))
{
// in case the count changed between the while check and now
if(get_active_scheduler_count() < scheduler_count)
current_active_scheduler_count = get_active_scheduler_count();

if(current_active_scheduler_count < scheduler_count)
{
// send signal to wake up next scheduler thread available
if(ponyint_thread_wake(scheduler[get_active_scheduler_count()].tid,
scheduler[get_active_scheduler_count()].sleep_object))
// if there was an error waking the thread
// unlock the bool that controls modifying the active scheduler count
// variable.
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);
// set active_scheduler_count to wake all schedulers
current_active_scheduler_count = scheduler_count;
atomic_store_explicit(&active_scheduler_count, current_active_scheduler_count,
memory_order_relaxed);
}

// unlock the bool that controls modifying the active scheduler count
// variable.
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);

// send multiple signals to wake up all schedulers that should be awake
// this is somewhat wasteful, but, it's better than risking some
// schedulers never waking up. If a scheduler is already awake,
// the signal is disregarded
for(int j = 0; j < 3; j++)
{
for(uint32_t i = 0; i < current_active_scheduler_count; i++)
{
if((int32_t)i != current_scheduler_id)
{
ponyint_thread_wake(scheduler[i].tid, scheduler[i].sleep_object);
}
}
}
else
// if there are no scheduler threads left to unlock
// unlock the bool that controls modifying the active scheduler count
// variable.
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);
}
}
}
Expand Down Expand Up @@ -283,7 +297,7 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2)
{
send_msg_all(sched->index, SCHED_TERMINATE, 0);

wake_suspended_threads();
wake_suspended_threads(sched->index);

sched->ack_token++;
sched->ack_count = 0;
Expand Down Expand Up @@ -443,69 +457,68 @@ static pony_actor_t* steal(scheduler_t* sched)
&& !atomic_exchange_explicit(&scheduler_count_changing, true,
memory_order_acquire))
{
// decrement active_scheduler_count so other schedulers know we're
// sleeping
uint32_t sched_count = atomic_load_explicit(&active_scheduler_count,
memory_order_relaxed);

// make sure the scheduler count didn't change
pony_assert(sched_count == current_active_scheduler_count);

atomic_store_explicit(&active_scheduler_count, sched_count - 1,
memory_order_relaxed);

// unlock the bool that controls modifying the active scheduler count
// variable
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);

// let sched 0 know we're suspending only after decrementing
// active_scheduler_count to avoid a race condition between
// when we update active_scheduler_count and scheduler 0 processes
// the SCHED_SUSPEND message we send it. If we don't do this,
// and scheduler 0 processes the SCHED_SUSPEND message before we
// decrement active_scheduler_count, it could think that
// active_scheduler_count > block_count and not start the CNF/ACK
// process for termination and potentially hang the runtime instead
// of allowing it to reach quiescence.
send_msg(sched->index, 0, SCHED_SUSPEND, 0);

// dtrace suspend notification
DTRACE1(THREAD_SUSPEND, (uintptr_t)sched);

// sleep waiting for signal to wake up again
ponyint_thread_suspend(sched->sleep_object);

bool scc = atomic_load_explicit(&scheduler_count_changing,
memory_order_acquire);

// make sure scheduler_count_changing is true
pony_assert(scc);

// increment active_scheduler_count so other schedulers know we're
// awake again
sched_count = atomic_load_explicit(&active_scheduler_count,
memory_order_relaxed);

// make sure the scheduler count is correct still
pony_assert((sched_count + 1) == current_active_scheduler_count);

atomic_store_explicit(&active_scheduler_count, sched_count + 1,
memory_order_relaxed);

// unlock the bool that controls modifying the active scheduler count
// variable. this is because the signalling thread locks the control
// variable before signalling
scc = false;
atomic_store_explicit(&scheduler_count_changing, scc,
memory_order_release);

// dtrace resume notification
DTRACE1(THREAD_RESUME, (uintptr_t)sched);

// reset steal_attempts so we try to steal from all other schedulers
// prior to suspending again
steal_attempts = 0;
// can only sleep if we're scheduler > 0 or if we're scheduler 0 and
// there is at least one noisy actor registered
if((sched->index > 0) || ((sched->index == 0) && sched->asio_noisy))
{
// decrement active_scheduler_count so other schedulers know we're
// sleeping
uint32_t sched_count = atomic_load_explicit(&active_scheduler_count,
memory_order_relaxed);

// make sure the scheduler count didn't change
pony_assert(sched_count == current_active_scheduler_count);

atomic_store_explicit(&active_scheduler_count, sched_count - 1,
memory_order_relaxed);

// unlock the bool that controls modifying the active scheduler count
// variable
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);

// let sched 0 know we're suspending only after decrementing
// active_scheduler_count to avoid a race condition between
// when we update active_scheduler_count and scheduler 0 processes
// the SCHED_SUSPEND message we send it. If we don't do this,
// and scheduler 0 processes the SCHED_SUSPEND message before we
// decrement active_scheduler_count, it could think that
// active_scheduler_count > block_count and not start the CNF/ACK
// process for termination and potentially hang the runtime instead
// of allowing it to reach quiescence.
send_msg(sched->index, 0, SCHED_SUSPEND, 0);

// dtrace suspend notification
DTRACE1(THREAD_SUSPEND, (uintptr_t)sched);

while(get_active_scheduler_count() <= (uint32_t)sched->index)
{
// sleep waiting for signal to wake up again
ponyint_thread_suspend(sched->sleep_object);
}

// dtrace resume notification
DTRACE1(THREAD_RESUME, (uintptr_t)sched);

// reset steal_attempts so we try to steal from all other schedulers
// prior to suspending again
steal_attempts = 0;
}
else
{
pony_assert(sched->index == 0);
pony_assert(!sched->asio_noisy);

// unlock the bool that controls modifying the active scheduler count
// variable
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);

// send block message if there are no noisy actors registered
// with the ASIO thread and this is scheduler 0
send_msg(sched->index, 0, SCHED_BLOCK, 0);
block_sent = true;
}
}
else if(!sched->asio_noisy)
{
Expand Down Expand Up @@ -570,7 +583,7 @@ static void run(scheduler_t* sched)
// the extra scheduler threads would keep being woken up and then go back
// to sleep over and over again.
if(ponyint_mutemap_size(&sched->mute_mapping) > 0)
ponyint_sched_maybe_wakeup();
ponyint_sched_maybe_wakeup(sched->index);

// Run the current actor and get the next actor.
bool reschedule = ponyint_actor_run(&sched->ctx, actor, PONY_SCHED_BATCH);
Expand Down Expand Up @@ -673,10 +686,6 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin,
if(threads == 0)
threads = ponyint_cpu_count();

// If no minimum thread count is specified, use # of threads
if(min_threads == 0)
min_threads = threads;

// If minimum thread count is > thread count, cap it at thread count
if(min_threads > threads)
min_threads = threads;
Expand Down Expand Up @@ -827,7 +836,17 @@ void ponyint_sched_unnoisy_asio(int32_t from)
}

// Maybe wake up a scheduler thread if possible
void ponyint_sched_maybe_wakeup()
// Wake a scheduler thread only when the active scheduler count reads as 0,
// i.e. no scheduler thread is currently counted as active. When the count is
// non-zero this function does nothing.
//
// current_scheduler_id is forwarded unchanged to
// ponyint_sched_maybe_wakeup().
void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id)
{
// snapshot of the active scheduler count; it is read once and not re-checked
uint32_t current_active_scheduler_count = get_active_scheduler_count();

// wake up threads if the current active count is 0
if(current_active_scheduler_count == 0)
ponyint_sched_maybe_wakeup(current_scheduler_id);
}

// Maybe wake up a scheduler thread if possible
void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id)
{
uint32_t current_active_scheduler_count = get_active_scheduler_count();

Expand All @@ -837,23 +856,33 @@ void ponyint_sched_maybe_wakeup()
memory_order_acquire))
{
// in case the count changed between the while check and now
if(get_active_scheduler_count() < scheduler_count)
current_active_scheduler_count = get_active_scheduler_count();

if(current_active_scheduler_count < scheduler_count)
{
// send signal to wake up next scheduler thread available
if(ponyint_thread_wake(scheduler[get_active_scheduler_count()].tid,
scheduler[get_active_scheduler_count()].sleep_object))
// if there was an error waking the thread
// unlock the bool that controls modifying the active scheduler count
// variable.
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);
// increment active_scheduler_count to wake a new scheduler up
current_active_scheduler_count++;
atomic_store_explicit(&active_scheduler_count, current_active_scheduler_count,
memory_order_relaxed);
}

// unlock the bool that controls modifying the active scheduler count
// variable.
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);

// send multiple signals to wake up all schedulers that should be awake
// this is somewhat wasteful, but, it's better than risking some
// schedulers never waking up. If a scheduler is already awake,
// the signal is disregarded
for(int j = 0; j < 3; j++)
{
for(uint32_t i = 0; i < current_active_scheduler_count; i++)
{
if((int32_t)i != current_scheduler_id)
ponyint_thread_wake(scheduler[i].tid, scheduler[i].sleep_object);
}
}
else
// if there are no scheduler threads left to unlock
// unlock the bool that controls modifying the active scheduler count
// variable.
atomic_store_explicit(&scheduler_count_changing, false,
memory_order_release);
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/libponyrt/sched/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ void ponyint_sched_noisy_asio(int32_t from);
void ponyint_sched_unnoisy_asio(int32_t from);

// Try and wake up a sleeping scheduler thread to help with load
void ponyint_sched_maybe_wakeup();
void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id);

// Try and wake up a sleeping scheduler thread only if all scheduler
// threads are asleep
void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id);

PONY_EXTERN_C_END

Expand Down

0 comments on commit 9d2150e

Please sign in to comment.