Skip to content

Commit

Permalink
refactor: Add ProjectionContext in projection pusdhown opt
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 26, 2025
1 parent 1993d59 commit 9b49897
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 403 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ pub(super) fn process_functions(
proj_pd: &mut ProjectionPushDown,
input: Node,
function: FunctionIR,
mut acc_projections: Vec<ColumnNode>,
mut projected_names: PlHashSet<PlSmallStr>,
projections_seen: usize,
mut ctx: ProjectionContext,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
Expand All @@ -25,23 +23,16 @@ pub(super) fn process_functions(
swapping,
schema: _,
} => {
let clear = !acc_projections.is_empty();
let clear = !ctx.acc_projections.is_empty();
process_rename(
&mut acc_projections,
&mut projected_names,
&mut ctx.acc_projections,
&mut ctx.projected_names,
expr_arena,
existing,
new,
swapping,
)?;
proj_pd.pushdown_and_assign(
input,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)?;
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

if clear {
function.clear_cached_schema()
Expand All @@ -54,19 +45,12 @@ pub(super) fn process_functions(
columns.iter().for_each(|name| {
add_str_to_accumulated(
name.clone(),
&mut acc_projections,
&mut projected_names,
&mut ctx.acc_projections,
&mut ctx.projected_names,
expr_arena,
)
});
proj_pd.pushdown_and_assign(
input,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)?;
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
Ok(IRBuilder::new(input, expr_arena, lp_arena)
.explode(columns.clone())
.build())
Expand All @@ -78,37 +62,27 @@ pub(super) fn process_functions(
function: function.clone(),
};

process_unpivot(
proj_pd,
lp,
args,
input,
acc_projections,
projections_seen,
lp_arena,
expr_arena,
)
process_unpivot(proj_pd, lp, args, input, ctx, lp_arena, expr_arena)
},
_ => {
if function.allow_projection_pd() && !acc_projections.is_empty() {
let original_acc_projection_len = acc_projections.len();
if function.allow_projection_pd() && !ctx.acc_projections.is_empty() {
let original_acc_projection_len = ctx.acc_projections.len();

// add columns needed for the function.
for name in function.additional_projection_pd_columns().as_ref() {
let node = expr_arena.add(AExpr::Column(name.clone()));
add_expr_to_accumulated(
node,
&mut acc_projections,
&mut projected_names,
&mut ctx.acc_projections,
&mut ctx.projected_names,
expr_arena,
)
}
let expands_schema = matches!(function, FunctionIR::Unnest { .. });

let local_projections = proj_pd.pushdown_and_assign_check_schema(
input,
acc_projections,
projections_seen,
ctx,
lp_arena,
expr_arena,
expands_schema,
Expand Down Expand Up @@ -143,13 +117,7 @@ pub(super) fn process_functions(
function: function.clone(),
};
// restart projection pushdown
proj_pd.no_pushdown_restart_opt(
lp,
acc_projections,
projections_seen,
lp_arena,
expr_arena,
)
proj_pd.no_pushdown_restart_opt(lp, ctx, lp_arena, expr_arena)
}
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ pub(super) fn process_unpivot(
lp: IR,
args: &Arc<UnpivotArgsIR>,
input: Node,
acc_projections: Vec<ColumnNode>,
projections_seen: usize,
ctx: ProjectionContext,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
if args.on.is_empty() {
// restart projection pushdown
proj_pd.no_pushdown_restart_opt(lp, acc_projections, projections_seen, lp_arena, expr_arena)
proj_pd.no_pushdown_restart_opt(lp, ctx, lp_arena, expr_arena)
} else {
let (mut acc_projections, mut local_projections, mut projected_names) =
split_acc_projections(
acc_projections,
ctx.acc_projections,
lp_arena.get(input).schema(lp_arena).as_ref(),
expr_arena,
false,
Expand All @@ -44,15 +43,9 @@ pub(super) fn process_unpivot(
expr_arena,
)
});
let ctx = ProjectionContext::new(acc_projections, projected_names, ctx.projections_seen);

proj_pd.pushdown_and_assign(
input,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)?;
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

// re-make unpivot node so that the schema is updated
let lp = IRBuilder::new(input, expr_arena, lp_arena)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use super::*;
pub(super) fn process_generic(
proj_pd: &mut ProjectionPushDown,
lp: IR,
acc_projections: Vec<ColumnNode>,
projected_names: PlHashSet<PlSmallStr>,
projections_seen: usize,
ctx: ProjectionContext,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
Expand All @@ -20,14 +18,7 @@ pub(super) fn process_generic(
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
let mut alp = proj_pd.push_down(
alp,
acc_projections.clone(),
projected_names.clone(),
projections_seen,
lp_arena,
expr_arena,
)?;
let mut alp = proj_pd.push_down(alp, ctx.clone(), lp_arena, expr_arena)?;

// double projection can mess up the schema ordering
// here we ensure the ordering is maintained.
Expand All @@ -44,11 +35,11 @@ pub(super) fn process_generic(
// df3 => a, b
// df1 => a
// so we ensure we do the 'a' projection again before we concatenate
if !acc_projections.is_empty() && inputs.len() > 1 {
if !ctx.acc_projections.is_empty() && inputs.len() > 1 {
alp = IRBuilder::from_lp(alp, expr_arena, lp_arena)
.project_simple_nodes(acc_projections.iter().map(|e| e.0))
.project_simple_nodes(ctx.acc_projections.iter().map(|e| e.0))
.unwrap()
.build()
.build();
}
lp_arena.replace(node, alp);
Ok(node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ pub(super) fn process_group_by(
schema: SchemaRef,
maintain_order: bool,
options: Arc<GroupbyOptions>,
acc_projections: Vec<ColumnNode>,
projected_names: PlHashSet<PlSmallStr>,
projections_seen: usize,
ctx: ProjectionContext,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
Expand All @@ -32,13 +30,13 @@ pub(super) fn process_group_by(
let input = lp_arena.add(lp);

let builder = IRBuilder::new(input, expr_arena, lp_arena);
Ok(proj_pd.finish_node_simple_projection(&acc_projections, builder))
Ok(proj_pd.finish_node_simple_projection(&ctx.acc_projections, builder))
} else {
let has_pushed_down = !acc_projections.is_empty();
let has_pushed_down = !ctx.acc_projections.is_empty();

// TODO! remove unnecessary vec alloc.
let (mut acc_projections, _local_projections, mut names) = split_acc_projections(
acc_projections,
ctx.acc_projections,
lp_arena.get(input).schema(lp_arena).as_ref(),
expr_arena,
false,
Expand All @@ -48,8 +46,8 @@ pub(super) fn process_group_by(
let projected_aggs = aggs
.into_iter()
.filter(|agg| {
if has_pushed_down && projections_seen > 0 {
projected_names.contains(agg.output_name())
if has_pushed_down && ctx.projections_seen > 0 {
ctx.projected_names.contains(agg.output_name())
} else {
true
}
Expand Down Expand Up @@ -77,15 +75,9 @@ pub(super) fn process_group_by(
let node = expr_arena.add(AExpr::Column(options.index_column.clone()));
add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);
}
let ctx = ProjectionContext::new(acc_projections, names, ctx.projections_seen);

proj_pd.pushdown_and_assign(
input,
acc_projections,
names,
projections_seen,
lp_arena,
expr_arena,
)?;
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

let builder = IRBuilder::new(input, expr_arena, lp_arena).group_by(
keys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ pub(super) fn process_hconcat(
inputs: Vec<Node>,
schema: SchemaRef,
options: HConcatOptions,
acc_projections: Vec<ColumnNode>,
projections_seen: usize,
ctx: ProjectionContext,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
// When applying projection pushdown to horizontal concatenation,
// we apply pushdown to all of the inputs using the subset of accumulated projections relevant to each input,
// then rebuild the concatenated schema.

let schema = if acc_projections.is_empty() {
let schema = if ctx.acc_projections.is_empty() {
schema
} else {
let mut remaining_projections: PlHashSet<_> = acc_projections.into_iter().collect();
let mut remaining_projections: PlHashSet<_> = ctx.acc_projections.into_iter().collect();

for input in inputs.iter() {
let mut input_pushdown = Vec::new();
Expand All @@ -37,14 +36,8 @@ pub(super) fn process_hconcat(
input_names.insert(name);
}
}
proj_pd.pushdown_and_assign(
*input,
input_pushdown,
input_names,
projections_seen,
lp_arena,
expr_arena,
)?;
let ctx = ProjectionContext::new(input_pushdown, input_names, ctx.projections_seen);
proj_pd.pushdown_and_assign(*input, ctx, lp_arena, expr_arena)?;
}

let mut schemas = Vec::with_capacity(inputs.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,25 @@ pub(super) fn process_hstack(
input: Node,
mut exprs: Vec<ExprIR>,
options: ProjectionOptions,
mut acc_projections: Vec<ColumnNode>,
mut projected_names: PlHashSet<PlSmallStr>,
projections_seen: usize,
mut ctx: ProjectionContext,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
if !acc_projections.is_empty() {
if !ctx.acc_projections.is_empty() {
let mut pruned_with_cols = Vec::with_capacity(exprs.len());

// Check if output names are used upstream
// if not, we can prune the `with_column` expression
// as it is not used in the output.
for e in exprs {
let is_used_upstream = projected_names.contains(e.output_name());
let is_used_upstream = ctx.projected_names.contains(e.output_name());
if is_used_upstream {
pruned_with_cols.push(e);
}
}

if pruned_with_cols.is_empty() {
proj_pd.pushdown_and_assign(
input,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)?;
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
return Ok(lp_arena.take(input));
}

Expand All @@ -42,8 +33,8 @@ pub(super) fn process_hstack(
for e in &pruned_with_cols {
add_expr_to_accumulated(
e.node(),
&mut acc_projections,
&mut projected_names,
&mut ctx.acc_projections,
&mut ctx.projected_names,
expr_arena,
);
}
Expand All @@ -60,20 +51,14 @@ pub(super) fn process_hstack(
//
// we can drop the "b" projection at this level
let (acc_projections, _, names) = split_acc_projections(
acc_projections,
ctx.acc_projections,
&lp_arena.get(input).schema(lp_arena),
expr_arena,
true, // expands_schema
);

proj_pd.pushdown_and_assign(
input,
acc_projections,
names,
projections_seen,
lp_arena,
expr_arena,
)?;
let ctx = ProjectionContext::new(acc_projections, names, ctx.projections_seen);
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

let lp = IRBuilder::new(input, expr_arena, lp_arena)
.with_columns(exprs, options)
Expand Down
Loading

0 comments on commit 9b49897

Please sign in to comment.