Skip to content

Commit

Permalink
fix: pushdowns for unpivot (#3724)
Browse files Browse the repository at this point in the history
closes #3722
  • Loading branch information
universalmind303 authored Jan 27, 2025
1 parent 603199f commit fec399e
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 2 deletions.
100 changes: 98 additions & 2 deletions src/daft-logical-plan/src/optimization/rules/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,7 @@ impl PushDownProjection {
| LogicalPlan::Limit(..)
| LogicalPlan::Filter(..)
| LogicalPlan::Sample(..)
| LogicalPlan::Explode(..)
| LogicalPlan::Unpivot(..) => {
| LogicalPlan::Explode(..) => {
// Get required columns from projection and upstream.
let combined_dependencies = plan
.required_columns()
Expand All @@ -360,6 +359,7 @@ impl PushDownProjection {
// Skip optimization if no columns would be pruned.
let grand_upstream_plan = &upstream_plan.arc_children()[0];
let grand_upstream_columns = grand_upstream_plan.schema().names();

if grand_upstream_columns.len() == combined_dependencies.len() {
return Ok(Transformed::no(plan));
}
Expand All @@ -381,6 +381,43 @@ impl PushDownProjection {
.or(Transformed::yes(new_plan));
Ok(new_plan)
}
LogicalPlan::Unpivot(unpivot) => {
let combined_dependencies = plan
.required_columns()
.iter()
.flatten()
.chain(upstream_plan.required_columns().iter().flatten())
.cloned()
.collect::<IndexSet<_>>();

let grand_upstream_plan = &upstream_plan.arc_children()[0];
let grand_upstream_columns = grand_upstream_plan.schema().names();
let input_columns = unpivot
.ids
.iter()
.chain(unpivot.values.iter())
.map(|e| e.name().to_string())
.collect::<IndexSet<_>>();

let can_be_pushed_down = input_columns
.intersection(&combined_dependencies)
.map(|e| col(e.as_str()))
.collect::<Vec<_>>();

if grand_upstream_columns.len() == can_be_pushed_down.len() {
return Ok(Transformed::no(plan));
}

let new_subprojection: LogicalPlan =
Project::try_new(grand_upstream_plan.clone(), can_be_pushed_down)?.into();
let new_upstream = upstream_plan.with_new_children(&[new_subprojection.into()]);
let new_plan = Arc::new(plan.with_new_children(&[new_upstream.into()]));
// Retry optimization now that the upstream node is different.
let new_plan = self
.try_optimize_node(new_plan.clone())?
.or(Transformed::yes(new_plan));
Ok(new_plan)
}
LogicalPlan::Concat(concat) => {
// Get required columns from projection and upstream.
let combined_dependencies = plan
Expand Down Expand Up @@ -680,6 +717,7 @@ mod tests {
};

use crate::{
ops::{Project, Unpivot},
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::PushDownProjection,
Expand Down Expand Up @@ -1074,4 +1112,62 @@ mod tests {
assert_optimized_plan_eq(project, expected_scan)?;
Ok(())
}

#[test]
fn test_projection_pushdown_with_unpivot() {
let scan_op = dummy_scan_operator(vec![
Field::new("year", DataType::Int64),
Field::new("id", DataType::Int64),
Field::new("Jan", DataType::Int64),
Field::new("Feb", DataType::Int64),
]);
let scan_node = dummy_scan_node(scan_op.clone()).build();

let plan = LogicalPlan::Unpivot(
Unpivot::try_new(
scan_node.clone(),
vec![col("year")],
vec![col("Jan"), col("Feb")],
"month".to_string(),
"inventory".to_string(),
)
.unwrap(),
);

let plan = LogicalPlan::Project(
Project::try_new(plan.into(), vec![col("inventory").alias("year2")]).unwrap(),
)
.into();
let expected_scan = dummy_scan_node_with_pushdowns(
scan_op.clone(),
Pushdowns {
limit: None,
partition_filters: None,
columns: Some(Arc::new(vec![
"year".to_string(),
"Jan".to_string(),
"Feb".to_string(),
])),
filters: None,
},
)
.build();

let expected = LogicalPlan::Unpivot(
Unpivot::try_new(
expected_scan,
vec![col("year")],
vec![col("Jan"), col("Feb")],
"month".to_string(),
"inventory".to_string(),
)
.unwrap(),
);

let expected = LogicalPlan::Project(
Project::try_new(expected.into(), vec![col("inventory").alias("year2")]).unwrap(),
)
.into();
assert_optimized_plan_eq(plan, expected).unwrap();
}
}
25 changes: 25 additions & 0 deletions tests/dataframe/test_unpivot.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ def test_unpivot_expr(make_df, n_partitions, with_morsel_size):
assert df.to_pydict() == expected


@pytest.mark.parametrize("n_partitions", [1, 2, 4])
def test_unpivot_followed_by_projection(make_df, n_partitions, with_morsel_size):
df = make_df(
{
"year": [2020, 2021, 2022],
"id": [1, 2, 3],
"Jan": [10, 30, 50],
"Feb": [20, 40, 60],
},
repartition=n_partitions,
)

df = df.unpivot("year", ["Jan", "Feb"], variable_name="month", value_name="inventory")
df = df.with_column("inventory2", df["inventory"])
df = df.sort(["year", "month", "inventory"])

expected = {
"year": [2020, 2020, 2021, 2021, 2022, 2022],
"month": ["Feb", "Jan", "Feb", "Jan", "Feb", "Jan"],
"inventory": [20, 10, 40, 30, 60, 50],
"inventory2": [20, 10, 40, 30, 60, 50],
}
assert df.to_pydict() == expected


def test_unpivot_empty(make_df):
df = make_df(
{
Expand Down

0 comments on commit fec399e

Please sign in to comment.