From 5683e894ca0613958642eb90a4a5838d57446232 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Sun, 24 Nov 2024 12:50:08 -0500 Subject: [PATCH 01/13] Add support for pinning actors to a dedicated scheduler thread The overall design goal and approach was to make it possible to have pinned actors while minimizing impact to pre-existing non-pinned actor workloads. This meant there could be no impact on message sends (i.e. can't check to see if the receiving actor is a pinned actor or not to decide what to do with it if it is unscheduled). These goals were chosen because it is expected that `pinned` actors will be a niche/small part of any pony application's overall workload. The approach taken has negligible performance impact to existing scheduler logic. It adds a couple of extra checks to see if an actor that is ready to run is a pinned actor or not and if not, there is no other overhead involved. The scheduler quiescence logic has an extra check for an atomic counter of pinned actors but that is also negligible if no pinned actors are ever used. The overall logic for pinned actors works as follows: * The `main` thread is dedicated to running pinned actors (and only pinned actors). This thread previously initialized the runtime and then sat around waiting for all schedulers to reach quiescence so now it runs pinned actors in the meantime if there are any. * The `pinned actor thread` (`main`) runs a custom run loop for pinned actors that does not participate in work stealing or any other normal scheduler messaging except for unmuting messages and the termination message. It also will only ever run `pinned` actors and any non-`pinned` actors will get pushed onto the `inject` queue. * Normal schedulers will only ever run non-`pinned` actors and any `pinned` actors will get pushed onto the `pinned actor thread`'s queue. * From an api perspective, there is now an `actor_pinning` package in the stdlib. An actor can request to be pinned, check that it has successfully been pinned (so that it can safely do whatever it needs to do while pinned), and request to be unpinned. While the above is not necessarily the most efficient way to run `pinned` actors, it meets the original design goals of making it possible while minimizing impact of pre-existing non-pinned actor workloads. --- packages/actor_pinning/actor_pinning.pony | 81 ++++ packages/actor_pinning/auth.pony | 3 + src/libponyrt/actor/actor.c | 34 +- src/libponyrt/actor/actor.h | 6 + src/libponyrt/sched/scheduler.c | 458 +++++++++++++++++- src/libponyrt/sched/scheduler.h | 10 +- src/libponyrt/sched/systematic_testing.c | 52 +- src/libponyrt/sched/systematic_testing.h | 6 +- .../pinned-actor/expected-exit-code.txt | 1 + .../full-program-tests/pinned-actor/main.pony | 43 ++ 10 files changed, 653 insertions(+), 41 deletions(-) create mode 100644 packages/actor_pinning/actor_pinning.pony create mode 100644 packages/actor_pinning/auth.pony create mode 100644 test/full-program-tests/pinned-actor/expected-exit-code.txt create mode 100644 test/full-program-tests/pinned-actor/main.pony diff --git a/packages/actor_pinning/actor_pinning.pony b/packages/actor_pinning/actor_pinning.pony new file mode 100644 index 0000000000..f3f3f8c59b --- /dev/null +++ b/packages/actor_pinning/actor_pinning.pony @@ -0,0 +1,81 @@ +""" +# Actor Pinning Package + +The Actor Pinning package allows Pony programmers to pin actors to a dedicated +scheduler thread. This can be required/used for interfacing with C libraries +that rely on thread local storage. A common example of this is graphics/windowing +libraries. + +The way it works is that an actor can request that it be pinned and then check +to confirm that the pinning was successfully applied after which all subsequent +behaviors on that actor will run on the same scheduler thread until the actor is +destroyed or the actor requests to be unpinned. + +## Example program + +```pony +// Here we have the Main actor that upon construction requests a PinUnpinActorAuth +// token from AmbientAuth and then requests that it be pinned. It then recursively +// calls the `check_pinned` behavior until the runtime reports that it has +// successfully been pinned after which it starts `do_stuff` to do whatever +// work it needs to do that requires it to be pinned. Once it has completed all +// of its work, it calls `done` to request that the runtime `unpin` it. + +use "actor_pinning" + +actor Main + let _env: Env + let _auth: PinUnpinActorAuth + + new create(env: Env) => + _env = env + _auth = PinUnpinActorAuth(env.root) + ActorPinning.pin(_auth) + check_pinned() + + be check_pinned() => + if ActorPinning.is_successfully_pinned(_auth) then + // do stuff that requires this actor to be pinned + do_stuff(10) + else + check_pinned() + end + + be do_stuff(i: I32) => + if i < 0 then + done() + else + do_stuff(i - 1) + end + + be done() => + ActorPinning.unpin(_auth) +``` + +## Caveat + +Pinned actors could prevent the runtime from reaching quiescence. When the +cycle detector reaps pinned actors it allows the runtime to reach quiescence. +If the cycle detector is unable to reap all pinned actors or if the cycle +detector is disabled (via `--ponynoblock`), it is up to the programmer to +manually `unpin` all pinned actors or else the runtime will be unable to reach +quiescence and the program will never terminate. +""" + +use @pony_actor_set_pinned[None]() +use @pony_actor_unset_pinned[None]() +use @pony_scheduler_index[I32]() + +primitive ActorPinning + fun pin(auth: PinUnpinActorAuth) => + @pony_actor_set_pinned() + + fun unpin(auth: PinUnpinActorAuth) => + @pony_actor_unset_pinned() + + fun is_successfully_pinned(auth: PinUnpinActorAuth): Bool => + let sched: I32 = @pony_scheduler_index() + + // the `-999` constant is the same value as `PONY_PINNED_ACTOR_THREAD_INDEX` + // defined in `scheduler.h` in the runtime + sched == -999 diff --git a/packages/actor_pinning/auth.pony b/packages/actor_pinning/auth.pony new file mode 100644 index 0000000000..9e93c62bbb --- /dev/null +++ b/packages/actor_pinning/auth.pony @@ -0,0 +1,3 @@ +primitive PinUnpinActorAuth + new create(from: AmbientAuth) => + None diff --git a/src/libponyrt/actor/actor.c b/src/libponyrt/actor/actor.c index b9f26b7f76..fcc9e3a449 100644 --- a/src/libponyrt/actor/actor.c +++ b/src/libponyrt/actor/actor.c @@ -36,6 +36,7 @@ enum FLAG_UNSCHEDULED = 1 << 3, FLAG_CD_CONTACTED = 1 << 4, FLAG_RC_OVER_ZERO_SEEN = 1 << 5, + FLAG_PINNED = 1 << 6, }; enum @@ -814,6 +815,12 @@ void ponyint_actor_destroy(pony_actor_t* actor) print_actor_stats(actor); #endif + if(ponyint_actor_is_pinned(actor)) + { + unset_internal_flag(actor, FLAG_PINNED); + ponyint_sched_decrement_pinned_actor_count(); + } + // Free variable sized actors correctly. ponyint_pool_free_size(actor->type->size, actor); } @@ -1046,7 +1053,7 @@ PONY_API void pony_sendv_single(pony_ctx_t* ctx, pony_actor_t* to, { // if the receiving actor is currently not unscheduled AND it's not // muted, schedule it. - ponyint_sched_add_inject_or_sched(ctx, to); + ponyint_sched_add(ctx, to); } } } @@ -1219,6 +1226,31 @@ PONY_API void pony_triggergc(pony_ctx_t* ctx) ctx->current->heap.next_gc = 0; } +bool ponyint_actor_is_pinned(pony_actor_t* actor) +{ + return has_internal_flag(actor, FLAG_PINNED); +} + +PONY_API void pony_actor_set_pinned() +{ + pony_ctx_t* ctx = pony_ctx(); + if(!ponyint_actor_is_pinned(ctx->current)) + { + set_internal_flag(ctx->current, FLAG_PINNED); + ponyint_sched_increment_pinned_actor_count(); + } +} + +PONY_API void pony_actor_unset_pinned() +{ + pony_ctx_t* ctx = pony_ctx(); + if(ponyint_actor_is_pinned(ctx->current)) + { + unset_internal_flag(ctx->current, FLAG_PINNED); + ponyint_sched_decrement_pinned_actor_count(); + } +} + void ponyint_become(pony_ctx_t* ctx, pony_actor_t* actor) { ctx->current = actor; diff --git a/src/libponyrt/actor/actor.h b/src/libponyrt/actor/actor.h index 68be5558ed..b77f6a6697 100644 --- a/src/libponyrt/actor/actor.h +++ b/src/libponyrt/actor/actor.h @@ -130,6 +130,12 @@ gc_t* ponyint_actor_gc(pony_actor_t* actor); heap_t* ponyint_actor_heap(pony_actor_t* actor); +bool ponyint_actor_is_pinned(pony_actor_t* actor); + +PONY_API void pony_actor_set_pinned(); + +PONY_API void pony_actor_unset_pinned(); + bool ponyint_actor_pendingdestroy(pony_actor_t* actor); void ponyint_actor_setpendingdestroy(pony_actor_t* actor); diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index bd0ac50e9b..5bd4b892fd 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -47,6 +47,10 @@ static PONY_ATOMIC(bool) temporarily_disable_scheduler_scaling; static PONY_ATOMIC(bool) detect_quiescence; static bool use_yield; static mpmcq_t inject; +static PONY_ATOMIC(bool) pinned_actor_scheduler_suspended; +static PONY_ATOMIC(bool) pinned_actor_scheduler_suspended_check; +static scheduler_t* pinned_actor_scheduler; +static PONY_ATOMIC(uint32_t) pinned_actor_count; static __pony_thread_local scheduler_t* this_scheduler; #if defined(USE_SCHEDULER_SCALING_PTHREADS) @@ -159,6 +163,22 @@ static uint32_t get_active_scheduler_count_check() return atomic_load_explicit(&active_scheduler_count_check, memory_order_relaxed); } +static uint32_t get_pinned_actor_count() +{ + return atomic_load_explicit(&pinned_actor_count, memory_order_relaxed); +} + +void ponyint_sched_increment_pinned_actor_count() +{ + atomic_fetch_add_explicit(&pinned_actor_count, 1, memory_order_relaxed); +} + +void ponyint_sched_decrement_pinned_actor_count() +{ + uint32_t old = atomic_fetch_sub_explicit(&pinned_actor_count, 1, memory_order_relaxed); + pony_assert(0 < old); +} + /** * Gets the whether dynamic scheduler scaling is temporarily disabled */ @@ -199,6 +219,31 @@ static pony_actor_t* pop_global(scheduler_t* sched) return pop(sched); } +/** + * Sends a message to the pinned actor thread. + */ + +static void send_msg_pinned_actor_thread(uint32_t from, sched_msg_t msg, intptr_t arg) +{ + pony_msgi_t* m = (pony_msgi_t*)pony_alloc_msg( + POOL_INDEX(sizeof(pony_msgi_t)), msg); + +#ifdef USE_RUNTIMESTATS_MESSAGES + this_scheduler->ctx.schedulerstats.num_inflight_messages--; + this_scheduler->ctx.schedulerstats.mem_used_inflight_messages += sizeof(pony_msgi_t); + this_scheduler->ctx.schedulerstats.mem_used_inflight_messages -= POOL_ALLOC_SIZE(pony_msgi_t); +#endif + + m->i = arg; + + ponyint_thread_messageq_push(&pinned_actor_scheduler->mq, &m->msg, &m->msg +#ifdef USE_DYNAMIC_TRACE + , from, PONY_PINNED_ACTOR_THREAD_INDEX +#endif + ); + (void)from; +} + /** * Sends a message to a thread. */ @@ -264,6 +309,58 @@ static void signal_suspended_threads(uint32_t sched_count, int32_t curr_sched_id } } +static void signal_suspended_pinned_actor_thread() +{ +#if defined(USE_SYSTEMATIC_TESTING) + SYSTEMATIC_TESTING_YIELD(); +#else + ponyint_thread_wake(pinned_actor_scheduler->tid, pinned_actor_scheduler->sleep_object); +#endif +} + +static void wake_suspended_pinned_actor_thread() +{ +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // acquire mutex if using pthreads + if(!pthread_mutex_lock(&sched_mut)) +#else + // get the bool that controls modifying the active scheduler count variable + // if using signals + if(!atomic_load_explicit(&scheduler_count_changing, memory_order_relaxed) + && !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) +#endif + { + atomic_store_explicit(&pinned_actor_scheduler_suspended, false, memory_order_relaxed); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals. + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); +#endif + } + + signal_suspended_pinned_actor_thread(); + + // wait for the sleeping thread to wake and update check variable + while(atomic_load_explicit(&pinned_actor_scheduler_suspended_check, memory_order_relaxed)) + { + // send signals to the pinned actor scheduler thread that should be awake + // this is somewhat wasteful if the scheduler thread is already awake + // but is necessary in case the signal to wake the thread was missed + // NOTE: this intentionally allows for the case where the scheduler + // thread might miss the signal and not wake up. That is handled + // by a combination of the check variable and this while loop + signal_suspended_pinned_actor_thread(); + } +} + static void wake_suspended_threads(int32_t current_scheduler_id) { uint32_t current_active_scheduler_count = get_active_scheduler_count(); @@ -420,6 +517,7 @@ static bool read_msg(scheduler_t* sched) case SCHED_CNF: { pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(PONY_PINNED_ACTOR_THREAD_INDEX != sched->index); // Echo the token back as ACK(token). send_msg(sched->index, 0, SCHED_ACK, m->i); @@ -457,6 +555,7 @@ static bool read_msg(scheduler_t* sched) case SCHED_NOISY_ASIO: { pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(PONY_PINNED_ACTOR_THREAD_INDEX != sched->index); // mark asio as being noisy sched->asio_noisy++; @@ -466,6 +565,7 @@ static bool read_msg(scheduler_t* sched) case SCHED_UNNOISY_ASIO: { pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(PONY_PINNED_ACTOR_THREAD_INDEX != sched->index); // mark asio as not being noisy sched->asio_noisy--; @@ -499,8 +599,8 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) // only scheduler 0 can initiate shutdown (it is the ony that gets all the // ACK messages as part of the CNF/ACK coordination for shutdown) // only if there are no noisy actors registered with the ASIO subsystem - // and the mutemap is empty... - if(0 == sched->index && !sched->asio_noisy && ponyint_mutemap_size(&sched->mute_mapping) == 0) + // and the mutemap is empty... and no pinned actors in the system... + if(0 == sched->index && !sched->asio_noisy && ponyint_mutemap_size(&sched->mute_mapping) == 0 && 0 == get_pinned_actor_count()) { // 0 means that all schedulers have ACK'd and we can proceed with shutdown.. // if any scheduler threads block/unblock before we get ACKs from them all @@ -519,6 +619,10 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) // successfully stopped ASIO thread // tell all scheduler threads to terminate send_msg_all(sched->index, SCHED_TERMINATE, 0); + send_msg_pinned_actor_thread(sched->index, SCHED_TERMINATE, 0); + + wake_suspended_threads(sched->index); + wake_suspended_pinned_actor_thread(); sched->ack_token++; sched->ack_count = scheduler_count; @@ -958,7 +1062,10 @@ static pony_actor_t* steal(scheduler_t* sched) // if we're scheduler 0 and we're in a termination CNF/ACK cycle // make sure all threads are awake in case any missed a wake up signal if(sched->index == 0 && get_temporarily_disable_scheduler_scaling()) + { wake_suspended_threads(sched->index); + wake_suspended_pinned_actor_thread(); + } } // Only send unblock message if a corresponding block message was sent @@ -1062,11 +1169,33 @@ static void run(scheduler_t* sched) actor = pop_global(sched); } - if(actor == NULL) + // if it's a pinned actor, send it to the pinned_actor_scheduler and get + // another actor to process + while(NULL != actor && ponyint_actor_is_pinned(actor)) + { + ponyint_mpmcq_push(&pinned_actor_scheduler->q, actor); + // wake pinned_actor_scheduler + wake_suspended_pinned_actor_thread(); + actor = pop_global(sched); + } + + while(actor == NULL) { // We had an empty queue and no rescheduled actor. actor = steal(sched); + // if it's a pinned actor, send it to the pinned_actor_scheduler and get + // another actor to process + if(NULL != actor && ponyint_actor_is_pinned(actor)) + { + ponyint_mpmcq_push(&pinned_actor_scheduler->q, actor); + // wake pinned_actor_scheduler + wake_suspended_pinned_actor_thread(); + actor = NULL; + // try and steal again + continue; + } + if(actor == NULL) { #ifdef USE_RUNTIMESTATS @@ -1095,6 +1224,8 @@ static void run(scheduler_t* sched) if(ponyint_mutemap_size(&sched->mute_mapping) > 0) ponyint_sched_maybe_wakeup(sched->index); + pony_assert(!ponyint_actor_is_pinned(actor)); + // Run the current actor and get the next actor. bool reschedule = ponyint_actor_run(&sched->ctx, actor, false); sched->ctx.current = NULL; @@ -1156,6 +1287,239 @@ static DECLARE_THREAD_FN(run_thread) return 0; } + +static void perhaps_suspend_pinned_actor_scheduler( + scheduler_t* sched, uint64_t tsc, uint64_t tsc2) +{ + // if we're not terminating + if ((!sched->terminate) +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // try to acquire mutex if using pthreads + && !pthread_mutex_trylock(&sched_mut) +#else + // try and get the bool that controls modifying the pinned_actor_scheduler_suspended + // variable if using signals + && (!atomic_load_explicit(&scheduler_count_changing, memory_order_relaxed) + && !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) +#endif + ) + { + atomic_store_explicit(&pinned_actor_scheduler_suspended, true, memory_order_relaxed); + atomic_store_explicit(&pinned_actor_scheduler_suspended_check, true, memory_order_relaxed); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the pinned_actor_scheduler_suspended + // variable if using signals + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif + + // dtrace suspend notification + DTRACE1(THREAD_SUSPEND, (uintptr_t)sched); + + // sleep waiting for signal to wake up again +#if defined(USE_SYSTEMATIC_TESTING) +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + SYSTEMATIC_TESTING_SUSPEND(&sched_mut); +#else + SYSTEMATIC_TESTING_SUSPEND(); +#endif +#else +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + ponyint_thread_suspend(sched->sleep_object, &sched_mut); +#else + ponyint_thread_suspend(sched->sleep_object); +#endif +#endif + + // dtrace resume notification + DTRACE1(THREAD_RESUME, (uintptr_t)sched); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // When using signals, need to acquire sched count changing variable + while (true) + { + // get the bool that controls modifying the pinned_actor_scheduler_suspended + // variable if using signals + if(!atomic_load_explicit(&scheduler_count_changing, memory_order_relaxed) + && !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) + { +#endif + + atomic_store_explicit(&pinned_actor_scheduler_suspended, false, memory_order_relaxed); + atomic_store_explicit(&pinned_actor_scheduler_suspended_check, false, memory_order_relaxed); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the pinned_actor_scheduler_suspended + // variable if using signals + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); + + // break while loop + break; + } + } +#endif + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); +#endif + } else { + // unable to get the lock to suspend so sleep for a bit +#if defined(USE_SYSTEMATIC_TESTING) + (void)tsc; + (void)tsc2; + SYSTEMATIC_TESTING_YIELD(); +#else + ponyint_cpu_core_pause(tsc, tsc2, true); +#endif + } +} + +/** + * Run a custom scheduler thread for pinned actors until termination. + * This thread does not partiticpate in most normal scheduler messaging + * like CNF/ACK/block/unblock/suspend/noisy/unnoisy. it does participate in + * muting messages and termination messages. + */ +static void run_pinned_actors() +{ + pony_assert(PONY_PINNED_ACTOR_THREAD_INDEX == this_scheduler->index); + + scheduler_t* sched = this_scheduler; + +#if defined(USE_SYSTEMATIC_TESTING) + // start processing + SYSTEMATIC_TESTING_START(scheduler, ponyint_asio_get_backend_tid(), ponyint_asio_get_backend_sleep_object(), sched->tid, sched->sleep_object); +#endif + +#ifdef USE_RUNTIMESTATS + uint64_t last_stats_print_tsc = ponyint_cpu_tick(); + sched->ctx.last_tsc = ponyint_cpu_tick(); +#endif + + pony_actor_t* actor = NULL; + uint64_t tsc = ponyint_cpu_tick(); + + while(true) + { +#ifdef USE_RUNTIMESTATS + if(print_stats) + { + // convert to cycles for use with ponyint_cpu_tick() + // 1 second = 2000000000 cycles (approx.) + // based on same scale as ponyint_cpu_core_pause() uses + uint64_t new_tsc = ponyint_cpu_tick(); + if((new_tsc - last_stats_print_tsc) > print_stats_interval) + { + last_stats_print_tsc = new_tsc; + print_scheduler_stats(sched); + } + } +#endif + + // process pending messages; this might add an actor to the inject queue + // due to an unmuted actor but that is for other scheduler threads to deal with + // technically, this is inefficient since any actor unmuted by the pinned actor + // scheduler should be handled by the pinned actor scheduler but for the moment + // that is how things work and the actor will eventually come back to this thread + // to be run anyways. + read_msg(sched); + + // Termination. all the normal scheduler threads have decided there is no + // more work to do so we can shutdown + if(sched->terminate) + { +#ifdef USE_RUNTIMESTATS + uint64_t used_cpu = ponyint_sched_cpu_used(&sched->ctx); + sched->ctx.schedulerstats.misc_cpu += used_cpu; + print_scheduler_stats(sched); +#endif + + pony_assert(pop(sched) == NULL); + SYSTEMATIC_TESTING_STOP_THREAD(); + return; + } + + // get the next pinned actor to run if we don't already have one + if(actor == NULL) + actor = pop(sched); + + // if it's a not pinned actor, send it to a normal scheduler and get + // another pinned actor to process; these are likely the result of pinned + // actors sending messages to non-pinned actors + while(NULL != actor && !ponyint_actor_is_pinned(actor)) + { + // Put on the shared mpmcq. + ponyint_mpmcq_push(&inject, actor); + actor = pop(sched); + } + + if(actor == NULL) + { + uint64_t tsc2 = ponyint_cpu_tick(); + uint64_t clocks_elapsed = tsc2 - tsc; + + // We had an empty queue and no actor. need to suspend or sleep only if + // mutemap is empty as this thread doesn't participate in work stealing + if(ponyint_mutemap_size(&sched->mute_mapping) == 0 && clocks_elapsed > scheduler_suspend_threshold) + { + // suspend + perhaps_suspend_pinned_actor_scheduler(sched, tsc, tsc2); + } else { + #if defined(USE_SYSTEMATIC_TESTING) + SYSTEMATIC_TESTING_YIELD(); + #else + // don't suspend the thread but sleep instead to not burn cpu + ponyint_cpu_core_pause(tsc, tsc2, true); + #endif + } + } else { + pony_assert(ponyint_actor_is_pinned(actor)); + + tsc = ponyint_cpu_tick(); + + // Run the current actor and get the next actor. + bool reschedule = ponyint_actor_run(&sched->ctx, actor, false); + sched->ctx.current = NULL; + SYSTEMATIC_TESTING_YIELD(); + + // there's a small chance that the pinned actor was the only active one + // and all the other scheduler threads went to sleep (including scheduler + // 0 if ASIO is noisy) and so we try and wake up at least one scheduler + // thread just in case if that's the case + // maybe wake up a scheduler thread if they've all fallen asleep + ponyint_sched_maybe_wakeup_if_all_asleep(PONY_PINNED_ACTOR_THREAD_INDEX); + + pony_actor_t* next = pop(sched); + + if(reschedule) + { + if(next != NULL) + { + // If we have a next actor, we go on the back of the queue. Otherwise, + // we continue to run this actor. + push(sched, actor); + DTRACE2(ACTOR_DESCHEDULED, (uintptr_t)sched, (uintptr_t)actor); + actor = next; + DTRACE2(ACTOR_SCHEDULED, (uintptr_t)sched, (uintptr_t)actor); + } + } else { + // We aren't rescheduling, so run the next actor. This may be NULL if our + // queue was empty. + DTRACE2(ACTOR_DESCHEDULED, (uintptr_t)sched, (uintptr_t)actor); + actor = next; + if (DTRACE_ENABLED(ACTOR_SCHEDULED) && actor != NULL) { + DTRACE2(ACTOR_SCHEDULED, (uintptr_t)sched, (uintptr_t)actor); + } + } + } + } +} + static void ponyint_sched_shutdown() { uint32_t start = 0; @@ -1203,9 +1567,11 @@ static void ponyint_sched_shutdown() * sizeof(scheduler_t))); #endif scheduler = NULL; + pinned_actor_scheduler = NULL; scheduler_count = 0; atomic_store_explicit(&active_scheduler_count, 0, memory_order_relaxed); atomic_store_explicit(&temporarily_disable_scheduler_scaling, false, memory_order_relaxed); + atomic_store_explicit(&pinned_actor_count, 0, memory_order_relaxed); ponyint_mpmcq_destroy(&inject); } @@ -1262,6 +1628,7 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin, atomic_store_explicit(&active_scheduler_count_check, scheduler_count, memory_order_relaxed); atomic_store_explicit(&temporarily_disable_scheduler_scaling, false, memory_order_relaxed); + atomic_store_explicit(&pinned_actor_count, 0, memory_order_relaxed); scheduler = (scheduler_t*)ponyint_pool_alloc_size( scheduler_count * sizeof(scheduler_t)); #ifdef USE_RUNTIMESTATS @@ -1321,6 +1688,47 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin, ponyint_mpmcq_init(&inject); ponyint_asio_init(asio_cpu); + // set up main thread as scheduler for running pinned actors +#if defined(PLATFORM_IS_WINDOWS) + // create wait event objects + this_scheduler->sleep_object = CreateEvent(NULL, FALSE, FALSE, NULL); +#elif defined(USE_SCHEDULER_SCALING_PTHREADS) + // create pthread condition object +#ifdef USE_RUNTIMESTATS + mem_used += sizeof(pthread_cond_t); + mem_allocated += POOL_ALLOC_SIZE(pthread_cond_t); +#endif + this_scheduler->sleep_object = POOL_ALLOC(pthread_cond_t); + int ret = pthread_cond_init(this_scheduler->sleep_object, NULL); + if(ret != 0) + { + // if it failed, set `sleep_object` to `NULL` for error +#ifdef USE_RUNTIMESTATS + mem_used -= sizeof(pthread_cond_t); + mem_allocated -= POOL_ALLOC_SIZE(pthread_cond_t); +#endif + POOL_FREE(pthread_cond_t, this_scheduler->sleep_object); + this_scheduler->sleep_object = NULL; + } +#else + this_scheduler->sleep_object = PONY_SCHED_SLEEP_WAKE_SIGNAL; +#endif + + this_scheduler->ctx.scheduler = this_scheduler; + this_scheduler->last_victim = this_scheduler; + this_scheduler->index = PONY_PINNED_ACTOR_THREAD_INDEX; + this_scheduler->asio_noisy = false; + ponyint_messageq_init(&this_scheduler->mq); + ponyint_mpmcq_init(&this_scheduler->q); + +#if defined(PLATFORM_IS_WINDOWS) || defined(USE_SCHEDULER_SCALING_PTHREADS) + // there was an error creating a wait event or a pthread condition object + if(this_scheduler->sleep_object == NULL) + return false; +#endif + + pinned_actor_scheduler = this_scheduler; + return pony_ctx(); } @@ -1331,6 +1739,9 @@ bool ponyint_sched_start(bool library) if(!ponyint_asio_start()) return false; + atomic_store_explicit(&pinned_actor_scheduler_suspended, false, memory_order_relaxed); + atomic_store_explicit(&pinned_actor_scheduler_suspended_check, false, memory_order_relaxed); + atomic_store_explicit(&detect_quiescence, !library, memory_order_relaxed); DTRACE0(RT_START); @@ -1349,16 +1760,35 @@ bool ponyint_sched_start(bool library) return false; } -#if defined(USE_SYSTEMATIC_TESTING) - // start processing - SYSTEMATIC_TESTING_START(scheduler, ponyint_asio_get_backend_tid(), ponyint_asio_get_backend_sleep_object()); +#if !defined(PLATFORM_IS_WINDOWS) && !defined(USE_SCHEDULER_SCALING_PTHREADS) + // Make sure we block signals related to scheduler sleeping/waking + // so they queue up to avoid race conditions + sigset_t set; + sigemptyset(&set); + sigaddset(&set, PONY_SCHED_SLEEP_WAKE_SIGNAL); + pthread_sigmask(SIG_BLOCK, &set, NULL); #endif + // custom run loop for pinned actors + run_pinned_actors(); + if(!library) { ponyint_sched_shutdown(); } + ponyint_mpmcq_cleanup(); + ponyint_pool_thread_cleanup(); + + while(ponyint_thread_messageq_pop(&this_scheduler->mq +#ifdef USE_DYNAMIC_TRACE + , i +#endif + ) != NULL) { ; } + ponyint_mutemap_destroy(&this_scheduler->mute_mapping); + ponyint_messageq_destroy(&this_scheduler->mq, false); + ponyint_mpmcq_destroy(&this_scheduler->q); + return true; } @@ -1368,18 +1798,6 @@ void ponyint_sched_stop() ponyint_sched_shutdown(); } -void ponyint_sched_add_inject_or_sched(pony_ctx_t* ctx, pony_actor_t* actor) -{ - if(ctx->scheduler != NULL) - { - // Add to the current scheduler thread. - push(ctx->scheduler, actor); - } else { - // Put on the shared mpmcq. - ponyint_mpmcq_push(&inject, actor); - } -} - void ponyint_sched_add(pony_ctx_t* ctx, pony_actor_t* actor) { pony_assert(NULL != ctx->scheduler); @@ -1633,6 +2051,10 @@ void ponyint_sched_mute(pony_ctx_t* ctx, pony_actor_t* sender, pony_actor_t* rec void ponyint_sched_start_global_unmute(uint32_t from, pony_actor_t* actor) { send_msg_all_active(from, SCHED_UNMUTE_ACTOR, (intptr_t)actor); + + // only send if pinned actor thread is not suspended + if(!atomic_load_explicit(&pinned_actor_scheduler_suspended, memory_order_relaxed)) + send_msg_pinned_actor_thread(from, SCHED_UNMUTE_ACTOR, (intptr_t)actor); } DECLARE_STACK(ponyint_actorstack, actorstack_t, pony_actor_t); diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index 6777641958..3a54b72a9f 100644 --- a/src/libponyrt/sched/scheduler.h +++ b/src/libponyrt/sched/scheduler.h @@ -20,6 +20,10 @@ typedef struct scheduler_t scheduler_t; #define SPECIAL_THREADID_EPOLL -12 #define PONY_UNKNOWN_SCHEDULER_INDEX -1 +// the `-999` constant is the same value that is hardcoded in `actor_pinning.pony` +// in the `actor_pinning` package +#define PONY_PINNED_ACTOR_THREAD_INDEX -999 + #if !defined(PLATFORM_IS_WINDOWS) && !defined(USE_SCHEDULER_SCALING_PTHREADS) // Signal to use for suspending/resuming threads via `sigwait`/`pthread_kill` // If you change this, remember to change `signals` package accordingly @@ -121,9 +125,11 @@ bool ponyint_sched_start(bool library); void ponyint_sched_stop(); -void ponyint_sched_add(pony_ctx_t* ctx, pony_actor_t* actor); +void ponyint_sched_increment_pinned_actor_count(); -void ponyint_sched_add_inject_or_sched(pony_ctx_t* ctx, pony_actor_t* actor); +void ponyint_sched_decrement_pinned_actor_count(); + +void ponyint_sched_add(pony_ctx_t* ctx, pony_actor_t* actor); void ponyint_sched_add_inject(pony_actor_t* actor); diff --git a/src/libponyrt/sched/systematic_testing.c b/src/libponyrt/sched/systematic_testing.c index cbe4d91061..a5464bef73 100644 --- a/src/libponyrt/sched/systematic_testing.c +++ b/src/libponyrt/sched/systematic_testing.c @@ -71,8 +71,8 @@ void ponyint_systematic_testing_init(uint64_t random_seed, uint32_t max_threads) // interleaving everywhere if needed srand((int)random_seed); - // initialize thead tracking array (should be max_threads + 1 to account for asio) - total_threads = max_threads + 1; + // initialize thead tracking array (should be max_threads + 2 to account for asio and pinned actor threads) + total_threads = max_threads + 2; size_t mem_needed = total_threads * sizeof(systematic_testing_thread_t); threads_to_track = (systematic_testing_thread_t*)ponyint_pool_alloc_size( mem_needed); @@ -90,6 +90,10 @@ void ponyint_systematic_testing_wait_start(pony_thread_id_t thread, pony_signal_ { SYSTEMATIC_TESTING_PRINTF("thread %lu: waiting to start...\n", thread); +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + pthread_mutex_lock(&systematic_testing_mut); +#endif + atomic_fetch_add_explicit(&waiting_to_start_count, 1, memory_order_relaxed); // sleep until it is this threads turn to do some work @@ -107,40 +111,54 @@ void ponyint_systematic_testing_wait_start(pony_thread_id_t thread, pony_signal_ #endif } - SYSTEMATIC_TESTING_PRINTF("thread %lu: started...\n", thread); + SYSTEMATIC_TESTING_PRINTF("thread %lu: started... waiting_to_start_count: %i\n", thread, atomic_load_explicit(&waiting_to_start_count, memory_order_relaxed)); } -void ponyint_systematic_testing_start(scheduler_t* schedulers, pony_thread_id_t asio_thread, pony_signal_event_t asio_signal) +void ponyint_systematic_testing_start(scheduler_t* schedulers, pony_thread_id_t asio_thread, pony_signal_event_t asio_signal, pony_thread_id_t pinned_actor_thread, pony_signal_event_t pinned_actor_signal) { - threads_to_track[0].tid = asio_thread; - threads_to_track[0].sleep_object = asio_signal; + threads_to_track[0].tid = pinned_actor_thread; + threads_to_track[0].sleep_object = pinned_actor_signal; threads_to_track[0].stopped = false; - for(uint32_t i = 1; i < total_threads; i++) + threads_to_track[1].tid = asio_thread; + threads_to_track[1].sleep_object = asio_signal; + threads_to_track[1].stopped = false; + + for(uint32_t i = 2; i < total_threads; i++) { - threads_to_track[i].tid = schedulers[i-1].tid; - threads_to_track[i].sleep_object = schedulers[i-1].sleep_object; + threads_to_track[i].tid = schedulers[i-2].tid; + threads_to_track[i].sleep_object = schedulers[i-2].sleep_object; threads_to_track[i].stopped = false; } - // always start the first scheduler thread (not asio which is 0) - active_thread = &threads_to_track[1]; - - while(total_threads != atomic_load_explicit(&waiting_to_start_count, memory_order_relaxed)) + while((total_threads - 1) != atomic_load_explicit(&waiting_to_start_count, memory_order_relaxed)) { - SYSTEMATIC_TESTING_PRINTF("Waiting for all threads to be ready before starting execution..\n"); + SYSTEMATIC_TESTING_PRINTF("Waiting for all %i threads to be ready before starting execution.. currently: %i\n", (total_threads - 1), atomic_load_explicit(&waiting_to_start_count, memory_order_relaxed)); ponyint_cpu_core_pause(1, 10000002, true); } +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + pthread_mutex_lock(&systematic_testing_mut); +#endif + + // always start the first scheduler thread (not asio which is 1 nor the pinned actor thread which is 0) + active_thread = &threads_to_track[2]; + SYSTEMATIC_TESTING_PRINTF("Starting systematic testing with thread %lu...\n", active_thread->tid); ponyint_thread_wake(active_thread->tid, active_thread->sleep_object); + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + ponyint_thread_suspend(threads_to_track[0].sleep_object, &systematic_testing_mut); +#else + ponyint_thread_suspend(threads_to_track[0].sleep_object); +#endif } static uint32_t get_next_index() { uint32_t active_scheduler_count = pony_active_schedulers(); - uint32_t active_count = active_scheduler_count + 1; // account for asio + uint32_t active_count = active_scheduler_count + 2; // account for asio and pinned actor thread uint32_t next_index = 0; do { @@ -233,8 +251,8 @@ void ponyint_systematic_testing_suspend() bool ponyint_systematic_testing_asio_stopped() { - // asio is always the first thread - return threads_to_track[0].stopped; + // asio is always the second thread + return threads_to_track[1].stopped; } void ponyint_systematic_testing_stop_thread() diff --git a/src/libponyrt/sched/systematic_testing.h b/src/libponyrt/sched/systematic_testing.h index 0704e2a749..24e06ef767 100644 --- a/src/libponyrt/sched/systematic_testing.h +++ b/src/libponyrt/sched/systematic_testing.h @@ -24,7 +24,7 @@ size_t ponyint_systematic_testing_static_alloc_size(); #endif void ponyint_systematic_testing_init(uint64_t random_seed, uint32_t max_threads); -void ponyint_systematic_testing_start(scheduler_t* schedulers, pony_thread_id_t asio_thread, pony_signal_event_t asio_signal); +void ponyint_systematic_testing_start(scheduler_t* schedulers, pony_thread_id_t asio_thread, pony_signal_event_t asio_signal, pony_thread_id_t pinned_actor_thread, pony_signal_event_t pinned_actor_signal); void ponyint_systematic_testing_wait_start(pony_thread_id_t thread, pony_signal_event_t signal); void ponyint_systematic_testing_yield(); bool ponyint_systematic_testing_asio_stopped(); @@ -38,7 +38,7 @@ void ponyint_systematic_testing_suspend(); // TODO systematic testing: maybe make output conditional based on a `verbosity` argument that is CLI driven? #define SYSTEMATIC_TESTING_PRINTF(f_, ...) fprintf(stderr, (f_), ##__VA_ARGS__) #define SYSTEMATIC_TESTING_INIT(RANDOM_SEED, MAX_THREADS) ponyint_systematic_testing_init(RANDOM_SEED, MAX_THREADS) -#define SYSTEMATIC_TESTING_START(SCHEDULERS, ASIO_THREAD, ASIO_SLEEP_OBJECT) ponyint_systematic_testing_start(SCHEDULERS, ASIO_THREAD, ASIO_SLEEP_OBJECT) +#define SYSTEMATIC_TESTING_START(SCHEDULERS, ASIO_THREAD, ASIO_SLEEP_OBJECT, PINNED_ACTOR_THREAD, PINNED_ACTOR_SLEEP_OBJECT) ponyint_systematic_testing_start(SCHEDULERS, ASIO_THREAD, ASIO_SLEEP_OBJECT, PINNED_ACTOR_THREAD, PINNED_ACTOR_SLEEP_OBJECT) #define SYSTEMATIC_TESTING_WAIT_START(THREAD, SIGNAL) ponyint_systematic_testing_wait_start(THREAD, SIGNAL) #define SYSTEMATIC_TESTING_YIELD() ponyint_systematic_testing_yield() #define SYSTEMATIC_TESTING_ASIO_STOPPED() ponyint_systematic_testing_asio_stopped() @@ -55,7 +55,7 @@ PONY_EXTERN_C_END #define SYSTEMATIC_TESTING_PRINTF(f_, ...) #define SYSTEMATIC_TESTING_INIT(RANDOM_SEED, MAX_THREADS) -#define SYSTEMATIC_TESTING_START(SCHEDULERS, ASIO_THREAD, ASIO_SLEEP_OBJECT) +#define SYSTEMATIC_TESTING_START(SCHEDULERS, ASIO_THREAD, ASIO_SLEEP_OBJECT, PINNED_ACTOR_THREAD, PINNED_ACTOR_SLEEP_OBJECT) #define SYSTEMATIC_TESTING_WAIT_START(THREAD, SIGNAL) #define SYSTEMATIC_TESTING_YIELD() #define SYSTEMATIC_TESTING_ASIO_STOPPED() diff --git a/test/full-program-tests/pinned-actor/expected-exit-code.txt b/test/full-program-tests/pinned-actor/expected-exit-code.txt new file mode 100644 index 0000000000..7813681f5b --- /dev/null +++ b/test/full-program-tests/pinned-actor/expected-exit-code.txt @@ -0,0 +1 @@ +5 \ No newline at end of file diff --git a/test/full-program-tests/pinned-actor/main.pony b/test/full-program-tests/pinned-actor/main.pony new file mode 100644 index 0000000000..b8a6fe7737 --- /dev/null +++ b/test/full-program-tests/pinned-actor/main.pony @@ -0,0 +1,43 @@ +use @pony_exitcode[None](code: I32) +use @pony_ctx[Pointer[None]]() +use @pony_sched_index[I32](ctx: Pointer[None]) + +use "actor_pinning" +use "time" + +actor Main + let _env: Env + let _auth: PinUnpinActorAuth + + new create(env: Env) => + _env = env + _auth = PinUnpinActorAuth(env.root) + ActorPinning.pin(_auth) + _env.out.print("initializing... sched index: " + @pony_sched_index(@pony_ctx()).string()) + let interval: U64 = (10 * 1_000_000_000) / 10 + let timers = Timers + let timer = Timer(Tick(this, 1), interval, interval) + timers(consume timer) + + be check_pinned() => + let sched: I32 = @pony_sched_index(@pony_ctx()) + _env.out.print("sched index: " + sched.string()) + if ActorPinning.is_successfully_pinned(_auth) then + @pony_exitcode(5) + else + check_pinned() + end + +class Tick is TimerNotify + let _main: Main + var _tick_count: I64 + + new iso create(main: Main, tick_count: I64) => + _main = main + _tick_count = tick_count + + fun ref apply(timer: Timer, count: U64): Bool => + _tick_count = _tick_count - (count.i64()) + let done = _tick_count <= 0 + _main.check_pinned() + not (done) From a8bd3620800fb77c301b74c09a197d9ab55fd29d Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Sun, 24 Nov 2024 13:34:41 -0500 Subject: [PATCH 02/13] Fix unused variable error --- src/libponyrt/sched/scheduler.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index 5bd4b892fd..6c184b8b5b 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -177,6 +177,7 @@ void ponyint_sched_decrement_pinned_actor_count() { uint32_t old = atomic_fetch_sub_explicit(&pinned_actor_count, 1, memory_order_relaxed); pony_assert(0 < old); + (void)old; } /** From 630e707b1fa0ce9125697a2d8d0d6f9e00ea37a9 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Sun, 24 Nov 2024 14:50:48 -0500 Subject: [PATCH 03/13] Fix dtrace undeclared variable error --- src/libponyrt/sched/scheduler.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index 6c184b8b5b..d3f9dd049d 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -1783,7 +1783,7 @@ bool ponyint_sched_start(bool library) while(ponyint_thread_messageq_pop(&this_scheduler->mq #ifdef USE_DYNAMIC_TRACE - , i + , PONY_PINNED_ACTOR_THREAD_INDEX #endif ) != NULL) { ; } ponyint_mutemap_destroy(&this_scheduler->mute_mapping); From 9f2ffc95fade28bea203e14691baa67a93645376 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Sun, 24 Nov 2024 14:53:21 -0500 Subject: [PATCH 04/13] Fix windows pinned-actor test linking error --- test/full-program-tests/pinned-actor/main.pony | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/full-program-tests/pinned-actor/main.pony b/test/full-program-tests/pinned-actor/main.pony index b8a6fe7737..d5999b9262 100644 --- a/test/full-program-tests/pinned-actor/main.pony +++ b/test/full-program-tests/pinned-actor/main.pony @@ -1,6 +1,4 @@ use @pony_exitcode[None](code: I32) -use @pony_ctx[Pointer[None]]() -use @pony_sched_index[I32](ctx: Pointer[None]) use "actor_pinning" use "time" @@ -13,15 +11,12 @@ actor Main _env = env _auth = PinUnpinActorAuth(env.root) ActorPinning.pin(_auth) - _env.out.print("initializing... sched index: " + @pony_sched_index(@pony_ctx()).string()) let interval: U64 = (10 * 1_000_000_000) / 10 let timers = Timers let timer = Timer(Tick(this, 1), interval, interval) timers(consume timer) be check_pinned() => - let sched: I32 = @pony_sched_index(@pony_ctx()) - _env.out.print("sched index: " + sched.string()) if ActorPinning.is_successfully_pinned(_auth) then @pony_exitcode(5) else From 450bfde64ab90c668da9db8fd4f0fdfe39197731 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Mon, 25 Nov 2024 10:58:40 -0500 Subject: [PATCH 05/13] Add greedy actor caveat to actor pinning package --- packages/actor_pinning/actor_pinning.pony | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/actor_pinning/actor_pinning.pony b/packages/actor_pinning/actor_pinning.pony index f3f3f8c59b..61048e9f1e 100644 --- a/packages/actor_pinning/actor_pinning.pony +++ b/packages/actor_pinning/actor_pinning.pony @@ -59,7 +59,11 @@ cycle detector reaps pinned actors it allows the runtime to reach quiescence. If the cycle detector is unable to reap all pinned actors or if the cycle detector is disabled (via `--ponynoblock`), it is up to the programmer to manually `unpin` all pinned actors or else the runtime will be unable to reach -quiescence and the program will never terminate. +quiescence and the program will never terminate. Additionally, due to the fact +that Pony uses cooperative scheduling of actors and that all pinned actors run +on a single shared scheduler thread, any "greedy" actors that monopolize the +cpu (with long running behaviors) will negatively inmpact all other pinned +actors by starving them of cpu. """ use @pony_actor_set_pinned[None]() From 334d9981af00a9856b56d3cd8a00b251e3c884a0 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Mon, 25 Nov 2024 10:59:11 -0500 Subject: [PATCH 06/13] Add release notes --- .release-notes/4547.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .release-notes/4547.md diff --git a/.release-notes/4547.md b/.release-notes/4547.md new file mode 100644 index 0000000000..04573e30fd --- /dev/null +++ b/.release-notes/4547.md @@ -0,0 +1,7 @@ +## Add support for pinning actors to a dedicated scheduler thread + +Pony programmers can now pin actors to a dedicated scheduler thread. This can be required/used for interfacing with C libraries that rely on thread local storage. A common example of this is graphics/windowing libraries. + +The way it works is that an actor can request that it be pinned and then check to confirm that the pinning was successfully applied after which all subsequent behaviors on that actor will run on the same scheduler thread until the actor is destroyed or the actor requests to be unpinned. + +Additional details can be found in the `actor_pinning` package in the standard library. \ No newline at end of file From 80aa0989029b84831288d1616bba0e3d4f81ce34 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Mon, 25 Nov 2024 19:34:43 -0500 Subject: [PATCH 07/13] Rename `pin` to `request_pin` and `unpin` to `request_unpin` and also update the package documentation to clarify that `pinning` is not an immediate action --- packages/actor_pinning/actor_pinning.pony | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/actor_pinning/actor_pinning.pony b/packages/actor_pinning/actor_pinning.pony index 61048e9f1e..2652d30f11 100644 --- a/packages/actor_pinning/actor_pinning.pony +++ b/packages/actor_pinning/actor_pinning.pony @@ -6,10 +6,12 @@ scheduler thread. This can be required/used for interfacing with C libraries that rely on thread local storage. A common example of this is graphics/windowing libraries. -The way it works is that an actor can request that it be pinned and then check -to confirm that the pinning was successfully applied after which all subsequent -behaviors on that actor will run on the same scheduler thread until the actor is -destroyed or the actor requests to be unpinned. +The way it works is that an actor can request that it be pinned (which may or +may not happen immediately) and then it must wait and check to confirm that the +pinning was successfully applied (prior to running any workload that required the +actor to be pinned) after which all subsequent behaviors on that actor will run +on the same scheduler thread until the actor is destroyed or the actor requests +to be unpinned. ## Example program @@ -71,10 +73,10 @@ use @pony_actor_unset_pinned[None]() use @pony_scheduler_index[I32]() primitive ActorPinning - fun pin(auth: PinUnpinActorAuth) => + fun request_pin(auth: PinUnpinActorAuth) => @pony_actor_set_pinned() - fun unpin(auth: PinUnpinActorAuth) => + fun request_unpin(auth: PinUnpinActorAuth) => @pony_actor_unset_pinned() fun is_successfully_pinned(auth: PinUnpinActorAuth): Bool => From 0f787fa8b44a3802c4659342de14efd143614a15 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Mon, 25 Nov 2024 19:35:38 -0500 Subject: [PATCH 08/13] update release notes --- .release-notes/4547.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.release-notes/4547.md b/.release-notes/4547.md index 04573e30fd..fd47120a6e 100644 --- a/.release-notes/4547.md +++ b/.release-notes/4547.md @@ -2,6 +2,6 @@ Pony programmers can now pin actors to a dedicated scheduler thread. This can be required/used for interfacing with C libraries that rely on thread local storage. A common example of this is graphics/windowing libraries. -The way it works is that an actor can request that it be pinned and then check to confirm that the pinning was successfully applied after which all subsequent behaviors on that actor will run on the same scheduler thread until the actor is destroyed or the actor requests to be unpinned. +The way it works is that an actor can request that it be pinned (which may or may not happen immediately) and then it must wait and check to confirm that the pinning was successfully applied (prior to running any workload that required the actor to be pinned) after which all subsequent behaviors on that actor will run on the same scheduler thread until the actor is destroyed or the actor requests to be unpinned. Additional details can be found in the `actor_pinning` package in the standard library. \ No newline at end of file From 189151bc67864f945a3911b5de04145db7932dd5 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Mon, 25 Nov 2024 19:45:32 -0500 Subject: [PATCH 09/13] Use correct function names now that they've been renamed --- packages/actor_pinning/actor_pinning.pony | 4 ++-- test/full-program-tests/pinned-actor/main.pony | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/actor_pinning/actor_pinning.pony b/packages/actor_pinning/actor_pinning.pony index 2652d30f11..69f9e3c96e 100644 --- a/packages/actor_pinning/actor_pinning.pony +++ b/packages/actor_pinning/actor_pinning.pony @@ -32,7 +32,7 @@ actor Main new create(env: Env) => _env = env _auth = PinUnpinActorAuth(env.root) - ActorPinning.pin(_auth) + ActorPinning.request_pin(_auth) check_pinned() be check_pinned() => @@ -51,7 +51,7 @@ actor Main end be done() => - ActorPinning.unpin(_auth) + ActorPinning.request_unpin(_auth) ``` ## Caveat diff --git a/test/full-program-tests/pinned-actor/main.pony b/test/full-program-tests/pinned-actor/main.pony index d5999b9262..d5e9cd73e3 100644 --- a/test/full-program-tests/pinned-actor/main.pony +++ b/test/full-program-tests/pinned-actor/main.pony @@ -10,7 +10,7 @@ actor Main new create(env: Env) => _env = env _auth = PinUnpinActorAuth(env.root) - ActorPinning.pin(_auth) + ActorPinning.request_pin(_auth) let interval: U64 = (10 * 1_000_000_000) / 10 let timers = Timers let timer = Timer(Tick(this, 1), interval, interval) From 76a48fa63f2408926d824ab2313cbdb2d940ea3c Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Sat, 30 Nov 2024 22:51:38 -0500 Subject: [PATCH 10/13] update caveat Co-authored-by: Sean T Allen --- packages/actor_pinning/actor_pinning.pony | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/actor_pinning/actor_pinning.pony b/packages/actor_pinning/actor_pinning.pony index 69f9e3c96e..650afb0e02 100644 --- a/packages/actor_pinning/actor_pinning.pony +++ b/packages/actor_pinning/actor_pinning.pony @@ -56,12 +56,23 @@ actor Main ## Caveat -Pinned actors could prevent the runtime from reaching quiescence. When the -cycle detector reaps pinned actors it allows the runtime to reach quiescence. -If the cycle detector is unable to reap all pinned actors or if the cycle -detector is disabled (via `--ponynoblock`), it is up to the programmer to -manually `unpin` all pinned actors or else the runtime will be unable to reach -quiescence and the program will never terminate. Additionally, due to the fact +Actor pinning is an experimental feature; use with caution. + +The current implementation of pinning actors can interact in surprising ways with program +shutdown. If there are any live pinned actors in the program, it will continue to run even if +there is no additional work to do. + +Actors that are garbage collected will be unpinned. This includes actors that are garbage +collected by the cycle detector. + +At this time, using pinned actors is an advanced feature that requires knowledge of runtime to +use safely. We strongly suggest that as part of your program design, that you make sure that +you have a process for unpinning any actors that you have pinned. Designing actor unpinning +into your program will avoid any issues with program shutdown. + +As we further implement actor pinning, we hope to remove this caveat. + +Additionally, due to the fact that Pony uses cooperative scheduling of actors and that all pinned actors run on a single shared scheduler thread, any "greedy" actors that monopolize the cpu (with long running behaviors) will negatively inmpact all other pinned From 417b55d5a6443241245cf8ecb23855480793fb31 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Sat, 30 Nov 2024 22:56:40 -0500 Subject: [PATCH 11/13] Fatten up the release notes --- .release-notes/4547.md | 55 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/.release-notes/4547.md b/.release-notes/4547.md index fd47120a6e..a1854313be 100644 --- a/.release-notes/4547.md +++ b/.release-notes/4547.md @@ -4,4 +4,57 @@ Pony programmers can now pin actors to a dedicated scheduler thread. This can b The way it works is that an actor can request that it be pinned (which may or may not happen immediately) and then it must wait and check to confirm that the pinning was successfully applied (prior to running any workload that required the actor to be pinned) after which all subsequent behaviors on that actor will run on the same scheduler thread until the actor is destroyed or the actor requests to be unpinned. -Additional details can be found in the `actor_pinning` package in the standard library. \ No newline at end of file +### Caveat + +Actor pinning is an experimental feature; use with caution. + +The current implementation of pinning actors can interact in surprising ways with program shutdown. If there are any live pinned actors in the program, it will continue to run even if there is no additional work to do. + +Actors that are garbage collected will be unpinned. This includes actors that are garbage collected by the cycle detector. + +At this time, using pinned actors is an advanced feature that requires knowledge of runtime to use safely. We strongly suggest that as part of your program design, that you make sure that you have a process for unpinning any actors that you have pinned. Designing actor unpinning into your program will avoid any issues with program shutdown. + +As we further implement actor pinning, we hope to remove this caveat. + +Additionally, due to the fact that Pony uses cooperative scheduling of actors and that all pinned actors run on a single shared scheduler thread, any "greedy" actors that monopolize the cpu (with long running behaviors) will negatively inmpact all other pinned actors by starving them of cpu. + +### Example program + +```pony +// Here we have the Main actor that upon construction requests a PinUnpinActorAuth +// token from AmbientAuth and then requests that it be pinned. It then recursively +// calls the `check_pinned` behavior until the runtime reports that it has +// successfully been pinned after which it starts `do_stuff` to do whatever +// work it needs to do that requires it to be pinned. Once it has completed all +// of its work, it calls `done` to request that the runtime `unpin` it. + +use "actor_pinning" + +actor Main + let _env: Env + let _auth: PinUnpinActorAuth + + new create(env: Env) => + _env = env + _auth = PinUnpinActorAuth(env.root) + ActorPinning.request_pin(_auth) + check_pinned() + + be check_pinned() => + if ActorPinning.is_successfully_pinned(_auth) then + // do stuff that requires this actor to be pinned + do_stuff(10) + else + check_pinned() + end + + be do_stuff(i: I32) => + if i < 0 then + done() + else + do_stuff(i - 1) + end + + be done() => + ActorPinning.request_unpin(_auth) +``` From 8c338ae76552bad5e49d108d5566a0200dd31df6 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Tue, 3 Dec 2024 21:31:20 -0500 Subject: [PATCH 12/13] make pinned actor thread participate in CNF/ACK for termination --- .release-notes/4547.md | 12 +-- packages/actor_pinning/actor_pinning.pony | 25 +----- src/libponyrt/actor/actor.c | 18 +--- src/libponyrt/sched/scheduler.c | 105 ++++++++++------------ src/libponyrt/sched/scheduler.h | 4 - 5 files changed, 54 insertions(+), 110 deletions(-) diff --git a/.release-notes/4547.md b/.release-notes/4547.md index a1854313be..69efb039ac 100644 --- a/.release-notes/4547.md +++ b/.release-notes/4547.md @@ -6,17 +6,7 @@ The way it works is that an actor can request that it be pinned (which may or ma ### Caveat -Actor pinning is an experimental feature; use with caution. - -The current implementation of pinning actors can interact in surprising ways with program shutdown. If there are any live pinned actors in the program, it will continue to run even if there is no additional work to do. - -Actors that are garbage collected will be unpinned. This includes actors that are garbage collected by the cycle detector. - -At this time, using pinned actors is an advanced feature that requires knowledge of runtime to use safely. We strongly suggest that as part of your program design, that you make sure that you have a process for unpinning any actors that you have pinned. Designing actor unpinning into your program will avoid any issues with program shutdown. - -As we further implement actor pinning, we hope to remove this caveat. - -Additionally, due to the fact that Pony uses cooperative scheduling of actors and that all pinned actors run on a single shared scheduler thread, any "greedy" actors that monopolize the cpu (with long running behaviors) will negatively inmpact all other pinned actors by starving them of cpu. +Due to the fact that Pony uses cooperative scheduling of actors and that all pinned actors run on a single shared scheduler thread, any "greedy" actors that monopolize the cpu (with long running behaviors) will negatively inmpact all other pinned actors by starving them of cpu. ### Example program diff --git a/packages/actor_pinning/actor_pinning.pony b/packages/actor_pinning/actor_pinning.pony index 650afb0e02..a37dd912cb 100644 --- a/packages/actor_pinning/actor_pinning.pony +++ b/packages/actor_pinning/actor_pinning.pony @@ -56,27 +56,10 @@ actor Main ## Caveat -Actor pinning is an experimental feature; use with caution. - -The current implementation of pinning actors can interact in surprising ways with program -shutdown. If there are any live pinned actors in the program, it will continue to run even if -there is no additional work to do. - -Actors that are garbage collected will be unpinned. This includes actors that are garbage -collected by the cycle detector. - -At this time, using pinned actors is an advanced feature that requires knowledge of runtime to -use safely. We strongly suggest that as part of your program design, that you make sure that -you have a process for unpinning any actors that you have pinned. Designing actor unpinning -into your program will avoid any issues with program shutdown. - -As we further implement actor pinning, we hope to remove this caveat. - -Additionally, due to the fact -that Pony uses cooperative scheduling of actors and that all pinned actors run -on a single shared scheduler thread, any "greedy" actors that monopolize the -cpu (with long running behaviors) will negatively inmpact all other pinned -actors by starving them of cpu. +Due to the fact that Pony uses cooperative scheduling of actors and that all +pinned actors run on a single shared scheduler thread, any "greedy" actors that +monopolize the cpu (with long running behaviors) will negatively inmpact all +other pinned actors by starving them of cpu. """ use @pony_actor_set_pinned[None]() diff --git a/src/libponyrt/actor/actor.c b/src/libponyrt/actor/actor.c index fcc9e3a449..05e777fc47 100644 --- a/src/libponyrt/actor/actor.c +++ b/src/libponyrt/actor/actor.c @@ -815,12 +815,6 @@ void ponyint_actor_destroy(pony_actor_t* actor) print_actor_stats(actor); #endif - if(ponyint_actor_is_pinned(actor)) - { - unset_internal_flag(actor, FLAG_PINNED); - ponyint_sched_decrement_pinned_actor_count(); - } - // Free variable sized actors correctly. ponyint_pool_free_size(actor->type->size, actor); } @@ -1234,21 +1228,13 @@ bool ponyint_actor_is_pinned(pony_actor_t* actor) PONY_API void pony_actor_set_pinned() { pony_ctx_t* ctx = pony_ctx(); - if(!ponyint_actor_is_pinned(ctx->current)) - { - set_internal_flag(ctx->current, FLAG_PINNED); - ponyint_sched_increment_pinned_actor_count(); - } + set_internal_flag(ctx->current, FLAG_PINNED); } PONY_API void pony_actor_unset_pinned() { pony_ctx_t* ctx = pony_ctx(); - if(ponyint_actor_is_pinned(ctx->current)) - { - unset_internal_flag(ctx->current, FLAG_PINNED); - ponyint_sched_decrement_pinned_actor_count(); - } + unset_internal_flag(ctx->current, FLAG_PINNED); } void ponyint_become(pony_ctx_t* ctx, pony_actor_t* actor) diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index d3f9dd049d..3eec3da71c 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -50,7 +50,6 @@ static mpmcq_t inject; static PONY_ATOMIC(bool) pinned_actor_scheduler_suspended; static PONY_ATOMIC(bool) pinned_actor_scheduler_suspended_check; static scheduler_t* pinned_actor_scheduler; -static PONY_ATOMIC(uint32_t) pinned_actor_count; static __pony_thread_local scheduler_t* this_scheduler; #if defined(USE_SCHEDULER_SCALING_PTHREADS) @@ -163,23 +162,6 @@ static uint32_t get_active_scheduler_count_check() return atomic_load_explicit(&active_scheduler_count_check, memory_order_relaxed); } -static uint32_t get_pinned_actor_count() -{ - return atomic_load_explicit(&pinned_actor_count, memory_order_relaxed); -} - -void ponyint_sched_increment_pinned_actor_count() -{ - atomic_fetch_add_explicit(&pinned_actor_count, 1, memory_order_relaxed); -} - -void ponyint_sched_decrement_pinned_actor_count() -{ - uint32_t old = atomic_fetch_sub_explicit(&pinned_actor_count, 1, memory_order_relaxed); - pony_assert(0 < old); - (void)old; -} - /** * Gets the whether dynamic scheduler scaling is temporarily disabled */ @@ -321,44 +303,47 @@ static void signal_suspended_pinned_actor_thread() static void wake_suspended_pinned_actor_thread() { -#if defined(USE_SCHEDULER_SCALING_PTHREADS) - // acquire mutex if using pthreads - if(!pthread_mutex_lock(&sched_mut)) -#else - // get the bool that controls modifying the active scheduler count variable - // if using signals - if(!atomic_load_explicit(&scheduler_count_changing, memory_order_relaxed) - && !atomic_exchange_explicit(&scheduler_count_changing, true, - memory_order_acquire)) -#endif + while(atomic_load_explicit(&pinned_actor_scheduler_suspended_check, memory_order_relaxed)) { - atomic_store_explicit(&pinned_actor_scheduler_suspended, false, memory_order_relaxed); - -#if !defined(USE_SCHEDULER_SCALING_PTHREADS) - // unlock the bool that controls modifying the active scheduler count - // variable if using signals. - atomic_store_explicit(&scheduler_count_changing, false, - memory_order_release); -#endif + #if defined(USE_SCHEDULER_SCALING_PTHREADS) + // acquire mutex if using pthreads + if(!pthread_mutex_lock(&sched_mut)) + #else + // get the bool that controls modifying the active scheduler count variable + // if using signals + if(!atomic_load_explicit(&scheduler_count_changing, memory_order_relaxed) + && !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire)) + #endif + { + atomic_store_explicit(&pinned_actor_scheduler_suspended, false, memory_order_relaxed); -#if defined(USE_SCHEDULER_SCALING_PTHREADS) - // unlock mutex if using pthreads - pthread_mutex_unlock(&sched_mut); -#endif - } + #if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals. + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); + #endif - signal_suspended_pinned_actor_thread(); + #if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); + #endif + } - // wait for the sleeping thread to wake and update check variable - while(atomic_load_explicit(&pinned_actor_scheduler_suspended_check, memory_order_relaxed)) - { - // send signals to the pinned actor scheduler thread that should be awake - // this is somewhat wasteful if the scheduler thread is already awake - // but is necessary in case the signal to wake the thread was missed - // NOTE: this intentionally allows for the case where the scheduler - // thread might miss the signal and not wake up. That is handled - // by a combination of the check variable and this while loop signal_suspended_pinned_actor_thread(); + + // wait for the sleeping thread to wake and update check variable + while(atomic_load_explicit(&pinned_actor_scheduler_suspended_check, memory_order_relaxed)) + { + // send signals to the pinned actor scheduler thread that should be awake + // this is somewhat wasteful if the scheduler thread is already awake + // but is necessary in case the signal to wake the thread was missed + // NOTE: this intentionally allows for the case where the scheduler + // thread might miss the signal and not wake up. That is handled + // by a combination of the check variable and this while loop + signal_suspended_pinned_actor_thread(); + } } } @@ -446,8 +431,10 @@ static void handle_sched_block(scheduler_t* sched) sched->block_count == scheduler_count) { // If we think all threads are blocked, send CNF(token) to everyone. - sched->ack_count = scheduler_count; + // and to the pinned actor thread + sched->ack_count = scheduler_count + 1; send_msg_all(sched->index, SCHED_CNF, sched->ack_token); + send_msg_pinned_actor_thread(sched->index, SCHED_CNF, sched->ack_token); // disable dynamic scheduler scaling since we need all scheulder awake // for shutdown and a scheduler suspending during this process is @@ -518,7 +505,6 @@ static bool read_msg(scheduler_t* sched) case SCHED_CNF: { pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); - pony_assert(PONY_PINNED_ACTOR_THREAD_INDEX != sched->index); // Echo the token back as ACK(token). send_msg(sched->index, 0, SCHED_ACK, m->i); @@ -600,8 +586,8 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) // only scheduler 0 can initiate shutdown (it is the ony that gets all the // ACK messages as part of the CNF/ACK coordination for shutdown) // only if there are no noisy actors registered with the ASIO subsystem - // and the mutemap is empty... and no pinned actors in the system... - if(0 == sched->index && !sched->asio_noisy && ponyint_mutemap_size(&sched->mute_mapping) == 0 && 0 == get_pinned_actor_count()) + // and the mutemap is empty... + if(0 == sched->index && !sched->asio_noisy && ponyint_mutemap_size(&sched->mute_mapping) == 0) { // 0 means that all schedulers have ACK'd and we can proceed with shutdown.. // if any scheduler threads block/unblock before we get ACKs from them all @@ -633,8 +619,11 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) sched->ack_token++; // Run another CNF/ACK cycle. - sched->ack_count = scheduler_count; + // send CNF(token) to everyone. + // and to the pinned actor thread + sched->ack_count = scheduler_count + 1; send_msg_all(sched->index, SCHED_CNF, sched->ack_token); + send_msg_pinned_actor_thread(sched->index, SCHED_CNF, sched->ack_token); } else { // reset ack_token/count for shutdown coordination sched->ack_token++; @@ -1293,7 +1282,9 @@ static void perhaps_suspend_pinned_actor_scheduler( scheduler_t* sched, uint64_t tsc, uint64_t tsc2) { // if we're not terminating + // and dynamic scheduler scaling is not disabled for shutdown if ((!sched->terminate) + && !get_temporarily_disable_scheduler_scaling() #if defined(USE_SCHEDULER_SCALING_PTHREADS) // try to acquire mutex if using pthreads && !pthread_mutex_trylock(&sched_mut) @@ -1572,7 +1563,6 @@ static void ponyint_sched_shutdown() scheduler_count = 0; atomic_store_explicit(&active_scheduler_count, 0, memory_order_relaxed); atomic_store_explicit(&temporarily_disable_scheduler_scaling, false, memory_order_relaxed); - atomic_store_explicit(&pinned_actor_count, 0, memory_order_relaxed); ponyint_mpmcq_destroy(&inject); } @@ -1629,7 +1619,6 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin, atomic_store_explicit(&active_scheduler_count_check, scheduler_count, memory_order_relaxed); atomic_store_explicit(&temporarily_disable_scheduler_scaling, false, memory_order_relaxed); - atomic_store_explicit(&pinned_actor_count, 0, memory_order_relaxed); scheduler = (scheduler_t*)ponyint_pool_alloc_size( scheduler_count * sizeof(scheduler_t)); #ifdef USE_RUNTIMESTATS diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index 3a54b72a9f..764397cc1c 100644 --- a/src/libponyrt/sched/scheduler.h +++ b/src/libponyrt/sched/scheduler.h @@ -125,10 +125,6 @@ bool ponyint_sched_start(bool library); void ponyint_sched_stop(); -void ponyint_sched_increment_pinned_actor_count(); - -void ponyint_sched_decrement_pinned_actor_count(); - void ponyint_sched_add(pony_ctx_t* ctx, pony_actor_t* actor); void ponyint_sched_add_inject(pony_actor_t* actor); From 2dfa7d7029bc27c6c7dd72e555680709ee862e65 Mon Sep 17 00:00:00 2001 From: Dipin Hora Date: Tue, 3 Dec 2024 21:39:27 -0500 Subject: [PATCH 13/13] add option to pin pinned actor thread --- packages/builtin/runtime_options.pony | 6 ++++++ src/libponyrt/options/options.h | 4 ++++ src/libponyrt/sched/cpu.c | 18 +++++++++++++++--- src/libponyrt/sched/cpu.h | 2 +- src/libponyrt/sched/scheduler.c | 4 ++-- src/libponyrt/sched/scheduler.h | 2 +- src/libponyrt/sched/start.c | 8 ++++++-- 7 files changed, 35 insertions(+), 9 deletions(-) diff --git a/packages/builtin/runtime_options.pony b/packages/builtin/runtime_options.pony index 1dd803f14b..81e9c4ddf8 100644 --- a/packages/builtin/runtime_options.pony +++ b/packages/builtin/runtime_options.pony @@ -104,6 +104,12 @@ struct RuntimeOptions Requires `--ponypin` to be set to have any effect. """ + var ponypinpinnedactorthread: Bool = false + """ + Pin the pinned actor thread to a CPU the way scheduler threads are pinned to CPUs. + Requires `--ponypin` to be set to have any effect. + """ + var ponyprintstatsinterval: U32 = -1 """ Print actor stats before an actor is destroyed and print scheduler stats diff --git a/src/libponyrt/options/options.h b/src/libponyrt/options/options.h index 6055a827c6..b1509531c1 100644 --- a/src/libponyrt/options/options.h +++ b/src/libponyrt/options/options.h @@ -43,6 +43,10 @@ " --ponypinasio Pin the ASIO thread to a CPU the way scheduler\n" \ " threads are pinned to CPUs. Requires `--ponypin` to\n" \ " be set to have any effect.\n" \ + " --ponypinpinnedactorthread\n" \ + " Pin the pinned actor thread to a CPU the way scheduler\n" \ + " threads are pinned to CPUs. Requires `--ponypin` to\n" \ + " be set to have any effect.\n" \ " --ponyprintstatsinterval\n" \ " Print actor stats before an actor is destroyed and\n" \ " print scheduler stats every X seconds. Defaults to -1 (never).\n" \ diff --git a/src/libponyrt/sched/cpu.c b/src/libponyrt/sched/cpu.c index d8737dce1e..fe300d751b 100644 --- a/src/libponyrt/sched/cpu.c +++ b/src/libponyrt/sched/cpu.c @@ -227,9 +227,10 @@ uint32_t ponyint_cpu_count() } uint32_t ponyint_cpu_assign(uint32_t count, scheduler_t* scheduler, - bool pin, bool pinasio) + bool pin, bool pinasio, bool pinpat) { uint32_t asio_cpu = -1; + uint32_t pat_cpu = -1; if(!pin) { @@ -255,11 +256,12 @@ uint32_t ponyint_cpu_assign(uint32_t count, scheduler_t* scheduler, if(pinasio) asio_cpu = avail_cpu_list[count % avail_cpu_count]; + if(pinpat) + pat_cpu = avail_cpu_list[(count + 1) % avail_cpu_count]; + ponyint_pool_free_size(avail_cpu_size * sizeof(uint32_t), avail_cpu_list); avail_cpu_list = NULL; avail_cpu_count = avail_cpu_size = 0; - - return asio_cpu; #elif defined(PLATFORM_IS_BSD) // FreeBSD does not currently do thread pinning, as we can't yet determine // which cores are hyperthreads. @@ -269,6 +271,9 @@ uint32_t ponyint_cpu_assign(uint32_t count, scheduler_t* scheduler, if(pinasio) asio_cpu = count % hw_cpu_count; + if(pinpat) + pat_cpu = (count + 1) % hw_cpu_count; + for(uint32_t i = 0; i < count; i++) { scheduler[i].cpu = i % hw_cpu_count; @@ -286,8 +291,15 @@ uint32_t ponyint_cpu_assign(uint32_t count, scheduler_t* scheduler, // asio_cpu of -1 if(pinasio) asio_cpu = count; + + if(pinpat) + pat_cpu = (count + 1); #endif + // set the affinity of the current thread (nain thread) which is the pinned + // actor thread + ponyint_cpu_affinity(pat_cpu); + return asio_cpu; } diff --git a/src/libponyrt/sched/cpu.h b/src/libponyrt/sched/cpu.h index f29f8ea67f..36eb450265 100644 --- a/src/libponyrt/sched/cpu.h +++ b/src/libponyrt/sched/cpu.h @@ -13,7 +13,7 @@ void ponyint_cpu_init(); uint32_t ponyint_cpu_count(); uint32_t ponyint_cpu_assign(uint32_t count, scheduler_t* scheduler, - bool nopin, bool pinasio); + bool nopin, bool pinasio, bool pinpat); void ponyint_cpu_affinity(uint32_t cpu); diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index 3eec3da71c..730bc34250 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -1568,7 +1568,7 @@ static void ponyint_sched_shutdown() } pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin, - bool pinasio, uint32_t min_threads, uint32_t thread_suspend_threshold, + bool pinasio, bool pinpat, uint32_t min_threads, uint32_t thread_suspend_threshold, uint32_t stats_interval #if defined(USE_SYSTEMATIC_TESTING) , uint64_t systematic_testing_seed) @@ -1629,7 +1629,7 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin, memset(scheduler, 0, scheduler_count * sizeof(scheduler_t)); uint32_t asio_cpu = ponyint_cpu_assign(scheduler_count, scheduler, pin, - pinasio); + pinasio, pinpat); #if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) pthread_once(&sched_mut_once, sched_mut_init); diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index 764397cc1c..4fefd6c051 100644 --- a/src/libponyrt/sched/scheduler.h +++ b/src/libponyrt/sched/scheduler.h @@ -113,7 +113,7 @@ struct scheduler_t }; pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, - bool pinasio, uint32_t min_threads, uint32_t thread_suspend_threshold, + bool pinasio, bool pinpat, uint32_t min_threads, uint32_t thread_suspend_threshold, uint32_t stats_interval #if defined(USE_SYSTEMATIC_TESTING) , uint64_t systematic_testing_seed); diff --git a/src/libponyrt/sched/start.c b/src/libponyrt/sched/start.c index 3744b069b2..a7acca53fa 100644 --- a/src/libponyrt/sched/start.c +++ b/src/libponyrt/sched/start.c @@ -33,12 +33,13 @@ typedef struct options_t bool noblock; bool pin; bool pinasio; + bool pinpat; uint32_t stats_interval; bool version; + bool ponyhelp; #if defined(USE_SYSTEMATIC_TESTING) uint64_t systematic_testing_seed; #endif - bool ponyhelp; } options_t; typedef enum running_kind_t @@ -68,6 +69,7 @@ enum OPT_NOBLOCK, OPT_PIN, OPT_PINASIO, + OPT_PINPAT, OPT_STATSINTERVAL, OPT_VERSION, #if defined(USE_SYSTEMATIC_TESTING) @@ -89,6 +91,7 @@ static opt_arg_t args[] = {"ponynoblock", 0, OPT_ARG_NONE, OPT_NOBLOCK}, {"ponypin", 0, OPT_ARG_NONE, OPT_PIN}, {"ponypinasio", 0, OPT_ARG_NONE, OPT_PINASIO}, + {"ponypinpinnedactorthread", 0, OPT_ARG_NONE, OPT_PINPAT}, {"ponyprintstatsinterval", 0, OPT_ARG_REQUIRED, OPT_STATSINTERVAL}, {"ponyversion", 0, OPT_ARG_NONE, OPT_VERSION}, #if defined(USE_SYSTEMATIC_TESTING) @@ -181,6 +184,7 @@ static int parse_opts(int argc, char** argv, options_t* opt) case OPT_NOBLOCK: opt->noblock = true; break; case OPT_PIN: opt->pin = true; break; case OPT_PINASIO: opt->pinasio = true; break; + case OPT_PINPAT: opt->pinpat = true; break; case OPT_STATSINTERVAL: if(parse_uint(&opt->stats_interval, 1, s.arg_val)) err_out(id, "can't be less than 1 second"); break; case OPT_VERSION: opt->version = true; break; #if defined(USE_SYSTEMATIC_TESTING) @@ -293,7 +297,7 @@ PONY_API int pony_init(int argc, char** argv) pony_exitcode(0); pony_ctx_t* ctx = ponyint_sched_init(opt.threads, opt.noyield, opt.pin, - opt.pinasio, opt.min_threads, opt.thread_suspend_threshold, opt.stats_interval + opt.pinasio, opt.pinpat, opt.min_threads, opt.thread_suspend_threshold, opt.stats_interval #if defined(USE_SYSTEMATIC_TESTING) , opt.systematic_testing_seed); #else