Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed May 12, 2024
1 parent dae3061 commit a44c967
Show file tree
Hide file tree
Showing 19 changed files with 625 additions and 328 deletions.
5 changes: 1 addition & 4 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,10 +853,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
.aggregate(
vec![],
vec![count(Expr::Literal(COUNT_STAR_EXPANSION))],
)?
.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
physical_input_schema,
name,
ignore_nulls,
*distinct,
)?;
(agg_expr, filter, physical_sort_exprs)
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
_name: &str,
_value_type: DataType,
_ordering_fields: Vec<Field>,
_is_distinct: bool,
) -> Result<Vec<Field>> {
Ok(self.state_fields.clone())
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct AccumulatorArgs<'a> {
///
/// If no `ORDER BY` is specified, `sort_exprs`` will be empty.
pub sort_exprs: &'a [Expr],
pub is_distinct: bool,
}

impl<'a> AccumulatorArgs<'a> {
Expand All @@ -74,12 +75,14 @@ impl<'a> AccumulatorArgs<'a> {
schema: &'a Schema,
ignore_nulls: bool,
sort_exprs: &'a [Expr],
is_distinct: bool,
) -> Self {
Self {
data_type,
schema,
ignore_nulls,
sort_exprs,
is_distinct
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,14 @@ impl AggregateUDF {
name: &str,
value_type: DataType,
ordering_fields: Vec<Field>,
is_distinct: bool,
) -> Result<Vec<Field>> {
self.inner.state_fields(name, value_type, ordering_fields)
self.inner.state_fields(name, value_type, ordering_fields, is_distinct)
}

/// See [`AggregateUDFImpl::groups_accumulator_supported`] for more details.
pub fn groups_accumulator_supported(&self, args_num: usize) -> bool {
self.inner.groups_accumulator_supported(args_num)
pub fn groups_accumulator_supported(&self, args_num: usize, is_distinct: bool) -> bool {
self.inner.groups_accumulator_supported(args_num, is_distinct)
}

/// See [`AggregateUDFImpl::create_groups_accumulator`] for more details.
Expand All @@ -199,7 +200,7 @@ impl AggregateUDF {
pub fn coerce_types(&self, _args: &[DataType]) -> Result<Vec<DataType>> {
not_impl_err!("coerce_types not implemented for {:?} yet", self.name())
}

/// See [`AggregateUDFImpl::reverse_expr`] for more details.
pub fn reverse_expr(&self) -> ReversedUDAF {
self.inner.reverse_expr()
Expand Down Expand Up @@ -323,6 +324,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
name: &str,
value_type: DataType,
ordering_fields: Vec<Field>,
is_distinct: bool,
) -> Result<Vec<Field>> {
let value_fields = vec![Field::new(
format_state_name(name, "value"),
Expand All @@ -343,7 +345,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args_num: usize) -> bool {
fn groups_accumulator_supported(&self, _args_num: usize, is_distinct: bool) -> bool {
false
}

Expand Down
Loading

0 comments on commit a44c967

Please sign in to comment.