From d2cffa8b424b58111d40cf16361d919ac5fff4d0 Mon Sep 17 00:00:00 2001 From: NiwakaDev Date: Sat, 18 Jan 2025 23:33:43 +0900 Subject: [PATCH] fix: wip --- Cargo.lock | 22 ++--- Cargo.toml | 2 +- src/common/function/src/scalars/math.rs | 32 ++++++++ src/log-query/src/log_query.rs | 2 +- src/query/src/lib.rs | 1 + src/query/src/planner.rs | 1 + src/query/src/query_engine/state.rs | 7 +- src/query/src/within_filter.rs | 96 ++++++++++++++++++++++ tests/cases/standalone/common/basic.result | 10 +++ tests/cases/standalone/common/basic.sql | 2 + 10 files changed, 161 insertions(+), 14 deletions(-) create mode 100644 src/query/src/within_filter.rs diff --git a/Cargo.lock b/Cargo.lock index b11584039d2b..4be493275481 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2202,7 +2202,7 @@ dependencies = [ "futures-util", "serde", "snafu 0.8.5", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "sqlparser_derive 0.1.1", "statrs", "store-api", @@ -3191,7 +3191,7 @@ dependencies = [ "serde", "serde_json", "snafu 0.8.5", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "sqlparser_derive 0.1.1", ] @@ -4086,7 +4086,7 @@ dependencies = [ "session", "snafu 0.8.5", "sql", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "store-api", "strfmt", "table", @@ -7560,7 +7560,7 @@ dependencies = [ "session", "snafu 0.8.5", "sql", - "sqlparser 0.45.0 
(git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "store-api", "substrait 0.12.0", "table", @@ -7812,7 +7812,7 @@ dependencies = [ "serde_json", "snafu 0.8.5", "sql", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "store-api", "table", ] @@ -8839,7 +8839,7 @@ dependencies = [ "session", "snafu 0.8.5", "sql", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "statrs", "store-api", "substrait 0.12.0", @@ -10623,7 +10623,7 @@ dependencies = [ "serde", "serde_json", "snafu 0.8.5", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "sqlparser_derive 0.1.1", "store-api", "table", @@ -10689,14 +10689,14 @@ dependencies = [ [[package]] name = "sqlparser" version = "0.45.0" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7#54a267ac89c09b11c0c88934690530807185d3e7" +source = "git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39#b3584823b236db3c244b70945008f0aae65c5e39" dependencies = [ "lazy_static", "log", "regex", "serde", "sqlparser 0.45.0 (registry+https://github.com/rust-lang/crates.io-index)", - "sqlparser_derive 0.2.2 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser_derive 0.2.2 
(git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", ] [[package]] @@ -10724,7 +10724,7 @@ dependencies = [ [[package]] name = "sqlparser_derive" version = "0.2.2" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7#54a267ac89c09b11c0c88934690530807185d3e7" +source = "git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39#b3584823b236db3c244b70945008f0aae65c5e39" dependencies = [ "proc-macro2", "quote", @@ -11547,7 +11547,7 @@ dependencies = [ "serde_yaml", "snafu 0.8.5", "sql", - "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)", + "sqlparser 0.45.0 (git+https://github.com/NiwakaDev/sqlparser-rs?rev=b3584823b236db3c244b70945008f0aae65c5e39)", "sqlx", "store-api", "strum 0.25.0", diff --git a/Cargo.toml b/Cargo.toml index 461ba7b8db8d..fc9ee8da6891 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,7 +182,7 @@ snafu = "0.8" sysinfo = "0.30" rustls = { version = "0.23.20", default-features = false } # override by patch, see [patch.crates-io] -sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "54a267ac89c09b11c0c88934690530807185d3e7", features = [ +sqlparser = { git = "https://github.com/NiwakaDev/sqlparser-rs", rev = "b3584823b236db3c244b70945008f0aae65c5e39", features = [ "visitor", "serde", ] } # on branch v0.44.x diff --git a/src/common/function/src/scalars/math.rs b/src/common/function/src/scalars/math.rs index 6635e70b171f..96c24f12658c 100644 --- a/src/common/function/src/scalars/math.rs +++ b/src/common/function/src/scalars/math.rs @@ -44,6 +44,7 @@ impl MathFunction { registry.register(Arc::new(RateFunction)); registry.register(Arc::new(RangeFunction)); registry.register(Arc::new(ClampFunction)); + registry.register(Arc::new(WithinFilterFunction)); } } @@ -87,3 +88,34 @@ impl Function for RangeFunction { 
.context(GeneralDataFusionSnafu) } } + +#[derive(Clone, Debug, Default)] +struct WithinFilterFunction; + +impl fmt::Display for WithinFilterFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "WithinFilterFunction") + } +} + +impl Function for WithinFilterFunction { + fn name(&self) -> &str { + "within_filter" + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> { + Ok(ConcreteDataType::boolean_datatype()) + } + + fn signature(&self) -> Signature { + Signature::uniform( + 2, + vec![ConcreteDataType::string_datatype()], + Volatility::Immutable, + ) + } + + fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> { + Err(DataFusionError::Internal("todo".into())).context(GeneralDataFusionSnafu) + } +} diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 988c9c27a9b4..86b2a2db5b87 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -352,7 +352,7 @@ mod tests { fn test_canonicalize() { // with 'start' only let mut tf = TimeFilter { - start: Some("2023-10-01".to_string()), + start: Some("2023".to_string()), end: None, span: None, }; diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 6e1fbfae0af8..d69b1baf5dd7 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -41,6 +41,7 @@ pub mod region_query; pub mod sql; pub mod stats; pub(crate) mod window_sort; +mod within_filter; #[cfg(test)] pub(crate) mod test_util; diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 20377c67c034..007e725d6492 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -105,6 +105,7 @@ impl DfLogicalPlanner { let result = sql_to_rel .statement_to_plan(df_stmt) .context(PlanSqlSnafu)?; + let plan = RangePlanRewriter::new(table_provider, query_ctx.clone()) .rewrite(result) .await?; diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 74db773031d0..dbf4a7fd6236 100644 ---
a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -57,6 +57,7 @@ use crate::query_engine::options::QueryOptions; use crate::query_engine::DefaultSerializer; use crate::range_select::planner::RangeSelectPlanner; use crate::region_query::RegionQueryHandlerRef; +use crate::within_filter::WithinFilterRule; use crate::QueryEngineContext; /// Query engine global state @@ -95,10 +96,14 @@ impl QueryEngineState { let runtime_env = Arc::new(RuntimeEnv::default()); let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false); // Apply extension rules - let mut extension_rules = Vec::new(); + // TODO: remove Vec> + let mut extension_rules: Vec< + Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>, + > = Vec::new(); // The [`TypeConversionRule`] must be at first extension_rules.insert(0, Arc::new(TypeConversionRule) as _); + extension_rules.push(Arc::new(WithinFilterRule)); // Apply the datafusion rules let mut analyzer = Analyzer::new(); diff --git a/src/query/src/within_filter.rs b/src/query/src/within_filter.rs new file mode 100644 index 000000000000..43a045506079 --- /dev/null +++ b/src/query/src/within_filter.rs @@ -0,0 +1,96 @@ +use chrono::NaiveDate; +use common_time::timestamp::{TimeUnit, Timestamp}; +use datafusion::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::{BinaryExpr, Expr, Filter, LogicalPlan, Operator}; + +use crate::optimizer::ExtensionAnalyzerRule; +use crate::QueryEngineContext; + +pub struct WithinFilterRule; + +impl ExtensionAnalyzerRule for WithinFilterRule { + fn analyze( + &self, + plan: LogicalPlan, + _ctx: &QueryEngineContext, + _config: &ConfigOptions, + ) -> Result<LogicalPlan> { + plan.transform(|plan| match plan.clone() { + LogicalPlan::Filter(filter) => { + if let Expr::ScalarFunction(func) = &filter.predicate + && func.func.name() == "within_filter" + { + let
column_name = func.args[0].clone(); + let time_arg = func.args[1].clone(); + if let Expr::Literal(literal) = time_arg + && let ScalarValue::Utf8(Some(s)) = literal + { + if let Ok(year) = s.parse::<i32>() { + let timestamp = NaiveDate::from_ymd_opt(year, 1, 1).unwrap(); + let timestamp = Timestamp::from_chrono_date(timestamp).unwrap(); + let value = Some(timestamp.value()); + let timestamp = match timestamp.unit() { + TimeUnit::Second => ScalarValue::TimestampSecond(value, None), + TimeUnit::Millisecond => { + ScalarValue::TimestampMillisecond(value, None) + } + TimeUnit::Microsecond => { + ScalarValue::TimestampMicrosecond(value, None) + } + TimeUnit::Nanosecond => { + ScalarValue::TimestampNanosecond(value, None) + } + }; + let next_timestamp = NaiveDate::from_ymd_opt(year + 1, 1, 1).unwrap(); + let next_timestamp = + Timestamp::from_chrono_date(next_timestamp).unwrap(); + let value = Some(next_timestamp.value()); + let next_timestamp = match next_timestamp.unit() { + TimeUnit::Second => ScalarValue::TimestampSecond(value, None), + TimeUnit::Millisecond => { + ScalarValue::TimestampMillisecond(value, None) + } + TimeUnit::Microsecond => { + ScalarValue::TimestampMicrosecond(value, None) + } + TimeUnit::Nanosecond => { + ScalarValue::TimestampNanosecond(value, None) + } + }; + let left = Expr::BinaryExpr(BinaryExpr { + left: Box::new(column_name.clone()), + op: Operator::GtEq, + right: Box::new(Expr::Literal(timestamp)), + }); + let right = Expr::BinaryExpr(BinaryExpr { + left: Box::new(column_name), + op: Operator::Lt, + right: Box::new(Expr::Literal(next_timestamp)), + }); + let new_expr = Expr::BinaryExpr(BinaryExpr::new( + Box::new(left), + Operator::And, + Box::new(right), + )); + let new_plan = + LogicalPlan::Filter(Filter::try_new(new_expr, filter.input)?); + Ok(Transformed::yes(new_plan)) + } else { + Err(DataFusionError::NotImplemented( + "add more formats".to_string(), + )) + } + } else { + todo!(); + } + } else { + Ok(Transformed::no(plan)) + } + } + _ =>
Ok(Transformed::no(plan)), + }) + .map(|t| t.data) + } +} diff --git a/tests/cases/standalone/common/basic.result b/tests/cases/standalone/common/basic.result index a7a1dfb5c015..ae74bf2af5d5 100644 --- a/tests/cases/standalone/common/basic.result +++ b/tests/cases/standalone/common/basic.result @@ -29,6 +29,16 @@ SELECT * FROM system_metrics; | host2 | idc_a | 80.0 | 70.3 | 90.0 | 2022-11-03T03:39:57.450 | +-------+-------+----------+-------------+-----------+-------------------------+ +SELECT * FROM system_metrics where ts within '2022-11-03'; + ++-------+-------+----------+-------------+-----------+-------------------------+ +| host | idc | cpu_util | memory_util | disk_util | ts | ++-------+-------+----------+-------------+-----------+-------------------------+ +| host1 | idc_a | 11.8 | 10.3 | 10.3 | 2022-11-03T03:39:57.450 | +| host2 | idc_a | 80.0 | 70.3 | 90.0 | 2022-11-03T03:39:57.450 | +| host1 | idc_b | 50.0 | 66.7 | 40.6 | 2022-11-03T03:39:57.450 | ++-------+-------+----------+-------------+-----------+-------------------------+ + SELECT count(*) FROM system_metrics; +----------+ diff --git a/tests/cases/standalone/common/basic.sql b/tests/cases/standalone/common/basic.sql index 13a7d5a1c4c0..ff722cc187d3 100644 --- a/tests/cases/standalone/common/basic.sql +++ b/tests/cases/standalone/common/basic.sql @@ -17,6 +17,8 @@ VALUES SELECT * FROM system_metrics; +SELECT * FROM system_metrics where ts within '2022-11-03'; + SELECT count(*) FROM system_metrics; SELECT avg(cpu_util) FROM system_metrics;