Fix early quiescence/termination bug
Prior to this commit, the logic to detect quiescence had a race
condition related to dynamic scheduler scaling: it was possible
for the runtime to incorrectly detect quiescence and terminate
early if a scheduler thread suspended at just the right time.
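To make the interleaving concrete, here is a deliberately simplified model of the old check (illustrative only, not the runtime's actual control flow; the variable names are invented for the example). The old comparison measured ACKs received against the active scheduler count as read at check time, so a suspension that shrinks that count can let the comparison succeed before every scheduler from the round has answered:

#include <stdint.h>
#include <stdio.h>

int main(void)
{
  // CNF(token) goes out while 3 scheduler threads are active.
  uint32_t acks_received = 0;
  uint32_t active_at_check = 3; // re-read by the old check at compare time

  // Two schedulers ACK; the third suspends before answering, which
  // shrinks the active count the old check compares against.
  acks_received = 2;
  active_at_check = 2;

  // Old-style check: passes even though one CNF was never ACK'd.
  if(acks_received >= active_at_check)
    printf("quiescence detected -> terminate early\n");
  else
    printf("keep waiting\n");

  return 0;
}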

This commit changes the quiescence logic to keep an accurate count
of exactly how many scheduler threads are active at the time the
quiescence detection protocol begins, ensuring that scheduler
threads suspending or unsuspending can no longer cause an incorrect
determination that might lead to early termination of the runtime.
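A minimal sketch of the corrected accounting, using invented names that only mirror the ack_token/ack_count fields in the diff below (this is not the runtime's code): the number of schedulers actually sent CNF is captured when the round starts, and ACKs carrying the current token count down toward zero, so later changes to the active scheduler count cannot affect the decision.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct
{
  uint32_t ack_token; // identifies the current CNF/ACK round
  uint32_t ack_count; // ACKs still outstanding for that round
} quiescence_t;

// Begin a round: remember exactly how many schedulers were sent CNF.
static void start_round(quiescence_t* q, uint32_t cnf_sent)
{
  q->ack_token++;          // ACKs from older rounds no longer match
  q->ack_count = cnf_sent; // snapshot taken when the round begins
}

// Process an ACK; true once every scheduler from the snapshot answered.
static bool on_ack(quiescence_t* q, uint32_t token)
{
  if(token == q->ack_token)
    q->ack_count--;

  return q->ack_count == 0;
}

int main(void)
{
  quiescence_t q = {0, 0};
  start_round(&q, 3); // CNF sent to the 3 schedulers active right now

  on_ack(&q, q.ack_token);
  on_ack(&q, q.ack_token);
  // A scheduler suspending mid-round no longer matters: termination
  // waits for the last outstanding ACK of this round.
  printf("can terminate: %s\n", on_ack(&q, q.ack_token) ? "yes" : "no");

  return 0;
}

Scheduler threads that block, unblock, or suspend mid-round simply bump the token and re-arm the count, which is what the diff below does, using scheduler_count as the idle value.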
dipinhora committed Nov 26, 2024
1 parent f0d7afd commit f275dba
Showing 1 changed file with 50 additions and 32 deletions.
src/libponyrt/sched/scheduler.c: 50 additions & 32 deletions
@@ -215,12 +215,14 @@ static void send_msg(uint32_t from, uint32_t to, sched_msg_t msg, intptr_t arg)
(void)from;
}

-static void send_msg_all_active(uint32_t from, sched_msg_t msg, intptr_t arg)
+static uint32_t send_msg_all_active(uint32_t from, sched_msg_t msg, intptr_t arg)
{
uint32_t current_active_scheduler_count = get_active_scheduler_count();

for(uint32_t i = 0; i < current_active_scheduler_count; i++)
send_msg(from, i, msg, arg);
+
+return current_active_scheduler_count;
}

static void send_msg_all(uint32_t from, sched_msg_t msg, intptr_t arg)
@@ -310,21 +312,28 @@ static void wake_suspended_threads(int32_t current_scheduler_id)
}

// start cnf/ack cycle for quiescence if block count >= active_scheduler_count
+// only if there are no noisy actors subscribed with the ASIO subsystem
static void maybe_start_cnf_ack_cycle(scheduler_t* sched)
{
-if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) &&
+// reset ack token because dynamic scheduler scaling means
+// that a new thread can wake up changing active_scheduler_count and
+// then block causing block_count >= active_scheduler_count for a
+// second time and if we don't reset, we can think we've received
+// enough acks when we really haven't
+sched->ack_token++;
+
+if(!sched->asio_noisy &&
+atomic_load_explicit(&detect_quiescence, memory_order_relaxed) &&
(sched->block_count >= get_active_scheduler_count()))
{
-// reset ack token count to 0 because dynamic scheduler scaling means
-// that a new thread can wake up changing active_scheduler_count and
-// then block causing block_count >= active_scheduler_count for a
-// second time and if we don't reset, we can think we've received
-// enough acks when we really haven't
-sched->ack_token++;
-sched->ack_count = 0;
-
// If we think all threads are blocked, send CNF(token) to everyone.
-send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token);
+// save the # of active schedulers to expect ACK's from
+sched->ack_count = send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token);
+pony_assert(sched->ack_count > 0);
+} else {
+// reset ack count
+sched->ack_count = scheduler_count;
+pony_assert(sched->ack_count > 0);
}
}

@@ -342,7 +351,8 @@ static void handle_sched_unblock(scheduler_t* sched)
// acks in the queue will be dropped when they are received.
sched->block_count--;
sched->ack_token++;
-sched->ack_count = 0;
+sched->ack_count = scheduler_count;
+pony_assert(sched->ack_count > 0);
}

static bool read_msg(scheduler_t* sched)
@@ -372,21 +382,21 @@ static bool read_msg(scheduler_t* sched)
{
case SCHED_SUSPEND:
{
-pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index);
+pony_assert(0 == sched->index);
maybe_start_cnf_ack_cycle(sched);
break;
}

case SCHED_BLOCK:
{
-pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index);
+pony_assert(0 == sched->index);
handle_sched_block(sched);
break;
}

case SCHED_UNBLOCK:
{
-pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index);
+pony_assert(0 == sched->index);
handle_sched_unblock(sched);
break;
}
@@ -395,18 +405,20 @@ static bool read_msg(scheduler_t* sched)
{
pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index);

-// Echo the token back as ACK(token).
-send_msg(sched->index, 0, SCHED_ACK, m->i);
+// Echo the token back as ACK(token). Only if it's safe to terminate.
+if(!sched->asio_noisy)
+send_msg(sched->index, 0, SCHED_ACK, m->i);
break;
}

case SCHED_ACK:
{
-pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index);
+pony_assert(0 == sched->index);

-// If it's the current token, increment the ack count.
-if(m->i == sched->ack_token)
-sched->ack_count++;
+// If it's the current token, decrement the ack count for # of schedulers
+// to expect an ACK from. Only if it's safe to terminate.
+if(!sched->asio_noisy && (m->i == sched->ack_token))
+sched->ack_count--;
break;
}

@@ -470,11 +482,16 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2)

// only scheduler 0 can initiate shutdown (it is the only one that gets all the
// ACK messages as part of the CNF/ACK coordination for shutdown)
-if(0 == sched->index)
+// only if there are no noisy actors registered with the ASIO subsystem
+if(0 == sched->index && !sched->asio_noisy)
{
-uint32_t current_active_scheduler_count = get_active_scheduler_count();
-
-if(sched->ack_count >= current_active_scheduler_count)
+// 0 means that all active schedulers at the time the CNF/ACK coordination
+// was started have ACK'd and we can proceed with shutdown... if any scheduler
+// threads block/unblock/suspend before we get ACKs from them all, then
+// the ack_token is incremented and the ack_count is reset to the # of active
+// scheduler threads at that time and we start the countdown to
+// `ack_count == 0` all over again
+if(0 == sched->ack_count)
{
// mark cycle_detector to pause
// this is required to ensure scheduler queues are empty
@@ -490,14 +507,16 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2)
wake_suspended_threads(sched->index);

sched->ack_token++;
-sched->ack_count = 0;
+sched->ack_count = scheduler_count;
+pony_assert(sched->ack_count > 0);
} else if(ponyint_asio_stoppable()) {
sched->asio_stoppable = true;
sched->ack_token++;
-sched->ack_count = 0;

-// Run another CNF/ACK cycle.
-send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token);
+// Run another CNF/ACK cycle. save the # of active schedulers to expect
+// ACK's from
+sched->ack_count = send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token);
+pony_assert(sched->ack_count > 0);

// re-enable cycle detector triggering
pause_cycle_detection = false;
@@ -712,9 +731,6 @@ static pony_actor_t* suspend_scheduler(scheduler_t* sched,
atomic_store_explicit(&active_scheduler_count_check,
sched_count_check + 1, memory_order_relaxed);

-// ensure main active scheduler count and check variable match
-// pony_assert(sched_count == sched_count_check);
-
#if !defined(USE_SCHEDULER_SCALING_PTHREADS)
// unlock the bool that controls modifying the active scheduler count
// variable if using signals
@@ -1301,6 +1317,8 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin,
scheduler[i].last_victim = &scheduler[i];
scheduler[i].index = i;
scheduler[i].asio_noisy = false;
+scheduler[i].ack_count = scheduler_count;
+pony_assert(scheduler[i].ack_count > 0);
ponyint_messageq_init(&scheduler[i].mq);
ponyint_mpmcq_init(&scheduler[i].q);
}
