Dynamic scheduler thread scaling based on workload
Prior to this commit, the runtime would start up a specific number
of scheduler threads (by default the same as the number of physical
cores) on initialization. These scheduler threads would run actors,
send block/unblock messages, and steal actors from each other
regardless of how many actors there were and what the program's
workload actually was. This usually resulted in wasted CPU cycles
and cache thrashing if there wasn't enough work to keep all
scheduler threads busy.

This commit changes things so that the runtime still starts up
the threads on initialization, but the threads can now suspend
execution when there isn't enough work to do, which minimizes
work-stealing overhead. The rough outline of how this works is:

* We now have three variables related to the number of schedulers:
  `maximum_scheduler_count` (the normal `--ponythreads` option),
  `active_scheduler_count`, and `minimum_scheduler_count`
  (a new `--ponyminthreads` option)
* On startup, we create all possible scheduler threads (up to
  `maximum_scheduler_count`)
* We can never have more than `maximum_scheduler_count` threads
  active at a time
* We can never have fewer than `minimum_scheduler_count` threads
  active at a time
* Scheduler threads can suspend themselves (i.e. effectively
  pretend as if they don't exist)
* A scheduler thread can only suspend itself if its actor queue
  is empty, it has no actors in its mute map, and it would
  normally send a block message
* Only one scheduler thread can suspend or resume at a time (the
  largest-numbered one running or the smallest-numbered one
  suspended, respectively)
* We can never skip a scheduler thread and suspend or wake up a
  scheduler thread out of order (i.e. thread 6 is active, but
  thread 5 gets suspended or thread 5 is suspended but thread 6
  gets resumed)
* If there isn't enough work and a scheduler thread would normally
  block and it's the largest active scheduler thread, it suspends
  itself instead
* If there isn't enough work and a scheduler thread would normally
  block and it's not the largest active scheduler thread, it does
  normal scheduler block message sending
* If there's a lot of work to do and at least one actor is muted
  in a scheduler thread, that thread tries to resume a suspended
  scheduler thread (if there are any) every time it is about to
  run an actor. NOTE: This can lead to a pathological case in
  which only one thread has a muted actor and there is only one
  overloaded actor. The extra scheduler threads are then woken up
  repeatedly, only to go back to sleep each time.
* The overhead to check if this scheduler thread is a candidate to
  be suspended (`&scheduler[current_active_scheduler_count - 1] ==
  current scheduler address`) is a load and single branch check
* The overhead to check if this scheduler thread is a candidate to
  be suspended (because `&scheduler[current_active_scheduler_count
  - 1] == current scheduler address`) but cannot actually be
  suspended because we're at `minimum_scheduler_count` is one
  branch (this is in addition to the overhead from the previous
  bullet)
* The overhead to check if there are any scheduler threads to
  resume is a load and single branch check
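
The ordering rules in the list above can be sketched as a small,
single-threaded model. This is only an illustration under stated
assumptions: `sched_model_t`, `try_suspend`, and `try_resume_one`
are hypothetical names, not the runtime's actual data structures,
and the real scheduler also has to handle concurrency and the
block/unblock messages.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

// Hypothetical, simplified model of the three counters described above.
typedef struct
{
  uint32_t min_count;    // corresponds to --ponyminthreads
  uint32_t max_count;    // corresponds to --ponythreads
  uint32_t active_count; // schedulers that are currently not suspended
} sched_model_t;

// Only the largest-numbered active scheduler may suspend itself.
static bool is_suspend_candidate(const sched_model_t* m, uint32_t index)
{
  assert(m->active_count > 0);
  return index == (m->active_count - 1);
}

// Returns true if scheduler `index` suspends; false means it would fall
// back to the normal block-message path instead.
static bool try_suspend(sched_model_t* m, uint32_t index)
{
  assert(index < m->max_count);

  if(!is_suspend_candidate(m, index))
    return false; // suspending would skip a thread out of order

  if(m->active_count <= m->min_count)
    return false; // already at the configured minimum

  m->active_count--;
  return true;
}

// Resumes the smallest-numbered suspended scheduler, if there is one.
static bool try_resume_one(sched_model_t* m)
{
  if(m->active_count >= m->max_count)
    return false; // nothing is suspended

  m->active_count++;
  return true;
}
```

With four schedulers and a minimum of one, scheduler 1 cannot
suspend while schedulers 2 and 3 are still active; scheduler 3 has
to go first.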

The implementation of the scheduler suspend/resume is different
depending on the platform.

For Windows, it relies on Event Objects and `WaitForSingleObject`
to suspend threads and `SetEvent` to wake suspended threads.

For POSIX environments, by default it relies on signals (specifically,
`SIGUSR2`), which are reportedly quicker than other mechanisms such
as pthread condition variables (according to Stack Overflow answers at
https://stackoverflow.com/a/4676069 and
https://stackoverflow.com/a/23945651). It uses `sigwait` to
suspend threads and `pthread_kill` to wake suspended threads.
Because `SIGUSR2` is allotted to the runtime, it has been disabled
for use in the `signals` package, with an error indicating that it
is used by the runtime.
An alternative implementation is also available via a
`use=scheduler_scaling_pthreads` argument to make. It relies on
pthread condition variables, which frees `SIGUSR2` for use in the
`signals` package. It uses `pthread_cond_wait` to suspend threads
and `pthread_cond_signal` to wake suspended threads.
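
As a rough illustration of the default POSIX mechanism, the sketch
below suspends one thread in `sigwait` and wakes it with
`pthread_kill`. It is not the runtime's code: `WAKE_SIGNAL`,
`sleeper`, and `suspend_and_wake_once` are hypothetical names, and
the sketch assumes the signal is blocked before the thread is
created so that an early wake stays pending rather than being lost.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <stddef.h>

// Hypothetical stand-in for the runtime's sleep/wake signal.
#define WAKE_SIGNAL SIGUSR2

// Thread body: sleep in sigwait until WAKE_SIGNAL arrives, then record it.
static void* sleeper(void* arg)
{
  assert(arg != NULL);
  int* received = arg;

  sigset_t set;
  sigemptyset(&set);
  sigaddset(&set, WAKE_SIGNAL);

  int sig = 0;
  if(sigwait(&set, &sig) == 0)
    *received = sig;

  return NULL;
}

// Starts one sleeper, wakes it, and returns the signal number it saw
// (or -1 if setup failed).
static int suspend_and_wake_once(void)
{
  sigset_t set, old;
  sigemptyset(&set);
  sigaddset(&set, WAKE_SIGNAL);

  // Block the signal first: the new thread inherits this mask, so a wake
  // sent before it reaches sigwait stays pending instead of killing us.
  if(pthread_sigmask(SIG_BLOCK, &set, &old) != 0)
    return -1;

  int received = 0;
  int result = -1;
  pthread_t thread;

  if(pthread_create(&thread, NULL, sleeper, &received) == 0)
  {
    pthread_kill(thread, WAKE_SIGNAL); // wake the suspended thread
    pthread_join(thread, NULL);
    result = received;
  }

  // Restore the caller's original signal mask.
  pthread_sigmask(SIG_SETMASK, &old, NULL);
  return result;
}
```

Calling `suspend_and_wake_once()` should return `SIGUSR2` on a
POSIX system; the program needs to be built with pthread support
(for example `-pthread` with GCC or Clang).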

The old behavior of having all scheduler threads active all the
time can be achieved by passing `--ponyminthreads=9999999` as an
argument to a program (because minimum scheduler threads is capped
to never exceed total scheduler threads).
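
The capping behavior mentioned above amounts to a simple clamp. The
option parsing itself is not shown in this commit page, so the
function below is a hypothetical sketch of the rule rather than the
runtime's implementation.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical helper: the requested minimum never exceeds the total
// number of scheduler threads.
static uint32_t clamp_min_threads(uint32_t requested_min, uint32_t max_threads)
{
  assert(max_threads > 0);
  return (requested_min > max_threads) ? max_threads : requested_min;
}
```

Under this rule, requesting `9999999` minimum threads on a runtime
with 8 scheduler threads yields a minimum of 8, so no thread is ever
allowed to suspend.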

This commit also adds DTrace probes for thread suspend and thread
resume.

This commit also switches from using `signal` to `sigaction` for
the epoll/kqueue asio signals logic because `sigaction` is more
robust and reliable across platforms
(https://stackoverflow.com/a/232711).
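
A minimal sketch of the `sigaction` pattern used in this change is
shown below. `on_signal`, `last_signal`, and `install_handler` are
hypothetical names for illustration. Unlike some historical `signal`
implementations, a handler installed this way stays installed after
it fires, so it does not need to re-register itself.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

// Records the most recently delivered signal (hypothetical demo state).
static volatile sig_atomic_t last_signal = 0;

static void on_signal(int sig)
{
  last_signal = sig;
}

// Installs on_signal for `sig`; returns 0 on success, -1 on failure.
static int install_handler(int sig)
{
  struct sigaction action;
  memset(&action, 0, sizeof(action));
  action.sa_handler = on_signal;
  sigemptyset(&action.sa_mask);

  // Ask for interrupted syscalls to be restarted, matching the
  // BSD-style `signal` behavior.
  action.sa_flags = SA_RESTART;

  return sigaction(sig, &action, NULL);
}
```

After `install_handler(SIGUSR1)`, calling `raise(SIGUSR1)` twice in
a row runs the handler both times without any re-installation.
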
dipinhora committed Dec 16, 2017
1 parent 12505cd commit d026696
Showing 11 changed files with 458 additions and 25 deletions.
6 changes: 6 additions & 0 deletions Makefile
@@ -163,6 +163,11 @@ ifdef use
ALL_CFLAGS += -DUSE_ACTOR_CONTINUATIONS
PONY_BUILD_DIR := $(PONY_BUILD_DIR)-actor_continuations
endif

ifneq (,$(filter $(use), scheduler_scaling_pthreads))
ALL_CFLAGS += -DUSE_SCHEDULER_SCALING_PTHREADS
PONY_BUILD_DIR := $(PONY_BUILD_DIR)-scheduler_scaling_pthreads
endif
endif

ifdef config
@@ -980,6 +985,7 @@ help:
@echo ' dtrace'
@echo ' actor_continuations'
@echo ' coverage'
@echo ' scheduler_scaling_pthreads'
@echo
@echo 'TARGETS:'
@echo ' libponyc Pony compiler library'
14 changes: 11 additions & 3 deletions packages/signals/sig.pony
@@ -199,9 +199,17 @@ primitive Sig
end

fun usr2(): U32 =>
ifdef bsd or osx then 31
elseif linux then 12
else compile_error "no SIGUSR2"
ifdef "scheduler_scaling_pthreads" then
ifdef bsd or osx then 31
elseif linux then 12
else compile_error "no SIGUSR2"
end
else
ifdef linux or bsd or osx then
compile_error "SIGUSR2 reserved for runtime use"
else
compile_error "no SIGUSR2"
end
end

fun rt(n: U32): U32 ? =>
12 changes: 12 additions & 0 deletions src/common/dtrace_probes.d
@@ -170,4 +170,16 @@ provider pony {
*/
probe work__steal__failure(uintptr_t scheduler, uintptr_t victim);

/**
* Fired when a scheduler suspends
* @param scheduler is the scheduler that suspended
*/
probe thread__suspend(uintptr_t scheduler);

/**
* Fired when a scheduler resumes
* @param scheduler is the scheduler that resumed
*/
probe thread__resume(uintptr_t scheduler);

};
11 changes: 11 additions & 0 deletions src/common/threads.h
@@ -8,14 +8,21 @@
*/
#ifdef PLATFORM_IS_POSIX_BASED
# include <pthread.h>
# include <signal.h>
# define pony_thread_id_t pthread_t
#if defined(USE_SCHEDULER_SCALING_PTHREADS)
# define pony_signal_event_t pthread_cond_t*
#else
# define pony_signal_event_t uint32_t
#endif

typedef void* (*thread_fn) (void* arg);

# define DECLARE_THREAD_FN(NAME) void* NAME (void* arg)
#elif defined(PLATFORM_IS_WINDOWS)
# include <process.h>
# define pony_thread_id_t HANDLE
# define pony_signal_event_t HANDLE

typedef uint32_t(__stdcall *thread_fn) (void* arg);

@@ -50,4 +57,8 @@ void ponyint_thread_detach(pony_thread_id_t thread);

pony_thread_id_t ponyint_thread_self();

void ponyint_thread_suspend(pony_signal_event_t signal);

void ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal);

#endif
58 changes: 54 additions & 4 deletions src/libponyrt/asio/epoll.c
@@ -56,8 +56,6 @@ static void signal_handler(int sig)
if(sig >= MAX_SIGNAL)
return;

// Reset the signal handler.
signal(sig, signal_handler);
asio_backend_t* b = ponyint_asio_get_backend();
pony_assert(b != NULL);
asio_event_t* ev = atomic_load_explicit(&b->sighandlers[sig],
@@ -73,6 +71,11 @@ static void signal_handler(int sig)
eventfd_write(ev->fd, 1);
}

static void empty_signal_handler(int sig)
{
(void) sig;
}

static void handle_queue(asio_backend_t* b)
{
asio_msg_t* msg;
@@ -117,6 +120,19 @@ asio_backend_t* ponyint_asio_backend_init()

epoll_ctl(b->epfd, EPOLL_CTL_ADD, b->wakeup, &ep);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
new_action.sa_handler = empty_signal_handler;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(PONY_SCHED_SLEEP_WAKE_SIGNAL, &new_action, NULL);
#endif

return b;
}

@@ -183,6 +199,15 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
asio_backend_t* b = arg;
pony_assert(b != NULL);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we block signals related to scheduler sleeping/waking
// so they queue up to avoid race conditions
sigset_t set;
sigemptyset(&set);
sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL);
pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

while(!atomic_load_explicit(&b->terminate, memory_order_relaxed))
{
int event_cnt = epoll_wait(b->epfd, b->events, MAX_EVENTS, -1);
@@ -320,7 +345,15 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev)
atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, ev,
memory_order_release, memory_order_relaxed))
{
signal(sig, signal_handler);
struct sigaction new_action;
new_action.sa_handler = signal_handler;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(sig, &new_action, NULL);

ev->fd = eventfd(0, EFD_NONBLOCK);
ep.events |= EPOLLIN;
} else {
@@ -393,7 +426,24 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev)
atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, NULL,
memory_order_release, memory_order_relaxed))
{
signal(sig, SIG_DFL);
struct sigaction new_action;

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL)
new_action.sa_handler = empty_signal_handler;
else
#endif
new_action.sa_handler = SIG_DFL;

sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(sig, &new_action, NULL);

close(ev->fd);
ev->fd = -1;
}
64 changes: 62 additions & 2 deletions src/libponyrt/asio/kqueue.c
@@ -27,6 +27,13 @@ struct asio_backend_t
messageq_t q;
};

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
static void empty_signal_handler(int sig)
{
(void) sig;
}
#endif

asio_backend_t* ponyint_asio_backend_init()
{
asio_backend_t* b = POOL_ALLOC(asio_backend_t);
@@ -49,6 +56,19 @@ asio_backend_t* ponyint_asio_backend_init()
struct timespec t = {0, 0};
kevent(b->kq, &new_event, 1, NULL, 0, &t);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
new_action.sa_handler = empty_signal_handler;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(PONY_SCHED_SLEEP_WAKE_SIGNAL, &new_action, NULL);
#endif

return b;
}

@@ -141,6 +161,16 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
pony_register_thread();
asio_backend_t* b = arg;
pony_assert(b != NULL);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we block signals related to scheduler sleeping/waking
// so they queue up to avoid race conditions
sigset_t set;
sigemptyset(&set);
sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL);
pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

struct kevent fired[MAX_EVENTS];

while(b->kq != -1)
@@ -277,7 +307,22 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev)

if(ev->flags & ASIO_SIGNAL)
{
signal((int)ev->nsec, SIG_IGN);
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL)
new_action.sa_handler = empty_signal_handler;
else
#endif
new_action.sa_handler = SIG_IGN;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction((int)ev->nsec, &new_action, NULL);

EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, ev);
i++;
}
@@ -369,7 +414,22 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev)

if(ev->flags & ASIO_SIGNAL)
{
signal((int)ev->nsec, SIG_DFL);
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL)
new_action.sa_handler = empty_signal_handler;
else
#endif
new_action.sa_handler = SIG_DFL;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction((int)ev->nsec, &new_action, NULL);

EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_DELETE, 0, 0, ev);
i++;
}
58 changes: 58 additions & 0 deletions src/libponyrt/platform/threads.c
@@ -18,6 +18,17 @@
typedef cpuset_t cpu_set_t;
#endif

#if defined(USE_SCHEDULER_SCALING_PTHREADS)
static pthread_mutex_t sleep_mut;

static pthread_once_t sleep_mut_once = PTHREAD_ONCE_INIT;

void sleep_mut_init()
{
pthread_mutex_init(&sleep_mut, NULL);
}
#endif

#if defined(PLATFORM_IS_LINUX)

#include <dlfcn.h>
@@ -201,6 +212,11 @@ bool ponyint_thread_create(pony_thread_id_t* thread, thread_fn start,
if(pthread_create(thread, NULL, start, arg))
return false;
#endif

#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS)
pthread_once(&sleep_mut_once, sleep_mut_init);
#endif

return true;
}

@@ -232,3 +248,45 @@ pony_thread_id_t ponyint_thread_self()
return pthread_self();
#endif
}

void ponyint_thread_suspend(pony_signal_event_t signal)
{
#ifdef PLATFORM_IS_WINDOWS
WaitForSingleObject(signal, INFINITE);
#elif defined(USE_SCHEDULER_SCALING_PTHREADS)
int ret;
// lock mutex
ret = pthread_mutex_lock(&sleep_mut);

// wait for condition variable (will sleep and release mutex)
ret = pthread_cond_wait(signal, &sleep_mut);
// TODO: What to do if `ret` is an unrecoverable error?

// unlock mutex
ret = pthread_mutex_unlock(&sleep_mut);
#else
int sig;
sigset_t sigmask;
sigemptyset(&sigmask); /* zero out all bits */
sigaddset(&sigmask, signal); /* wait only for the desired signal */

// sleep waiting for signal to wake up again
sigwait(&sigmask, &sig);
#endif
}

void ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal)
{
#if defined(PLATFORM_IS_WINDOWS)
(void) thread;
SetEvent(signal);
#elif defined(USE_SCHEDULER_SCALING_PTHREADS)
(void) thread;
int ret;
// signal condition variable
ret = pthread_cond_signal(signal);
// TODO: What to do if `ret` is an unrecoverable error?
#else
pthread_kill(thread, signal);
#endif
}