diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index 29003a5e8e..93d9fb41cd 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -215,12 +215,14 @@ static void send_msg(uint32_t from, uint32_t to, sched_msg_t msg, intptr_t arg) (void)from; } -static void send_msg_all_active(uint32_t from, sched_msg_t msg, intptr_t arg) +static uint32_t send_msg_all_active(uint32_t from, sched_msg_t msg, intptr_t arg) { uint32_t current_active_scheduler_count = get_active_scheduler_count(); for(uint32_t i = 0; i < current_active_scheduler_count; i++) send_msg(from, i, msg, arg); + + return current_active_scheduler_count; } static void send_msg_all(uint32_t from, sched_msg_t msg, intptr_t arg) @@ -310,21 +312,28 @@ static void wake_suspended_threads(int32_t current_scheduler_id) } // start cnf/ack cycle for quiescence if block count >= active_scheduler_count +// only if there are no noisy actors subscribed with the ASIO subsystem static void maybe_start_cnf_ack_cycle(scheduler_t* sched) { - if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && + // reset ack token because dynamic scheduler scaling means + // that a new thread can wake up changing active_scheduler_count and + // then block causing block_count >= active_scheduler_count for a + // second time and if we don't reset, we can think we've received + // enough acks when we really haven't + sched->ack_token++; + + if(!sched->asio_noisy && + atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && (sched->block_count >= get_active_scheduler_count())) { - // reset ack token count to 0 because dynamic scheduler scaling means - // that a new thread can wake up changing active_scheduler_count and - // then block causing block_count >= active_scheduler_count for a - // second time and if we don't reset, we can think we've received - // enough acks when we really haven't - sched->ack_token++; - sched->ack_count = 0; - // If we think all threads 
are blocked, send CNF(token) to everyone. - send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); + // save the # of active schedulers to expect ACK's from + sched->ack_count = send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); + pony_assert(sched->ack_count > 0); + } else { + // reset ack count + sched->ack_count = scheduler_count; + pony_assert(sched->ack_count > 0); } } @@ -342,7 +351,8 @@ static void handle_sched_unblock(scheduler_t* sched) // acks in the queue will be dropped when they are received. sched->block_count--; sched->ack_token++; - sched->ack_count = 0; + sched->ack_count = scheduler_count; + pony_assert(sched->ack_count > 0); } static bool read_msg(scheduler_t* sched) @@ -372,21 +382,21 @@ static bool read_msg(scheduler_t* sched) { case SCHED_SUSPEND: { - pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(0 == sched->index); maybe_start_cnf_ack_cycle(sched); break; } case SCHED_BLOCK: { - pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(0 == sched->index); handle_sched_block(sched); break; } case SCHED_UNBLOCK: { - pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(0 == sched->index); handle_sched_unblock(sched); break; } @@ -395,18 +405,20 @@ static bool read_msg(scheduler_t* sched) { pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); - // Echo the token back as ACK(token). - send_msg(sched->index, 0, SCHED_ACK, m->i); + // Echo the token back as ACK(token). Only if it's safe to terminate. + if(!sched->asio_noisy) + send_msg(sched->index, 0, SCHED_ACK, m->i); break; } case SCHED_ACK: { - pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + pony_assert(0 == sched->index); - // If it's the current token, increment the ack count. - if(m->i == sched->ack_token) - sched->ack_count++; + // If it's the current token, decrement the ack count for # of schedulers + // to expect an ACK from. Only if it's safe to terminate. 
+ if(!sched->asio_noisy && (m->i == sched->ack_token)) + sched->ack_count--; break; } @@ -470,11 +482,16 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) // only scheduler 0 can initiate shutdown (it is the ony that gets all the // ACK messages as part of the CNF/ACK coordination for shutdown) - if(0 == sched->index) + // only if there are no noisy actors registered with the ASIO subsystem + if(0 == sched->index && !sched->asio_noisy) { - uint32_t current_active_scheduler_count = get_active_scheduler_count(); - - if(sched->ack_count >= current_active_scheduler_count) + // 0 means that all active schedulers at the time the CNF/ACK coordination + // was started have ACK'd and we can proceed with shutdown. If any scheduler + // threads block/unblock/suspend before we get ACKs from them all, then + // the ack_token is incremented and the ack_count is reset to the # of active + // scheduler threads at that time and we start the countdown to + // `ack_count == 0` all over again + if(0 == sched->ack_count) { // mark cycle_detector to pause // this is required to ensure scheduler queues are empty @@ -490,14 +507,16 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) wake_suspended_threads(sched->index); sched->ack_token++; - sched->ack_count = 0; + sched->ack_count = scheduler_count; + pony_assert(sched->ack_count > 0); } else if(ponyint_asio_stoppable()) { sched->asio_stoppable = true; sched->ack_token++; - sched->ack_count = 0; - // Run another CNF/ACK cycle. - send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); + // Run another CNF/ACK cycle. 
save the # of active schedulers to expect + // ACK's from + sched->ack_count = send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); + pony_assert(sched->ack_count > 0); // re-enable cycle detector triggering pause_cycle_detection = false; @@ -712,9 +731,6 @@ static pony_actor_t* suspend_scheduler(scheduler_t* sched, atomic_store_explicit(&active_scheduler_count_check, sched_count_check + 1, memory_order_relaxed); - // ensure main active scheduler count and check variable match - // pony_assert(sched_count == sched_count_check); - #if !defined(USE_SCHEDULER_SCALING_PTHREADS) // unlock the bool that controls modifying the active scheduler count // variable if using signals @@ -1301,6 +1317,8 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin, scheduler[i].last_victim = &scheduler[i]; scheduler[i].index = i; scheduler[i].asio_noisy = false; + scheduler[i].ack_count = scheduler_count; + pony_assert(scheduler[i].ack_count > 0); ponyint_messageq_init(&scheduler[i].mq); ponyint_mpmcq_init(&scheduler[i].q); }