Skip to content

Commit

Permalink
Re-order execution plan to enable more operations to run in-place
Browse files Browse the repository at this point in the history
After creating the initial execution plan by traversing the graph backwards from
the requested outputs, re-order it by traversing the plan forwards and choosing
a preferred operator to run from the frontier of runnable operators at each
step. The initial logic for picking the next operator is to choose the first
non-in-place operator if there is one (eg. Shape) or the first operator
otherwise.

This re-ordering increases the likelihood that an operator which can run
in-place is able to do so, because its in-place input is less likely to be
required by a non in-place op that runs later.

Fixes #98
See also #399 (comment).
  • Loading branch information
robertknight committed Nov 12, 2024
1 parent 19d2147 commit a5a019f
Showing 1 changed file with 95 additions and 3 deletions.
98 changes: 95 additions & 3 deletions src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,9 +1529,89 @@ impl Graph {
Ok(())
}

/// Return a sequential plan to generate `outputs`. The plan is
/// a vec of `(op_node_id, operator)` tuples.
/// Take the current execution plan and re-order it for more
/// efficient execution.
fn sort_plan(self, mut resolved_values: FxHashSet<NodeId>) -> Vec<NodeId> {
// Build map of value node to operators that depend on the value.
let mut dependent_ops: FxHashMap<NodeId, Vec<(NodeId, &OperatorNode)>> =
FxHashMap::default();
for (op_node_id, op_node) in &self.plan {
for input_id in self.graph.operator_dependencies(op_node) {
if let Some(deps) = dependent_ops.get_mut(&input_id) {
deps.push((*op_node_id, op_node));
} else {
dependent_ops.insert(input_id, [(*op_node_id, *op_node)].into());
}
}
}

let mut output_plan = Vec::with_capacity(self.plan.len());

// Initialize frontier with all operators that can be executed
// from initially-available values.
let mut frontier: Vec<(NodeId, &OperatorNode)> = Vec::new();
for (op_node_id, op_node) in &self.plan {
if self
.graph
.operator_dependencies(op_node)
.all(|id| resolved_values.contains(&id))
{
frontier.push((*op_node_id, op_node));
}
}

debug_assert!(!frontier.is_empty(), "initial frontier is empty");

// Loop while we still have operators to compute.
while !frontier.is_empty() {
// Choose an operator to execute next and add it to the plan.
//
// We run non-in-place operators first, so that operators
// which can run in-place are more likely to have their
// inputs available for in-place execution.
let op_pos = frontier
.iter()
.position(|(_id, op)| !op.operator().can_run_in_place())
.unwrap_or(0);
let (next_op_id, op_node) = frontier.remove(op_pos);
output_plan.push(next_op_id);

// Mark the operator's outputs as computed.
resolved_values.extend(op_node.output_ids().iter().filter_map(|id| *id));

// Visit operators that depend on current op outputs. Add
// to frontier set if all dependencies have been resolved.
for output_id in op_node.output_ids() {
let Some(output_id) = output_id else {
continue;
};
let Some(deps) = dependent_ops.get(output_id) else {
continue;
};
for (candidate_op_id, candidate_op) in deps {
if frontier.iter().any(|(op_id, _)| op_id == candidate_op_id) {
continue;
}

if self
.graph
.operator_dependencies(candidate_op)
.all(|id| resolved_values.contains(&id))
{
frontier.push((*candidate_op_id, candidate_op));
}
}
}
}

output_plan
}

/// Return a sequential plan to generate `outputs`.
fn plan(mut self, outputs: &[NodeId]) -> Result<Vec<NodeId>, RunError> {
let initial_resolved_values = self.resolved_values.clone();

// Build initial plan by traversing graph backwards from outputs.
for output_id in outputs.iter() {
if self.resolved_values.contains(output_id) {
// Value is either a constant node or is produced by
Expand All @@ -1546,7 +1626,19 @@ impl Graph {
return Err(RunError::PlanningError(msg));
}
}
Ok(self.plan.into_iter().map(|(node_id, _)| node_id).collect())

// When doing partial evaluation, just return the initial plan.
// This avoids having to handle missing inputs when sorting the
// plan.
if self.options.allow_missing_inputs || self.plan.is_empty() {
return Ok(self.plan.into_iter().map(|(op_id, _)| op_id).collect());
}

// Re-order initial plan to get a more efficient execution
// order.
let sorted_plan = self.sort_plan(initial_resolved_values);

Ok(sorted_plan)
}
}

Expand Down

0 comments on commit a5a019f

Please sign in to comment.