Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-threaded custom pipelines #1003

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
401 changes: 158 additions & 243 deletions flecs.c

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions flecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -10550,9 +10550,6 @@ void ecs_reset_clock(
* default pipeline (either the builtin pipeline or the pipeline set with
* set_pipeline()). An application may run additional pipelines.
*
* Note: calling this function from an application currently only works in
* single threaded applications with a single stage.
*
* @param world The world.
* @param pipeline The pipeline to run.
*/
Expand Down
3 changes: 0 additions & 3 deletions include/flecs/addons/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ void ecs_reset_clock(
* default pipeline (either the builtin pipeline or the pipeline set with
* set_pipeline()). An application may run additional pipelines.
*
* Note: calling this function from an application currently only works in
* single threaded applications with a single stage.
*
* @param world The world.
* @param pipeline The pipeline to run.
*/
Expand Down
181 changes: 112 additions & 69 deletions src/addons/pipeline/pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,75 @@ void ecs_run_pipeline(
pipeline = world->pipeline;
}

EcsPipeline *pq = (EcsPipeline*)ecs_get(world, pipeline, EcsPipeline);
flecs_pipeline_update(world, pq->state, true);
flecs_run_pipeline((ecs_world_t*)flecs_stage_from_world(&world),
pq->state, delta_time);
/* create any worker task threads request */
if (ecs_using_task_threads(world))
{
flecs_create_worker_threads(world);
}

EcsPipeline *p = (EcsPipeline*)ecs_get(world, pipeline, EcsPipeline);
flecs_workers_progress(world, p->state, delta_time);

if (ecs_using_task_threads(world))
{
/* task threads were temporary and may now be joined */
flecs_join_worker_threads(world);
}
}

int32_t flecs_run_pipeline_ops(
ecs_world_t* world,
ecs_stage_t* stage,
int32_t stage_index,
int32_t stage_count,
ecs_ftime_t delta_time,
bool main_thread)
{
ecs_pipeline_state_t* pq = world->pq;
ecs_pipeline_op_t* op = pq->cur_op;
int32_t i = pq->cur_i;

int32_t count = ecs_vec_count(&pq->systems);
ecs_entity_t* systems = ecs_vec_first_t(&pq->systems, ecs_entity_t);
int32_t ran_since_merge = i - op->offset;

for (; i < count; i++) {
/* Run system if:
* - this is the main thread, or if
* - the system is multithreaded
*/
if (main_thread || op->multi_threaded) {
ecs_entity_t system = systems[i];
const EcsPoly* poly = ecs_get_pair(world, system, EcsPoly, EcsSystem);
ecs_assert(poly != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_system_t* sys = ecs_poly(poly->poly, ecs_system_t);

/* Keep track of the last frame for which the system has ran, so we
* know from where to resume the schedule in case the schedule
* changes during a merge. */
sys->last_frame = world->info.frame_count_total + 1;

ecs_stage_t* s = NULL;
if (!op->no_readonly) {
/* If system is no_readonly it operates on the actual world, not
* the stage. Only pass stage to system if it's readonly. */
s = stage;
}

ecs_run_intern(world, s, system, sys, stage_index,
stage_count, delta_time, 0, 0, NULL);
}

world->info.systems_ran_frame++;
ran_since_merge++;

if (ran_since_merge == op->count) {
/* Merge */
break;
}
}

return i;
}

void flecs_run_pipeline(
Expand All @@ -528,82 +593,67 @@ void flecs_run_pipeline(
int32_t stage_index = ecs_get_stage_id(stage->thread_ctx);
int32_t stage_count = ecs_get_stage_count(world);

if (!flecs_worker_begin(world, stage, pq, true)) {
return;
}
ecs_assert(!stage_index, ECS_INVALID_OPERATION, NULL);

ecs_time_t st = {0};
bool main_thread = !stage_index;
bool measure_time = main_thread && (world->flags & EcsWorldMeasureSystemTime);
ecs_pipeline_op_t *op = ecs_vec_first_t(&pq->ops, ecs_pipeline_op_t);
int32_t i = 0;
bool multi_threaded = ecs_get_stage_count(world) > 1;;

do {
int32_t count = ecs_vec_count(&pq->systems);
ecs_entity_t *systems = ecs_vec_first_t(&pq->systems, ecs_entity_t);
int32_t ran_since_merge = i - op->offset;
// Update the pipeline the workers will execute
world->pq = pq;

if (i == count) {
break;
}
// Update the pipeline before waking the workers.
flecs_pipeline_update(world, pq, true);

if (measure_time) {
ecs_time_measure(&st);
// If there are no operations to execute in the pipeline bail early,
// no need to wake the workers since they have nothing to do.
while (pq->cur_op != NULL) {
if (pq->cur_i == ecs_vec_count(&pq->systems)) {
flecs_pipeline_update(world, pq, false);
continue;
}

for (; i < count; i ++) {
/* Run system if:
* - this is the main thread, or if
* - the system is multithreaded
*/
if (main_thread || op->multi_threaded) {
ecs_entity_t system = systems[i];
const EcsPoly *poly = ecs_get_pair(world, system, EcsPoly, EcsSystem);
ecs_assert(poly != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_system_t *sys = ecs_poly(poly->poly, ecs_system_t);

/* Keep track of the last frame for which the system has ran, so we
* know from where to resume the schedule in case the schedule
* changes during a merge. */
sys->last_frame = world->info.frame_count_total + 1;

ecs_stage_t *s = NULL;
if (!op->no_readonly) {
/* If system is no_readonly it operates on the actual world, not
* the stage. Only pass stage to system if it's readonly. */
s = stage;
}
bool no_readonly = pq->cur_op->no_readonly;
bool op_multi_threaded = multi_threaded && pq->cur_op->multi_threaded;

ecs_run_intern(world, s, system, sys, stage_index,
stage_count, delta_time, 0, 0, NULL);
}
pq->no_readonly = no_readonly;

world->info.systems_ran_frame ++;
ran_since_merge ++;
if (!no_readonly) {
ecs_readonly_begin(world);
}

if (ran_since_merge == op->count) {
/* Merge */
break;
}
ECS_BIT_COND(world->flags, EcsWorldMultiThreaded, op_multi_threaded);
ecs_assert(world->workers_waiting == 0, ECS_INTERNAL_ERROR, NULL);

if (op_multi_threaded) {
flecs_signal_workers(world);
}

ecs_time_t st = { 0 };
bool measure_time = world->flags & EcsWorldMeasureSystemTime;
if (measure_time) {
ecs_time_measure(&st);
}

const int32_t i = flecs_run_pipeline_ops(world, stage, stage_index, stage_count, delta_time, true);

if (measure_time) {
/* Don't include merge time in system time */
world->info.system_time_total +=
(ecs_ftime_t)ecs_time_measure(&st);
world->info.system_time_total += (ecs_ftime_t)ecs_time_measure(&st);
}

/* Synchronize workers, rebuild pipeline if necessary. Pass current op
* and system index to function, so we know where to resume from. */
} while (flecs_worker_sync(world, stage, pq, &op, &i));
if (op_multi_threaded) {
flecs_wait_for_sync(world);
}

if (measure_time) {
world->info.system_time_total += (ecs_ftime_t)ecs_time_measure(&st);
}
if (!no_readonly) {
ecs_readonly_end(world);
}

flecs_worker_end(world, stage);
/* Store the current state of the schedule after we synchronized the
* threads, to avoid race conditions. */
pq->cur_i = i;

return;
flecs_pipeline_update(world, pq, false);
}
}

static
Expand Down Expand Up @@ -721,14 +771,7 @@ void ecs_set_pipeline(
ecs_check( ecs_get(world, pipeline, EcsPipeline) != NULL,
ECS_INVALID_PARAMETER, "not a pipeline");

int32_t thread_count = ecs_get_stage_count(world);
if (thread_count > 1) {
ecs_set_threads(world, 1);
}
world->pipeline = pipeline;
if (thread_count > 1) {
ecs_set_threads(world, thread_count);
}
error:
return;
}
Expand Down
35 changes: 16 additions & 19 deletions src/addons/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ typedef struct ecs_pipeline_op_t {
bool no_readonly; /* Whether systems are staged or not */
} ecs_pipeline_op_t;

typedef struct ecs_pipeline_state_t {
struct ecs_pipeline_state_t {
ecs_query_t *query; /* Pipeline query */
ecs_vec_t ops; /* Pipeline schedule */
ecs_vec_t systems; /* Vector with system ids */
Expand All @@ -35,7 +35,7 @@ typedef struct ecs_pipeline_state_t {
int32_t cur_i; /* Index in current result */
int32_t ran_since_merge; /* Index in current op */
bool no_readonly; /* Is pipeline in readonly mode */
} ecs_pipeline_state_t;
};

typedef struct EcsPipeline {
/* Stable ptr so threads can safely access while entity/components move */
Expand All @@ -56,27 +56,18 @@ void flecs_run_pipeline(
ecs_pipeline_state_t *pq,
ecs_ftime_t delta_time);

int32_t flecs_run_pipeline_ops(
ecs_world_t* world,
ecs_stage_t* stage,
int32_t stage_index,
int32_t stage_count,
ecs_ftime_t delta_time,
bool main_thread);

////////////////////////////////////////////////////////////////////////////////
//// Worker API
////////////////////////////////////////////////////////////////////////////////

bool flecs_worker_begin(
ecs_world_t *world,
ecs_stage_t *stage,
ecs_pipeline_state_t *pq,
bool start_of_frame);

void flecs_worker_end(
ecs_world_t *world,
ecs_stage_t *stage);

bool flecs_worker_sync(
ecs_world_t *world,
ecs_stage_t *stage,
ecs_pipeline_state_t *pq,
ecs_pipeline_op_t **cur_op,
int32_t *cur_i);

void flecs_workers_progress(
ecs_world_t *world,
ecs_pipeline_state_t *pq,
Expand All @@ -88,4 +79,10 @@ void flecs_create_worker_threads(
bool flecs_join_worker_threads(
ecs_world_t *world);

void flecs_signal_workers(
ecs_world_t *world);

void flecs_wait_for_sync(
ecs_world_t *world);

#endif
Loading