Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic scheduler thread scaling based on workload #2386

Merged
merged 3 commits into from
Dec 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ ifdef use
ALL_CFLAGS += -DUSE_ACTOR_CONTINUATIONS
PONY_BUILD_DIR := $(PONY_BUILD_DIR)-actor_continuations
endif

ifneq (,$(filter $(use), scheduler_scaling_pthreads))
ALL_CFLAGS += -DUSE_SCHEDULER_SCALING_PTHREADS
PONY_BUILD_DIR := $(PONY_BUILD_DIR)-scheduler_scaling_pthreads
endif
endif

ifdef config
Expand Down Expand Up @@ -191,7 +196,7 @@ else
endif

ifeq ($(OSTYPE),osx)
ALL_CFLAGS += -mmacosx-version-min=10.8
ALL_CFLAGS += -mmacosx-version-min=10.8 -DUSE_SCHEDULER_SCALING_PTHREADS
ALL_CXXFLAGS += -stdlib=libc++ -mmacosx-version-min=10.8
endif

Expand Down Expand Up @@ -992,6 +997,7 @@ help:
@echo ' actor_continuations'
@echo ' coverage'
@echo ' llvm_link_static'
@echo ' scheduler_scaling_pthreads'
@echo
@echo 'TARGETS:'
@echo ' libponyc Pony compiler library'
Expand Down
14 changes: 11 additions & 3 deletions packages/signals/sig.pony
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,17 @@ primitive Sig
end

fun usr2(): U32 =>
ifdef bsd or osx then 31
elseif linux then 12
else compile_error "no SIGUSR2"
ifdef not "scheduler_scaling_pthreads" then
ifdef bsd or osx then 31
elseif linux then 12
else compile_error "no SIGUSR2"
end
else
ifdef linux or bsd or osx then
compile_error "SIGUSR2 reserved for runtime use"
else
compile_error "no SIGUSR2"
end
end

fun rt(n: U32): U32 ? =>
Expand Down
12 changes: 12 additions & 0 deletions src/common/dtrace_probes.d
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,16 @@ provider pony {
*/
probe work__steal__failure(uintptr_t scheduler, uintptr_t victim);

/**
* Fired when a scheduler suspends
* @param scheduler is the scheduler that suspended
*/
probe thread__suspend(uintptr_t scheduler);

/**
* Fired when a scheduler resumes
* @param scheduler is the scheduler that resumed
*/
probe thread__resume(uintptr_t scheduler);

};
11 changes: 11 additions & 0 deletions src/common/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@
*/
#ifdef PLATFORM_IS_POSIX_BASED
# include <pthread.h>
# include <signal.h>
# define pony_thread_id_t pthread_t
#if defined(USE_SCHEDULER_SCALING_PTHREADS)
# define pony_signal_event_t pthread_cond_t*
#else
# define pony_signal_event_t uint32_t
#endif

typedef void* (*thread_fn) (void* arg);

# define DECLARE_THREAD_FN(NAME) void* NAME (void* arg)
#elif defined(PLATFORM_IS_WINDOWS)
# include <process.h>
# define pony_thread_id_t HANDLE
# define pony_signal_event_t HANDLE

typedef uint32_t(__stdcall *thread_fn) (void* arg);

Expand Down Expand Up @@ -50,4 +57,8 @@ void ponyint_thread_detach(pony_thread_id_t thread);

pony_thread_id_t ponyint_thread_self();

void ponyint_thread_suspend(pony_signal_event_t signal);

void ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal);

#endif
60 changes: 56 additions & 4 deletions src/libponyrt/asio/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ static void signal_handler(int sig)
if(sig >= MAX_SIGNAL)
return;

// Reset the signal handler.
signal(sig, signal_handler);
asio_backend_t* b = ponyint_asio_get_backend();
pony_assert(b != NULL);
asio_event_t* ev = atomic_load_explicit(&b->sighandlers[sig],
Expand All @@ -73,6 +71,13 @@ static void signal_handler(int sig)
eventfd_write(ev->fd, 1);
}

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
static void empty_signal_handler(int sig)
{
(void) sig;
}
#endif

static void handle_queue(asio_backend_t* b)
{
asio_msg_t* msg;
Expand Down Expand Up @@ -117,6 +122,19 @@ asio_backend_t* ponyint_asio_backend_init()

epoll_ctl(b->epfd, EPOLL_CTL_ADD, b->wakeup, &ep);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
new_action.sa_handler = empty_signal_handler;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(PONY_SCHED_SLEEP_WAKE_SIGNAL, &new_action, NULL);
#endif

return b;
}

Expand Down Expand Up @@ -183,6 +201,15 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
asio_backend_t* b = arg;
pony_assert(b != NULL);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we block signals related to scheduler sleeping/waking
// so they queue up to avoid race conditions
sigset_t set;
sigemptyset(&set);
sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL);
pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

