diff --git a/Makefile b/Makefile index 4d407a98ba..625da253fc 100644 --- a/Makefile +++ b/Makefile @@ -163,6 +163,11 @@ ifdef use ALL_CFLAGS += -DUSE_ACTOR_CONTINUATIONS PONY_BUILD_DIR := $(PONY_BUILD_DIR)-actor_continuations endif + + ifneq (,$(filter $(use), scheduler_scaling_pthreads)) + ALL_CFLAGS += -DUSE_SCHEDULER_SCALING_PTHREADS + PONY_BUILD_DIR := $(PONY_BUILD_DIR)-scheduler_scaling_pthreads + endif endif ifdef config @@ -191,7 +196,7 @@ else endif ifeq ($(OSTYPE),osx) - ALL_CFLAGS += -mmacosx-version-min=10.8 + ALL_CFLAGS += -mmacosx-version-min=10.8 -DUSE_SCHEDULER_SCALING_PTHREADS ALL_CXXFLAGS += -stdlib=libc++ -mmacosx-version-min=10.8 endif @@ -992,6 +997,7 @@ help: @echo ' actor_continuations' @echo ' coverage' @echo ' llvm_link_static' + @echo ' scheduler_scaling_pthreads' @echo @echo 'TARGETS:' @echo ' libponyc Pony compiler library' diff --git a/packages/signals/sig.pony b/packages/signals/sig.pony index 13c4b75523..af150df965 100644 --- a/packages/signals/sig.pony +++ b/packages/signals/sig.pony @@ -199,9 +199,18 @@ primitive Sig end fun usr2(): U32 => - ifdef bsd or osx then 31 - elseif linux then 12 - else compile_error "no SIGUSR2" + // SIGUSR2 is the runtime's scheduler sleep/wake signal unless pthread-based scheduler scaling is in use, so it is only exposed in that case. + ifdef not "scheduler_scaling_pthreads" then + ifdef linux or bsd or osx then + compile_error "SIGUSR2 reserved for runtime use" + else + compile_error "no SIGUSR2" + end + else + ifdef bsd or osx then 31 + elseif linux then 12 + else compile_error "no SIGUSR2" + end end fun rt(n: U32): U32 ? 
=> diff --git a/src/common/dtrace_probes.d b/src/common/dtrace_probes.d index da325debe2..0649de73dc 100644 --- a/src/common/dtrace_probes.d +++ b/src/common/dtrace_probes.d @@ -170,4 +170,16 @@ provider pony { */ probe work__steal__failure(uintptr_t scheduler, uintptr_t victim); + /** + * Fired when a scheduler suspends + * @param scheduler is the scheduler that suspended + */ + probe thread__suspend(uintptr_t scheduler); + + /** + * Fired when a scheduler resumes + * @param scheduler is the scheduler that resumed + */ + probe thread__resume(uintptr_t scheduler); + }; diff --git a/src/common/threads.h b/src/common/threads.h index 79eed22fdd..6ed464f964 100644 --- a/src/common/threads.h +++ b/src/common/threads.h @@ -8,7 +8,13 @@ */ #ifdef PLATFORM_IS_POSIX_BASED # include +# include # define pony_thread_id_t pthread_t +#if defined(USE_SCHEDULER_SCALING_PTHREADS) +# define pony_signal_event_t pthread_cond_t* +#else +# define pony_signal_event_t uint32_t +#endif typedef void* (*thread_fn) (void* arg); @@ -16,6 +22,7 @@ typedef void* (*thread_fn) (void* arg); #elif defined(PLATFORM_IS_WINDOWS) # include # define pony_thread_id_t HANDLE +# define pony_signal_event_t HANDLE typedef uint32_t(__stdcall *thread_fn) (void* arg); @@ -50,4 +57,8 @@ void ponyint_thread_detach(pony_thread_id_t thread); pony_thread_id_t ponyint_thread_self(); +void ponyint_thread_suspend(pony_signal_event_t signal); + +void ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal); + #endif diff --git a/src/libponyrt/asio/epoll.c b/src/libponyrt/asio/epoll.c index c2ee1c1983..d0b0399887 100644 --- a/src/libponyrt/asio/epoll.c +++ b/src/libponyrt/asio/epoll.c @@ -56,8 +56,6 @@ static void signal_handler(int sig) if(sig >= MAX_SIGNAL) return; - // Reset the signal handler. 
- signal(sig, signal_handler); asio_backend_t* b = ponyint_asio_get_backend(); pony_assert(b != NULL); asio_event_t* ev = atomic_load_explicit(&b->sighandlers[sig], @@ -73,6 +71,13 @@ static void signal_handler(int sig) eventfd_write(ev->fd, 1); } +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) +static void empty_signal_handler(int sig) +{ + (void) sig; +} +#endif + static void handle_queue(asio_backend_t* b) { asio_msg_t* msg; @@ -117,6 +122,19 @@ asio_backend_t* ponyint_asio_backend_init() epoll_ctl(b->epfd, EPOLL_CTL_ADD, b->wakeup, &ep); +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we ignore signals related to scheduler sleeping/waking + // as the default for those signals is termination + struct sigaction new_action; + new_action.sa_handler = empty_signal_handler; + sigemptyset (&new_action.sa_mask); + + // ask to restart interrupted syscalls to match `signal` behavior + new_action.sa_flags = SA_RESTART; + + sigaction(PONY_SCHED_SLEEP_WAKE_SIGNAL, &new_action, NULL); +#endif + return b; } @@ -183,6 +201,15 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) asio_backend_t* b = arg; pony_assert(b != NULL); +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we block signals related to scheduler sleeping/waking + // so they queue up to avoid race conditions + sigset_t set; + sigemptyset(&set); + sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL); + pthread_sigmask(SIG_BLOCK, &set, NULL); +#endif + while(!atomic_load_explicit(&b->terminate, memory_order_relaxed)) { int event_cnt = epoll_wait(b->epfd, b->events, MAX_EVENTS, -1); @@ -320,7 +347,15 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, ev, memory_order_release, memory_order_relaxed)) { - signal(sig, signal_handler); + struct sigaction new_action; + new_action.sa_handler = signal_handler; + sigemptyset (&new_action.sa_mask); + + // ask to restart interrupted syscalls to match `signal` behavior + 
new_action.sa_flags = SA_RESTART; + + sigaction(sig, &new_action, NULL); + ev->fd = eventfd(0, EFD_NONBLOCK); ep.events |= EPOLLIN; } else { @@ -393,7 +428,24 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev) atomic_compare_exchange_strong_explicit(&b->sighandlers[sig], &prev, NULL, memory_order_release, memory_order_relaxed)) { - signal(sig, SIG_DFL); + struct sigaction new_action; + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we ignore signals related to scheduler sleeping/waking + // as the default for those signals is termination + if(sig == PONY_SCHED_SLEEP_WAKE_SIGNAL) + new_action.sa_handler = empty_signal_handler; + else +#endif + new_action.sa_handler = SIG_DFL; + + sigemptyset (&new_action.sa_mask); + + // ask to restart interrupted syscalls to match `signal` behavior + new_action.sa_flags = SA_RESTART; + + sigaction(sig, &new_action, NULL); + close(ev->fd); ev->fd = -1; } diff --git a/src/libponyrt/asio/kqueue.c b/src/libponyrt/asio/kqueue.c index a4e675e546..350a7f3b9b 100644 --- a/src/libponyrt/asio/kqueue.c +++ b/src/libponyrt/asio/kqueue.c @@ -27,6 +27,13 @@ struct asio_backend_t messageq_t q; }; +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) +static void empty_signal_handler(int sig) +{ + (void) sig; +} +#endif + asio_backend_t* ponyint_asio_backend_init() { asio_backend_t* b = POOL_ALLOC(asio_backend_t); @@ -49,6 +56,19 @@ asio_backend_t* ponyint_asio_backend_init() struct timespec t = {0, 0}; kevent(b->kq, &new_event, 1, NULL, 0, &t); +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we ignore signals related to scheduler sleeping/waking + // as the default for those signals is termination + struct sigaction new_action; + new_action.sa_handler = empty_signal_handler; + sigemptyset (&new_action.sa_mask); + + // ask to restart interrupted syscalls to match `signal` behavior + new_action.sa_flags = SA_RESTART; + + sigaction(PONY_SCHED_SLEEP_WAKE_SIGNAL, &new_action, NULL); +#endif + return b; } @@ -141,6 
+161,16 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch) pony_register_thread(); asio_backend_t* b = arg; pony_assert(b != NULL); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we block signals related to scheduler sleeping/waking + // so they queue up to avoid race conditions + sigset_t set; + sigemptyset(&set); + sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL); + pthread_sigmask(SIG_BLOCK, &set, NULL); +#endif + struct kevent fired[MAX_EVENTS]; while(b->kq != -1) @@ -277,7 +307,22 @@ PONY_API void pony_asio_event_subscribe(asio_event_t* ev) if(ev->flags & ASIO_SIGNAL) { - signal((int)ev->nsec, SIG_IGN); + // Make sure we ignore signals related to scheduler sleeping/waking + // as the default for those signals is termination + struct sigaction new_action; +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL) + new_action.sa_handler = empty_signal_handler; + else +#endif + new_action.sa_handler = SIG_IGN; + sigemptyset (&new_action.sa_mask); + + // ask to restart interrupted syscalls to match `signal` behavior + new_action.sa_flags = SA_RESTART; + + sigaction((int)ev->nsec, &new_action, NULL); + EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, ev); i++; } @@ -369,7 +414,22 @@ PONY_API void pony_asio_event_unsubscribe(asio_event_t* ev) if(ev->flags & ASIO_SIGNAL) { - signal((int)ev->nsec, SIG_DFL); + // Make sure we ignore signals related to scheduler sleeping/waking + // as the default for those signals is termination + struct sigaction new_action; +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + if((int)ev->nsec == PONY_SCHED_SLEEP_WAKE_SIGNAL) + new_action.sa_handler = empty_signal_handler; + else +#endif + new_action.sa_handler = SIG_DFL; + sigemptyset (&new_action.sa_mask); + + // ask to restart interrupted syscalls to match `signal` behavior + new_action.sa_flags = SA_RESTART; + + sigaction((int)ev->nsec, &new_action, NULL); + EV_SET(&event[i], ev->nsec, EVFILT_SIGNAL, EV_DELETE, 0, 
0, ev); i++; } diff --git a/src/libponyrt/platform/threads.c b/src/libponyrt/platform/threads.c index fa42750c75..52e8a23c53 100644 --- a/src/libponyrt/platform/threads.c +++ b/src/libponyrt/platform/threads.c @@ -18,6 +18,17 @@ typedef cpuset_t cpu_set_t; #endif +#if defined(USE_SCHEDULER_SCALING_PTHREADS) +static pthread_mutex_t sleep_mut; + +static pthread_once_t sleep_mut_once = PTHREAD_ONCE_INIT; + +void sleep_mut_init() +{ + pthread_mutex_init(&sleep_mut, NULL); +} +#endif + #if defined(PLATFORM_IS_LINUX) #include @@ -201,6 +212,11 @@ bool ponyint_thread_create(pony_thread_id_t* thread, thread_fn start, if(pthread_create(thread, NULL, start, arg)) return false; #endif + +#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) + pthread_once(&sleep_mut_once, sleep_mut_init); +#endif + return true; } @@ -232,3 +248,47 @@ pony_thread_id_t ponyint_thread_self() return pthread_self(); #endif } + +void ponyint_thread_suspend(pony_signal_event_t signal) +{ +#ifdef PLATFORM_IS_WINDOWS + WaitForSingleObject(signal, INFINITE); +#elif defined(USE_SCHEDULER_SCALING_PTHREADS) + int ret; + // lock mutex + ret = pthread_mutex_lock(&sleep_mut); + + // wait for condition variable (will sleep and release mutex) + ret = pthread_cond_wait(signal, &sleep_mut); + // TODO: What to do if `ret` is an unrecoverable error? 
+ (void) ret; + + // unlock mutex + ret = pthread_mutex_unlock(&sleep_mut); +#else + int sig; + sigset_t sigmask; + sigemptyset(&sigmask); /* zero out all bits */ + sigaddset(&sigmask, signal); /* unblock desired signal */ + + // sleep waiting for signal to wake up again + sigwait(&sigmask, &sig); +#endif +} + +void ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal) +{ +#if defined(PLATFORM_IS_WINDOWS) + (void) thread; + SetEvent(signal); +#elif defined(USE_SCHEDULER_SCALING_PTHREADS) + (void) thread; + int ret; + // signal condition variable + ret = pthread_cond_signal(signal); + // TODO: What to do if `ret` is an unrecoverable error? + (void) ret; +#else + pthread_kill(thread, signal); +#endif +} diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index 459d5b36b1..e31fc57d2c 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -23,6 +23,7 @@ typedef enum SCHED_CNF = 30, SCHED_ACK, SCHED_TERMINATE = 40, + SCHED_SUSPEND = 41, SCHED_UNMUTE_ACTOR = 50, SCHED_NOISY_ASIO = 51, SCHED_UNNOISY_ASIO = 52 @@ -31,12 +32,27 @@ typedef enum // Scheduler global data. static uint32_t asio_cpu; static uint32_t scheduler_count; +static uint32_t min_scheduler_count; +static PONY_ATOMIC(uint32_t) active_scheduler_count; +static PONY_ATOMIC(bool) scheduler_count_changing; static scheduler_t* scheduler; static PONY_ATOMIC(bool) detect_quiescence; static bool use_yield; static mpmcq_t inject; static __pony_thread_local scheduler_t* this_scheduler; +#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) +static pthread_cond_t sleep_cond; +#endif + +/** + * Gets the current active scheduler count + */ +static uint32_t get_active_scheduler_count() +{ + return atomic_load_explicit(&active_scheduler_count, memory_order_relaxed); +} + /** * Gets the next actor from the scheduler queue. 
*/ @@ -84,6 +100,14 @@ static void send_msg(uint32_t from, uint32_t to, sched_msg_t msg, intptr_t arg) (void)from; } +static void send_msg_all_active(uint32_t from, sched_msg_t msg, intptr_t arg) +{ + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + + for(uint32_t i = 0; i < current_active_scheduler_count; i++) + send_msg(from, i, msg, arg); +} + static void send_msg_all(uint32_t from, sched_msg_t msg, intptr_t arg) { send_msg(from, 0, msg, arg); @@ -92,6 +116,29 @@ static void send_msg_all(uint32_t from, sched_msg_t msg, intptr_t arg) send_msg(from, i, msg, arg); } +static void wake_suspended_threads() +{ + // wake up any sleeping threads + while (get_active_scheduler_count() < scheduler_count) + { + if(!atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) + { + // in case the count changed between the while check and now + if(get_active_scheduler_count() < scheduler_count) + // send signal to wake up next scheduler thread available + ponyint_thread_wake(scheduler[get_active_scheduler_count()].tid, + scheduler[get_active_scheduler_count()].sleep_object); + else + // if there are no scheduler threads left to unlock + // unlock the bool that controls modifying the active scheduler count + // variable. + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); + } + } +} + static bool read_msg(scheduler_t* sched) { pony_msgi_t* m; @@ -106,15 +153,26 @@ static bool read_msg(scheduler_t* sched) { switch(m->msg.id) { + case SCHED_SUSPEND: + { + if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && + (sched->block_count == get_active_scheduler_count())) + { + // If we think all threads are blocked, send CNF(token) to everyone. 
+ send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); + } + break; + } + case SCHED_BLOCK: { sched->block_count++; if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && - (sched->block_count == scheduler_count)) + (sched->block_count == get_active_scheduler_count())) { // If we think all threads are blocked, send CNF(token) to everyone. - send_msg_all(sched->index, SCHED_CNF, sched->ack_token); + send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); } break; } @@ -191,7 +249,6 @@ static bool read_msg(scheduler_t* sched) return run_queue_changed; } - /** * If we can terminate, return true. If all schedulers are waiting, one of * them will stop the ASIO back end and tell the cycle detector to try to @@ -203,12 +260,16 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) if(sched->terminate) return true; - if(sched->ack_count == scheduler_count) + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + + if(sched->ack_count >= current_active_scheduler_count) { if(sched->asio_stopped) { send_msg_all(sched->index, SCHED_TERMINATE, 0); + wake_suspended_threads(); + sched->ack_token++; sched->ack_count = 0; } else if(ponyint_asio_stop()) { @@ -217,7 +278,7 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) sched->ack_count = 0; // Run another CNF/ACK cycle. - send_msg_all(sched->index, SCHED_CNF, sched->ack_token); + send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); } } @@ -238,12 +299,14 @@ static scheduler_t* choose_victim(scheduler_t* sched) // Back up one. victim--; + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + if(victim < scheduler) // victim is before the first scheduler location // wrap around to the end. 
- victim = &scheduler[scheduler_count - 1]; + victim = &scheduler[current_active_scheduler_count - 1]; - if(victim == sched->last_victim) + if((victim == sched->last_victim) || (current_active_scheduler_count == 1)) { // If we have tried all possible victims, return no victim. Set our last // victim to ourself to indicate we've started over. @@ -343,16 +406,65 @@ static pony_actor_t* steal(scheduler_t* sched) // stealing for generating far fewer block/unblock messages. if (!block_sent) { - if (steal_attempts < scheduler_count) + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + if (steal_attempts < current_active_scheduler_count) { steal_attempts++; } - else if ((!sched->asio_noisy) && - ((tsc2 - tsc) > 1000000) && + else if (((tsc2 - tsc) > 1000000) && (ponyint_mutemap_size(&sched->mute_mapping) == 0)) { - send_msg(sched->index, 0, SCHED_BLOCK, 0); - block_sent = true; + // if we're the highest active scheduler thread + // and there are more active schedulers than the minimum requested + if ((sched == &scheduler[current_active_scheduler_count - 1]) + && (current_active_scheduler_count > min_scheduler_count) && + !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) + { + // let sched 0 know we're suspending + send_msg(sched->index, 0, SCHED_SUSPEND, 0); + + // dtrace suspend notification + DTRACE1(THREAD_SUSPEND, (uintptr_t)sched); + + // decrement active_scheduler_count so other schedulers know we're + // sleeping + uint32_t sched_count = atomic_load_explicit(&active_scheduler_count, + memory_order_relaxed); + atomic_store_explicit(&active_scheduler_count, sched_count - 1, + memory_order_relaxed); + + // unlock the bool that controls modifying the active scheduler count + // variable + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); + + // sleep waiting for signal to wake up again + ponyint_thread_suspend(sched->sleep_object); + + // increment active_scheduler_count so other 
schedulers know we're + // awake again + sched_count = atomic_load_explicit(&active_scheduler_count, + memory_order_relaxed); + atomic_store_explicit(&active_scheduler_count, sched_count + 1, + memory_order_relaxed); + + // unlock the bool that controls modifying the active scheduler count + // variable. this is because the signalling thread locks the control + // variable before signalling + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); + + // dtrace resume notification + DTRACE1(THREAD_RESUME, (uintptr_t)sched); + } + else if(!sched->asio_noisy) + { + // Only send block messages if there are no noisy actors registered + // with the ASIO thread + send_msg(sched->index, 0, SCHED_BLOCK, 0); + block_sent = true; + } } } } @@ -399,6 +511,18 @@ static void run(scheduler_t* sched) DTRACE2(ACTOR_SCHEDULED, (uintptr_t)sched, (uintptr_t)actor); } + // We have at least one muted actor... + // Try and wake up a sleeping scheduler thread to help with the load. + // This is to err on the side of caution and wake up more threads in case + // of muted actors rather than potentially not wake up enough threads. + // If there isn't enough work, they'll go back to sleep. + // NOTE: This could result in a pathological case where only one thread + // has a muted actor but there is only one overloaded actor. In this case + // the extra scheduler threads would keep being woken up and then go back + // to sleep over and over again. + if(ponyint_mutemap_size(&sched->mute_mapping) > 0) + ponyint_sched_maybe_wakeup(); + // Run the current actor and get the next actor. 
bool reschedule = ponyint_actor_run(&sched->ctx, actor, PONY_SCHED_BATCH); pony_actor_t* next = pop_global(sched); @@ -431,6 +555,16 @@ static DECLARE_THREAD_FN(run_thread) scheduler_t* sched = (scheduler_t*) arg; this_scheduler = sched; ponyint_cpu_affinity(sched->cpu); + +#if !defined(PLATFORM_IS_WINDOWS) && !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we block signals related to scheduler sleeping/waking + // so they queue up to avoid race conditions + sigset_t set; + sigemptyset(&set); + sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL); + pthread_sigmask(SIG_BLOCK, &set, NULL); +#endif + run(sched); ponyint_pool_thread_cleanup(); @@ -458,17 +592,34 @@ static void ponyint_sched_shutdown() ) != NULL) { ; } ponyint_messageq_destroy(&scheduler[i].mq); ponyint_mpmcq_destroy(&scheduler[i].q); + +#if defined(PLATFORM_IS_WINDOWS) + // close wait event objects + CloseHandle(scheduler[i].sleep_object); +#elif defined(USE_SCHEDULER_SCALING_PTHREADS) + // set sleep condition object to NULL + scheduler[i].sleep_object = NULL; +#endif } +#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) + int ret; + // destroy pthread condition object + ret = pthread_cond_destroy(&sleep_cond); + // TODO: What to do if `ret` is a non-recoverable error? 
+ (void) ret; +#endif + ponyint_pool_free_size(scheduler_count * sizeof(scheduler_t), scheduler); scheduler = NULL; scheduler_count = 0; + atomic_store_explicit(&active_scheduler_count, 0, memory_order_relaxed); ponyint_mpmcq_destroy(&inject); } pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, - bool pinasio) + bool pinasio, uint32_t min_threads) { pony_register_thread(); @@ -478,7 +629,18 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, if(threads == 0) threads = ponyint_cpu_count(); + // If no minimum thread count is specified, use 1 + if(min_threads == 0) + min_threads = 1; + + // If minimum thread count is > thread count, cap it at thread count + if(min_threads > threads) + min_threads = threads; + scheduler_count = threads; + min_scheduler_count = min_threads; + atomic_store_explicit(&active_scheduler_count, scheduler_count, + memory_order_relaxed); scheduler = (scheduler_t*)ponyint_pool_alloc_size( scheduler_count * sizeof(scheduler_t)); memset(scheduler, 0, scheduler_count * sizeof(scheduler_t)); @@ -486,8 +648,26 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, asio_cpu = ponyint_cpu_assign(scheduler_count, scheduler, nopin, pinasio); +#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) + // initialize pthread condition object + int ret = pthread_cond_init(&sleep_cond, NULL); +#endif + for(uint32_t i = 0; i < scheduler_count; i++) { +#if defined(PLATFORM_IS_WINDOWS) + // create wait event objects + scheduler[i].sleep_object = CreateEvent(NULL, FALSE, FALSE, NULL); +#elif defined(USE_SCHEDULER_SCALING_PTHREADS) + // if it failed, set `sleep_object` to `NULL` for error + if(ret != 0) + scheduler[i].sleep_object = NULL; + else + scheduler[i].sleep_object = &sleep_cond; +#else + scheduler[i].sleep_object = PONY_SCHED_SLEEP_WAKE_SIGNAL; +#endif + scheduler[i].ctx.scheduler = &scheduler[i]; scheduler[i].last_victim = &scheduler[i]; scheduler[i].index = i; 
@@ -516,6 +696,12 @@ bool ponyint_sched_start(bool library) for(uint32_t i = start; i < scheduler_count; i++) { +#if defined(PLATFORM_IS_WINDOWS) || defined(USE_SCHEDULER_SCALING_PTHREADS) + // there was an error creating a wait event or a pthread condition object + if(scheduler[i].sleep_object == NULL) + return false; +#endif + if(!ponyint_thread_create(&scheduler[i].tid, run_thread, scheduler[i].cpu, &scheduler[i])) return false; @@ -552,6 +738,11 @@ uint32_t ponyint_sched_cores() return scheduler_count; } +uint32_t ponyint_active_sched_count() +{ + return get_active_scheduler_count(); +} + PONY_API void pony_register_thread() { if(this_scheduler != NULL) @@ -592,6 +783,37 @@ void ponyint_sched_unnoisy_asio(int32_t from) send_msg_all(from, SCHED_UNNOISY_ASIO, 0); } +// Maybe wake up a scheduler thread if possible. +// The active count is re-read once `scheduler_count_changing` is held because +// it can change between the unlocked check and acquiring the flag; if nothing +// is asleep by then the flag is released here, as no woken thread will do it. +void ponyint_sched_maybe_wakeup() +{ + // unlocked check: if we have some schedulers that are sleeping, wake one up + if((get_active_scheduler_count() < scheduler_count) && + !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) + { + // in case the count changed between the check above and now + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + + if(current_active_scheduler_count < scheduler_count) + { + // send signal to wake up next scheduler thread available; the woken + // thread unlocks `scheduler_count_changing` once it has resumed + ponyint_thread_wake(scheduler[current_active_scheduler_count].tid, + scheduler[current_active_scheduler_count].sleep_object); + } + else + { + // nothing left to wake: unlock the bool that controls modifying the + // active scheduler count ourselves + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); + } + } +} + // Manage a scheduler's mute map // // When an actor attempts to send to an overloaded actor, it will be added @@ -640,7 +862,7 @@ void ponyint_sched_mute(pony_ctx_t* ctx, pony_actor_t* sender, pony_actor_t* rec void ponyint_sched_start_global_unmute(uint32_t from, pony_actor_t* actor) { - send_msg_all(from, SCHED_UNMUTE_ACTOR, (intptr_t)actor); + send_msg_all_active(from, SCHED_UNMUTE_ACTOR, (intptr_t)actor); } DECLARE_STACK(ponyint_actorstack, actorstack_t, pony_actor_t); diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index 924257146d..45d6852fe8 100644 --- a/src/libponyrt/sched/scheduler.h +++ 
b/src/libponyrt/sched/scheduler.h @@ -19,6 +19,12 @@ typedef struct scheduler_t scheduler_t; #define SPECIAL_THREADID_IOCP -11 #define SPECIAL_THREADID_EPOLL -12 +#if !defined(PLATFORM_IS_WINDOWS) && !defined(USE_SCHEDULER_SCALING_PTHREADS) +// Signal to use for suspending/resuming threads via `sigwait`/`pthread_kill` +// If you change this, remember to change `signals` package accordingly +#define PONY_SCHED_SLEEP_WAKE_SIGNAL SIGUSR2 +#endif + PONY_EXTERN_C_BEGIN typedef void (*trace_object_fn)(pony_ctx_t* ctx, void* p, pony_type_t* t, @@ -53,6 +59,7 @@ struct scheduler_t bool terminate; bool asio_stopped; bool asio_noisy; + pony_signal_event_t sleep_object; // These are changed primarily by the owning scheduler thread. alignas(64) struct scheduler_t* last_victim; @@ -69,7 +76,7 @@ struct scheduler_t }; pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, - bool pinasio); + bool pinasio, uint32_t min_threads); bool ponyint_sched_start(bool library); @@ -93,6 +100,9 @@ void ponyint_sched_noisy_asio(int32_t from); */ void ponyint_sched_unnoisy_asio(int32_t from); +// Try and wake up a sleeping scheduler thread to help with load +void ponyint_sched_maybe_wakeup(); + PONY_EXTERN_C_END #endif diff --git a/src/libponyrt/sched/start.c b/src/libponyrt/sched/start.c index cde08f2ea0..da2e860df6 100644 --- a/src/libponyrt/sched/start.c +++ b/src/libponyrt/sched/start.c @@ -18,6 +18,7 @@ typedef struct options_t { // concurrent options uint32_t threads; + uint32_t min_threads; uint32_t cd_min_deferred; uint32_t cd_max_deferred; uint32_t cd_conf_group; @@ -38,6 +39,7 @@ static bool language_init; enum { OPT_THREADS, + OPT_MINTHREADS, OPT_CDMIN, OPT_CDMAX, OPT_CDCONF, @@ -53,6 +55,7 @@ enum static opt_arg_t args[] = { {"ponythreads", 0, OPT_ARG_REQUIRED, OPT_THREADS}, + {"ponyminthreads", 0, OPT_ARG_REQUIRED, OPT_MINTHREADS}, {"ponycdmin", 0, OPT_ARG_REQUIRED, OPT_CDMIN}, {"ponycdmax", 0, OPT_ARG_REQUIRED, OPT_CDMAX}, {"ponycdconf", 0, OPT_ARG_REQUIRED, 
OPT_CDCONF}, @@ -78,6 +81,7 @@ static int parse_opts(int argc, char** argv, options_t* opt) switch(id) { case OPT_THREADS: opt->threads = atoi(s.arg_val); break; + case OPT_MINTHREADS: opt->min_threads = atoi(s.arg_val); break; case OPT_CDMIN: opt->cd_min_deferred = atoi(s.arg_val); break; case OPT_CDMAX: opt->cd_max_deferred = atoi(s.arg_val); break; case OPT_CDCONF: opt->cd_conf_group = atoi(s.arg_val); break; @@ -134,7 +138,7 @@ PONY_API int pony_init(int argc, char** argv) pony_exitcode(0); pony_ctx_t* ctx = ponyint_sched_init(opt.threads, opt.noyield, opt.nopin, - opt.pinasio); + opt.pinasio, opt.min_threads); ponyint_cycle_create(ctx, opt.cd_min_deferred, opt.cd_max_deferred, opt.cd_conf_group); diff --git a/src/ponyc/main.c b/src/ponyc/main.c index 3b43ea03ab..af0e6c5107 100644 --- a/src/ponyc/main.c +++ b/src/ponyc/main.c @@ -116,49 +116,49 @@ static void usage() "The package directory defaults to the current directory.\n" , "Options:\n" - " --version, -v Print the version of the compiler and exit.\n" - " --help, -h Print this help text and exit.\n" - " --debug, -d Don't optimise the output.\n" - " --define, -D Define the specified build flag.\n" + " --version, -v Print the version of the compiler and exit.\n" + " --help, -h Print this help text and exit.\n" + " --debug, -d Don't optimise the output.\n" + " --define, -D Define the specified build flag.\n" " =name\n" - " --strip, -s Strip debug info.\n" - " --path, -p Add an additional search path.\n" - " =path Used to find packages and libraries.\n" - " --output, -o Write output to this directory.\n" - " =path Defaults to the current directory.\n" - " --bin-name, -b Name of executable binary.\n" - " =name Defaults to name of the directory.\n" - " --library, -l Generate a C-API compatible static library.\n" - " --runtimebc Compile with the LLVM bitcode file for the runtime.\n" - " --pic Compile using position independent code.\n" - " --nopic Don't compile using position independent code.\n" - " --docs, -g 
Generate code documentation.\n" - " --docs-public Generate code documentation for public types only.\n" + " --strip, -s Strip debug info.\n" + " --path, -p Add an additional search path.\n" + " =path Used to find packages and libraries.\n" + " --output, -o Write output to this directory.\n" + " =path Defaults to the current directory.\n" + " --bin-name, -b Name of executable binary.\n" + " =name Defaults to name of the directory.\n" + " --library, -l Generate a C-API compatible static library.\n" + " --runtimebc Compile with the LLVM bitcode file for the runtime.\n" + " --pic Compile using position independent code.\n" + " --nopic Don't compile using position independent code.\n" + " --docs, -g Generate code documentation.\n" + " --docs-public Generate code documentation for public types only.\n" , "Rarely needed options:\n" - " --safe Allow only the listed packages to use C FFI.\n" - " =package With no packages listed, only builtin is allowed.\n" - " --cpu Set the target CPU.\n" - " =name Default is the host CPU.\n" - " --features CPU features to enable or disable.\n" - " =+this,-that Use + to enable, - to disable.\n" - " Defaults to detecting all CPU features from the host.\n" - " --triple Set the target triple.\n" - " =name Defaults to the host triple.\n" - " --stats Print some compiler stats.\n" - " --link-arch Set the linking architecture.\n" - " =name Default is the host architecture.\n" - " --linker Set the linker command to use.\n" - " =name Default is the compiler used to compile ponyc.\n" + " --safe Allow only the listed packages to use C FFI.\n" + " =package With no packages listed, only builtin is allowed.\n" + " --cpu Set the target CPU.\n" + " =name Default is the host CPU.\n" + " --features CPU features to enable or disable.\n" + " =+this,-that Use + to enable, - to disable.\n" + " Defaults to detecting all CPU features from the host.\n" + " --triple Set the target triple.\n" + " =name Defaults to the host triple.\n" + " --stats Print some compiler 
stats.\n" + " --link-arch Set the linking architecture.\n" + " =name Default is the host architecture.\n" + " --linker Set the linker command to use.\n" + " =name Default is the compiler used to compile ponyc.\n" , "Debugging options:\n" - " --verbose, -V Verbosity level.\n" - " =0 Only print errors.\n" - " =1 Print info on compiler stages.\n" - " =2 More detailed compilation information.\n" - " =3 External tool command lines.\n" - " =4 Very low-level detail.\n" - " --pass, -r Restrict phases.\n" + " --verbose, -V Verbosity level.\n" + " =0 Only print errors.\n" + " =1 Print info on compiler stages.\n" + " =2 More detailed compilation information.\n" + " =3 External tool command lines.\n" + " =4 Very low-level detail.\n" + " --pass, -r Restrict phases.\n" " =parse\n" " =syntax\n" " =sugar\n" @@ -174,48 +174,49 @@ static void usage() " =final\n" " =reach\n" " =paint\n" - " =ir Output LLVM IR.\n" - " =bitcode Output LLVM bitcode.\n" - " =asm Output assembly.\n" - " =obj Output an object file.\n" - " =all The default: generate an executable.\n" - " --ast, -a Output an abstract syntax tree for the whole program.\n" - " --astpackage Output an abstract syntax tree for the main package.\n" - " --trace, -t Enable parse trace.\n" - " --width, -w Width to target when printing the AST.\n" - " =columns Defaults to the terminal width.\n" - " --immerr Report errors immediately rather than deferring.\n" - " --checktree Verify AST well-formedness.\n" - " --verify Verify LLVM IR.\n" - " --extfun Set function default linkage to external.\n" - " --simplebuiltin Use a minimal builtin package.\n" - " --files Print source file names as each is processed.\n" - " --bnf Print out the Pony grammar as human readable BNF.\n" - " --antlr Print out the Pony grammar as an ANTLR file.\n" - " --lint-llvm Run the LLVM linting pass on generated IR.\n" + " =ir Output LLVM IR.\n" + " =bitcode Output LLVM bitcode.\n" + " =asm Output assembly.\n" + " =obj Output an object file.\n" + " =all The default: 
generate an executable.\n" + " --ast, -a Output an abstract syntax tree for the whole program.\n" + " --astpackage Output an abstract syntax tree for the main package.\n" + " --trace, -t Enable parse trace.\n" + " --width, -w Width to target when printing the AST.\n" + " =columns Defaults to the terminal width.\n" + " --immerr Report errors immediately rather than deferring.\n" + " --checktree Verify AST well-formedness.\n" + " --verify Verify LLVM IR.\n" + " --extfun Set function default linkage to external.\n" + " --simplebuiltin Use a minimal builtin package.\n" + " --files Print source file names as each is processed.\n" + " --bnf Print out the Pony grammar as human readable BNF.\n" + " --antlr Print out the Pony grammar as an ANTLR file.\n" + " --lint-llvm Run the LLVM linting pass on generated IR.\n" , "Runtime options for Pony programs (not for use with ponyc):\n" - " --ponythreads Use N scheduler threads. Defaults to the number of\n" - " cores (not hyperthreads) available.\n" - " --ponycdmin Defer cycle detection until 2^N actors have blocked.\n" - " Defaults to 2^4.\n" - " --ponycdmax Always cycle detect when 2^N actors have blocked.\n" - " Defaults to 2^18.\n" - " --ponycdconf Send cycle detection CNF messages in groups of 2^N.\n" - " Defaults to 2^6.\n" - " --ponygcinitial Defer garbage collection until an actor is using at\n" - " least 2^N bytes. Defaults to 2^14.\n" - " --ponygcfactor After GC, an actor will next be GC'd at a heap memory\n" - " usage N times its current value. This is a floating\n" - " point value. 
Defaults to 2.0.\n" - " --ponynoyield Do not yield the CPU when no work is available.\n" - " --ponynoblock Do not send block messages to the cycle detector.\n" - " --ponynopin Do not pin scheduler threads or the ASIO thread, even\n" - " if --ponypinasio is set.\n" - " threads are pinned to CPUs.\n" - " --ponypinasio Pin the ASIO thread to a CPU the way scheduler\n" - " threads are pinned to CPUs.\n" - " --ponyversion Print the version of the compiler and exit.\n" + " --ponythreads Use N scheduler threads. Defaults to the number of\n" + " cores (not hyperthreads) available.\n" + " --ponyminthreads Minimum number of active scheduler threads allowed.\n" + " Defaults to 1.\n" + " --ponycdmin Defer cycle detection until 2^N actors have blocked.\n" + " Defaults to 2^4.\n" + " --ponycdmax Always cycle detect when 2^N actors have blocked.\n" + " Defaults to 2^18.\n" + " --ponycdconf Send cycle detection CNF messages in groups of 2^N.\n" + " Defaults to 2^6.\n" + " --ponygcinitial Defer garbage collection until an actor is using at\n" + " least 2^N bytes. Defaults to 2^14.\n" + " --ponygcfactor After GC, an actor will next be GC'd at a heap memory\n" + " usage N times its current value. This is a floating\n" + " point value. 
Defaults to 2.0.\n" + " --ponynoyield Do not yield the CPU when no work is available.\n" + " --ponynoblock Do not send block messages to the cycle detector.\n" + " --ponynopin Do not pin scheduler threads or the ASIO thread, even\n" + " if --ponypinasio is set.\n" + " --ponypinasio Pin the ASIO thread to a CPU the way scheduler\n" + " threads are pinned to CPUs.\n" + " --ponyversion Print the version of the compiler and exit.\n" ); } @@ -294,6 +295,12 @@ int main(int argc, char* argv[]) opt.pic = true; #endif +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // Defined "scheduler_scaling_pthreads" so that SIGUSR2 is made available for + // use by the signals package when not using signals for scheduler scaling + define_build_flag("scheduler_scaling_pthreads"); +#endif + while((id = ponyint_opt_next(&s)) != -1) { switch(id)