while(!atomic_load_explicit(&b->terminate, memory_order_relaxed))
{
int event_cnt = epoll_wait(b->epfd, b->events, MAX_EVENTS, -1);
Expand Down Expand Up @@ -320,7 +347,15 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev)
atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, ev,
memory_order_release, memory_order_relaxed))
{
signal(sig, signal_handler);
struct sigaction new_action;
new_action.sa_handler = signal_handler;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(sig, &new_action, NULL);

ev->fd = eventfd(0, EFD_NONBLOCK);
ep.events |= EPOLLIN;
} else {
Expand Down Expand Up @@ -393,7 +428,24 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev)
atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, NULL,
memory_order_release, memory_order_relaxed))
{
signal(sig, SIG_DFL);
struct sigaction new_action;

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL)
new_action.sa_handler = empty_signal_handler;
else
#endif
new_action.sa_handler = SIG_DFL;

sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(sig, &new_action, NULL);

close(ev->fd);
ev->fd = -1;
}
Expand Down
64 changes: 62 additions & 2 deletions src/libponyrt/asio/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ struct asio_backend_t
messageq_t q;
};

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
static void empty_signal_handler(int sig)
{
(void) sig;
}
#endif

asio_backend_t* ponyint_asio_backend_init()
{
asio_backend_t* b = POOL_ALLOC(asio_backend_t);
Expand All @@ -49,6 +56,19 @@ asio_backend_t* ponyint_asio_backend_init()
struct timespec t = {0, 0};
kevent(b->kq, &new_event, 1, NULL, 0, &t);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
new_action.sa_handler = empty_signal_handler;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction(PONY_SCHED_SLEEP_WAKE_SIGNAL, &new_action, NULL);
#endif

return b;
}

Expand Down Expand Up @@ -141,6 +161,16 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
pony_register_thread();
asio_backend_t* b = arg;
pony_assert(b != NULL);

#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// Make sure we block signals related to scheduler sleeping/waking
// so they queue up to avoid race conditions
sigset_t set;
sigemptyset(&set);
sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL);
pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

struct kevent fired[MAX_EVENTS];

while(b->kq != -1)
Expand Down Expand Up @@ -277,7 +307,22 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev)

if(ev->flags & ASIO_SIGNAL)
{
signal((int)ev->nsec, SIG_IGN);
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL)
new_action.sa_handler = empty_signal_handler;
else
#endif
new_action.sa_handler = SIG_IGN;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction((int)ev->nsec, &new_action, NULL);

EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, ev);
i++;
}
Expand Down Expand Up @@ -369,7 +414,22 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev)

if(ev->flags & ASIO_SIGNAL)
{
signal((int)ev->nsec, SIG_DFL);
// Make sure we ignore signals related to scheduler sleeping/waking
// as the default for those signals is termination
struct sigaction new_action;
#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL)
new_action.sa_handler = empty_signal_handler;
else
#endif
new_action.sa_handler = SIG_DFL;
sigemptyset (&new_action.sa_mask);

// ask to restart interrupted syscalls to match `signal` behavior
new_action.sa_flags = SA_RESTART;

sigaction((int)ev->nsec, &new_action, NULL);

EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_DELETE, 0, 0, ev);
i++;
}
Expand Down
60 changes: 60 additions & 0 deletions src/libponyrt/platform/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
typedef cpuset_t cpu_set_t;
#endif

#if defined(USE_SCHEDULER_SCALING_PTHREADS)
static pthread_mutex_t sleep_mut;

static pthread_once_t sleep_mut_once = PTHREAD_ONCE_INIT;

void sleep_mut_init()
{
pthread_mutex_init(&sleep_mut, NULL);
}
#endif

#if defined(PLATFORM_IS_LINUX)

#include <dlfcn.h>
Expand Down Expand Up @@ -201,6 +212,11 @@ bool ponyint_thread_create(pony_thread_id_t* thread, thread_fn start,
if(pthread_create(thread, NULL, start, arg))
return false;
#endif

#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS)
pthread_once(&sleep_mut_once, sleep_mut_init);
#endif

return true;
}

Expand Down Expand Up @@ -232,3 +248,47 @@ pony_thread_id_t ponyint_thread_self()
return pthread_self();
#endif
}

void ponyint_thread_suspend(pony_signal_event_t signal)
{
#ifdef PLATFORM_IS_WINDOWS
WaitForSingleObject(signal, INFINITE);
#elif defined(USE_SCHEDULER_SCALING_PTHREADS)
int ret;
// lock mutex
ret = pthread_mutex_lock(&sleep_mut);

// wait for condition variable (will sleep and release mutex)
ret = pthread_cond_wait(signal, &sleep_mut);
// TODO: What to do if `ret` is an unrecoverable error?
(void) ret;

// unlock mutex
ret = pthread_mutex_unlock(&sleep_mut);
#else
int sig;
sigset_t sigmask;
sigemptyset(&sigmask); /* zero out all bits */
sigaddset(&sigmask, signal); /* unblock desired signal */

// sleep waiting for signal to wake up again
sigwait(&sigmask, &sig);
#endif
}

void ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal)
{
#if defined(PLATFORM_IS_WINDOWS)
(void) thread;
SetEvent(signal);
#elif defined(USE_SCHEDULER_SCALING_PTHREADS)
(void) thread;
int ret;
// signal condition variable
ret = pthread_cond_signal(signal);
// TODO: What to do if `ret` is an unrecoverable error?
(void) ret;
#else
pthread_kill(thread, signal);
#endif
}
Loading