From 9d904de10d959af4fcfda9ac9591155cb93132ec Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 28 Aug 2024 10:45:53 +0800 Subject: [PATCH] feat(query): add plan cache (#16333) * feat(query): add plan cache * feat(query): add plan cache * feat(query): add plan cache * feat(query): add plan cache * feat(query): add plan cache for query only * feat(query): add plan cache for query only * feat(query): disable temp table in plan cache * update * update * update * update * update * update * fix tests * update * update --- Cargo.lock | 2 + Cargo.toml | 1 + src/meta/app/Cargo.toml | 2 +- src/query/catalog/Cargo.toml | 2 +- src/query/catalog/src/table.rs | 4 + src/query/catalog/src/table_context.rs | 1 + src/query/functions/Cargo.toml | 2 +- src/query/service/Cargo.toml | 2 +- .../src/catalogs/default/session_catalog.rs | 2 +- .../src/interpreters/interpreter_explain.rs | 5 +- .../src/interpreters/interpreter_select.rs | 5 +- src/query/service/src/sessions/query_ctx.rs | 4 + .../service/src/sessions/query_ctx_shared.rs | 2 +- .../tests/it/sql/exec/get_table_bind_test.rs | 12 +- .../it/storages/fuse/operations/commit.rs | 4 + src/query/settings/src/settings.rs | 4 +- src/query/settings/src/settings_default.rs | 6 + .../settings/src/settings_getter_setter.rs | 4 + src/query/sql/Cargo.toml | 2 + .../binder/bind_table_reference/bind_table.rs | 4 +- src/query/sql/src/planner/binder/binder.rs | 8 +- .../src/planner/binder/copy_into_location.rs | 4 +- src/query/sql/src/planner/binder/table.rs | 2 +- src/query/sql/src/planner/mod.rs | 1 + src/query/sql/src/planner/planner.rs | 47 +++- src/query/sql/src/planner/planner_cache.rs | 226 ++++++++++++++++++ src/query/storages/common/cache/src/caches.rs | 7 + src/query/storages/fuse/Cargo.toml | 2 +- src/query/storages/result_cache/Cargo.toml | 2 +- .../storages/system/src/streams_table.rs | 2 +- src/query/storages/system/src/tables_table.rs | 2 +- .../20+_others/20_0020_planner_cache.test | 81 +++++++ .../suites/base/issues/issue_10103.test | 8 +- 33 files changed, 429 insertions(+), 33 deletions(-) create mode 100644 src/query/sql/src/planner/planner_cache.rs create mode 100644 tests/sqllogictests/suites/base/20+_others/20_0020_planner_cache.test diff --git a/Cargo.lock b/Cargo.lock index fc0f88bb5036..026dabdba3f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4140,6 +4140,7 @@ dependencies = [ "databend-common-storages-view", "databend-common-users", "databend-enterprise-data-mask-feature", + "databend-storages-common-cache", "databend-storages-common-table-meta", "derive-visitor", "educe", @@ -4162,6 +4163,7 @@ dependencies = [ "regex", "roaring", "serde", + "sha2", "simsearch", "time", "url", diff --git a/Cargo.toml b/Cargo.toml index 148f9487a006..1b222ecd2953 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -285,6 +285,7 @@ ordq = "0.2.0" parking_lot = "0.12.1" parquet = { version = "52", features = ["async"] } paste = "1.0.15" +sha2 = "0.10.8" # TODO: let's use native tls instead. 
iceberg = { version = "0.3.0" } iceberg-catalog-hms = { version = "0.3.0" } diff --git a/src/meta/app/Cargo.toml b/src/meta/app/Cargo.toml index ac8f89241c83..c23211545cc4 100644 --- a/src/meta/app/Cargo.toml +++ b/src/meta/app/Cargo.toml @@ -37,7 +37,7 @@ prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sha1 = "0.10.5" -sha2 = "0.10.6" +sha2 = { workspace = true } thiserror = { workspace = true } [dev-dependencies] diff --git a/src/query/catalog/Cargo.toml b/src/query/catalog/Cargo.toml index 37234b32145b..922568ad019f 100644 --- a/src/query/catalog/Cargo.toml +++ b/src/query/catalog/Cargo.toml @@ -40,7 +40,7 @@ parquet = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -sha2 = "0.10.6" +sha2 = { workspace = true } thrift = "0.17.0" typetag = { workspace = true } xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] } diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 8fac6608a2b4..d76a4febd82c 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -438,6 +438,10 @@ pub trait Table: Sync + Send { is_temp } + fn is_stream(&self) -> bool { + self.engine() == "STREAM" + } + fn use_own_sample_block(&self) -> bool { false } diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 420ab175b1cf..591188b85e83 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -336,6 +336,7 @@ pub trait TableContext: Send + Sync { fn set_variable(&self, key: String, value: Scalar); fn unset_variable(&self, key: &str); fn get_variable(&self, key: &str) -> Option<Scalar>; + fn get_all_variables(&self) -> HashMap<String, Scalar>; async fn load_datalake_schema( &self, diff --git a/src/query/functions/Cargo.toml b/src/query/functions/Cargo.toml index 755e4f05c07b..0e496bf1d0de 100644 --- a/src/query/functions/Cargo.toml +++ b/src/query/functions/Cargo.toml @@ -62,7 +62,7 @@ roaring = "0.10.1" serde = { workspace = true } serde_json = { workspace = true } sha1 = "0.10.5" -sha2 = "0.10.6" +sha2 = { workspace = true } simdutf8 = "0.1.4" simple_hll = { version = "0.0.1", features = ["serde_borsh"] } siphasher = "0.3" diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 239298604341..8d2ce1be5066 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -162,7 +162,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_stacker = { workspace = true } serde_urlencoded = "0.7.1" -sha2 = "0.10.8" +sha2 = { workspace = true } socket2 = "0.5.3" strength_reduce = "0.2.4" sysinfo = "0.30" diff --git a/src/query/service/src/catalogs/default/session_catalog.rs b/src/query/service/src/catalogs/default/session_catalog.rs index 9a72619659b1..87b93764cfed 100644 --- a/src/query/service/src/catalogs/default/session_catalog.rs +++ b/src/query/service/src/catalogs/default/session_catalog.rs @@ -357,7 +357,7 @@ impl Catalog for SessionCatalog { return self.get_table_by_info(&table); } let table = self.inner.get_table(tenant, db_name, table_name).await?; - if table.engine() == "STREAM" && is_active { + if table.is_stream() && is_active { self.txn_mgr .lock() .upsert_table_desc_to_id(table.get_table_info().clone()); diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index 8f4a03ff2525..84d38b1f4c87 100644 ---
a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -273,7 +273,10 @@ impl ExplainInterpreter { metadata: &MetadataRef, formatted_ast: &Option<String>, ) -> Result<Vec<DataBlock>> { - if self.ctx.get_settings().get_enable_query_result_cache()? && self.ctx.get_cacheable() { + if self.ctx.get_settings().get_enable_query_result_cache()? + && self.ctx.get_cacheable() + && formatted_ast.is_some() + { let key = gen_result_cache_key(formatted_ast.as_ref().unwrap()); let kv_store = UserApiProvider::instance().get_meta_store_client(); let cache_reader = ResultCacheReader::create( diff --git a/src/query/service/src/interpreters/interpreter_select.rs b/src/query/service/src/interpreters/interpreter_select.rs index f987a26c203b..fd124c444d61 100644 --- a/src/query/service/src/interpreters/interpreter_select.rs +++ b/src/query/service/src/interpreters/interpreter_select.rs @@ -290,7 +290,10 @@ impl Interpreter for SelectInterpreter { info!("Query physical plan: \n{}", query_plan); - if self.ctx.get_settings().get_enable_query_result_cache()? && self.ctx.get_cacheable() { + if self.ctx.get_settings().get_enable_query_result_cache()? + && self.ctx.get_cacheable() + && self.formatted_ast.is_some() + { let key = gen_result_cache_key(self.formatted_ast.as_ref().unwrap()); // 1. Try to get result from cache. let kv_store = UserApiProvider::instance().get_meta_store_client(); diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index b61096163c86..3bb3974a3877 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1173,6 +1173,10 @@ impl TableContext for QueryContext { self.shared.session.session_ctx.get_variable(key) } + fn get_all_variables(&self) -> HashMap<String, Scalar> { + self.shared.session.session_ctx.get_all_variables() + } + #[async_backtrace::framed] async fn load_datalake_schema( &self, diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 3144099b96e2..9e4bab763e1c 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -379,7 +379,7 @@ impl QueryContextShared { table: Arc<dyn Table>, catalog_name: &str, ) -> Result<Arc<dyn Table>> { - if table.engine() == "STREAM" { + if table.is_stream() { let tenant = self.get_tenant(); let catalog = self .catalog_manager diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index d60c81043795..d9098b2e0c0c 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -880,6 +880,10 @@ impl TableContext for CtxDelegation { None } + fn get_all_variables(&self) -> HashMap<String, Scalar> { + HashMap::new() + } + fn get_license_key(&self) -> String { self.ctx.get_license_key() } @@ -1018,10 +1022,12 @@ async fn test_get_same_table_once() -> Result<()> { .load(std::sync::atomic::Ordering::SeqCst), 1 ); - assert_eq!( + + // the plan cache also needs to get the table, so it may be read from the cache more than once + assert!( ctx.table_from_cache - .load(std::sync::atomic::Ordering::SeqCst), - 2 + .load(std::sync::atomic::Ordering::SeqCst) + >= 2 ); Ok(()) } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index f5b82412bf62..885cba89280f 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++
b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -726,6 +726,10 @@ impl TableContext for CtxDelegation { None } + fn get_all_variables(&self) -> HashMap<String, Scalar> { + HashMap::new() + } + fn set_materialized_cte( &self, _idx: (usize, usize), diff --git a/src/query/settings/src/settings.rs b/src/query/settings/src/settings.rs index 7a3b9357b636..e2e9637bfaae 100644 --- a/src/query/settings/src/settings.rs +++ b/src/query/settings/src/settings.rs @@ -31,7 +31,7 @@ use crate::settings_default::DefaultSettings; use crate::settings_default::SettingRange; use crate::SettingMode; -#[derive(serde::Serialize, serde::Deserialize, Clone)] +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)] pub enum ScopeLevel { Default, Local, @@ -58,7 +58,7 @@ impl Debug for ScopeLevel { } } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Debug)] pub struct ChangeValue { pub level: ScopeLevel, pub value: UserSettingValue, } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 1134be25623b..bdede012ae3a 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -380,6 +380,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: None, }), + ("enable_planner_cache", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enables caching logical plans for identical queries.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("enable_query_result_cache", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enables caching query results to improve performance for identical queries.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index f3328598dd37..3db301a3329c 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -351,6 +351,10 @@ impl Settings { Ok(self.try_get_u64("hide_options_in_show_create_table")? != 0) } + pub fn get_enable_planner_cache(&self) -> Result<bool> { + Ok(self.try_get_u64("enable_planner_cache")? != 0) + } + pub fn get_enable_query_result_cache(&self) -> Result<bool> { Ok(self.try_get_u64("enable_query_result_cache")?
!= 0) } diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index 4d6686145b45..2dfa30961a61 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -47,6 +47,7 @@ databend-common-storages-result-cache = { workspace = true } databend-common-storages-view = { workspace = true } databend-common-users = { workspace = true } databend-enterprise-data-mask-feature = { workspace = true } +databend-storages-common-cache = { workspace = true } databend-storages-common-table-meta = { workspace = true } derive-visitor = { workspace = true } educe = "0.4" @@ -69,6 +70,7 @@ recursive = "0.1.1" regex = { workspace = true } roaring = "0.10.1" serde = { workspace = true } +sha2 = { workspace = true } simsearch = "0.2" time = "0.3.14" url = "2.3.1" diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs index e20125d3521e..767c91afd5bc 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs @@ -129,14 +129,14 @@ impl Binder { } }; - if consume && table_meta.engine() != "STREAM" { + if consume && !table_meta.is_stream() { return Err(ErrorCode::StorageUnsupported( "WITH CONSUME only support in STREAM", )); } if navigation.is_some_and(|n| matches!(n, TimeNavigation::Changes { .. })) - || table_meta.engine() == "STREAM" + || table_meta.is_stream() { let change_type = get_change_type(&table_name_alias); if change_type.is_some() { diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index d5c53957e659..2db96cfe3f6c 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -104,6 +104,8 @@ pub struct Binder { /// For the recursive cte, the cte table name occurs in the recursive cte definition and main query /// if meet recursive cte table name in cte definition, set `bind_recursive_cte` true and treat it as `CteScan`. pub bind_recursive_cte: bool, + + pub enable_result_cache: bool, } impl<'a> Binder { @@ -114,6 +116,10 @@ impl<'a> Binder { metadata: MetadataRef, ) -> Self { let dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default(); + let enable_result_cache = ctx + .get_settings() + .get_enable_query_result_cache() + .unwrap_or_default(); Binder { ctx, dialect, @@ -125,6 +131,7 @@ impl<'a> Binder { ctes_map: Box::default(), expression_scan_context: ExpressionScanContext::new(), bind_recursive_cte: false, + enable_result_cache, } } @@ -169,7 +176,6 @@ impl<'a> Binder { // Remove unused cache columns and join conditions and construct ExpressionScan's child. (s_expr, _) = self.construct_expression_scan(&s_expr, self.metadata.clone())?; - let formatted_ast = if self.ctx.get_settings().get_enable_query_result_cache()? + let formatted_ast = if self.enable_result_cache
{ Some(stmt.to_string()) } else { diff --git a/src/query/sql/src/planner/binder/copy_into_location.rs b/src/query/sql/src/planner/binder/copy_into_location.rs index ddd7c575a196..965be116b751 100644 --- a/src/query/sql/src/planner/binder/copy_into_location.rs +++ b/src/query/sql/src/planner/binder/copy_into_location.rs @@ -42,7 +42,9 @@ impl<'a> Binder { &table.database, &table.table, ); - let subquery = format!("SELECT * FROM {catalog_name}.{database_name}.{table_name}"); + let subquery = format!( + "SELECT * FROM \"{catalog_name}\".\"{database_name}\".\"{table_name}\"" + ); let tokens = tokenize_sql(&subquery)?; let sub_stmt_msg = parse_sql(&tokens, self.dialect)?; let sub_stmt = sub_stmt_msg.0; diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 3fb7c2109305..4124842e1bee 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -666,7 +666,7 @@ impl Binder { self.normalize_object_identifier_triple(catalog, database, name); databend_common_base::runtime::block_on(async move { let stream = self.ctx.get_table(&catalog, &database, &name).await?; - if stream.engine() != "STREAM" { + if !stream.is_stream() { return Err(ErrorCode::TableEngineNotSupported(format!( "{database}.{name} is not STREAM", ))); diff --git a/src/query/sql/src/planner/mod.rs b/src/query/sql/src/planner/mod.rs index da682e860716..d1b63c97d16e 100644 --- a/src/query/sql/src/planner/mod.rs +++ b/src/query/sql/src/planner/mod.rs @@ -23,6 +23,7 @@ pub mod binder; pub mod dataframe; mod expression_parser; pub mod optimizer; +mod planner_cache; pub mod plans; mod stream_column; mod udf_validator; diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 91a0d864e2aa..fe6ce7697dca 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -45,7 +45,6 @@ use crate::plans::Plan; use crate::Binder; use crate::CountSetOps; use crate::Metadata; -use crate::MetadataRef; use crate::NameResolutionContext; use crate::VariableNormalizer; @@ -53,12 +52,11 @@ const PROBE_INSERT_INITIAL_TOKENS: usize = 128; const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8; pub struct Planner { - ctx: Arc<dyn TableContext>, + pub(crate) ctx: Arc<dyn TableContext>, } #[derive(Debug, Clone)] pub struct PlanExtras { - pub metadata: MetadataRef, pub format: Option<String>, pub statement: Statement, } @@ -152,8 +150,33 @@ impl Planner { self.replace_stmt(&mut stmt)?; // Step 3: Bind AST with catalog, and generate a pure logical SExpr let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let mut enable_planner_cache = + self.ctx.get_settings().get_enable_planner_cache()?; + let planner_cache_key = if enable_planner_cache { + Some(Self::planner_cache_key(&stmt.to_string())) + } else { + None + }; + + if enable_planner_cache { + let (c, plan) = self.get_cache( + name_resolution_ctx.clone(), + planner_cache_key.as_ref().unwrap(), + &stmt, + ); + if let Some(mut plan) = plan { + info!("logical plan from cache, time used: {:?}", start.elapsed()); + // update the format for the clickhouse handler + plan.extras.format = format; + self.ctx + .attach_query_str(get_query_kind(&stmt), stmt.to_mask_sql()); + return Ok((plan.plan, plan.extras)); + } + enable_planner_cache = c; + } + + let metadata = Arc::new(RwLock::new(Metadata::default())); let binder = Binder::new( self.ctx.clone(), CatalogManager::instance(), @@ -176,11 +199,21 @@
.with_enable_dphyp(settings.get_enable_dphyp()?); let optimized_plan = optimize(opt_ctx, plan).await?; - Ok((optimized_plan, PlanExtras { - metadata, + let result = (optimized_plan, PlanExtras { format, statement: stmt, - })) + }); + + if enable_planner_cache { + self.set_cache( + planner_cache_key.clone().unwrap(), + result.0.clone(), + result.1.clone(), + ); + Ok(result) + } else { + Ok(result) + } } .await; diff --git a/src/query/sql/src/planner/planner_cache.rs b/src/query/sql/src/planner/planner_cache.rs new file mode 100644 index 000000000000..ebe63d3aa925 --- /dev/null +++ b/src/query/sql/src/planner/planner_cache.rs @@ -0,0 +1,226 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::LazyLock; + +use databend_common_ast::ast::FunctionCall; +use databend_common_ast::ast::Identifier; +use databend_common_ast::ast::IdentifierType; +use databend_common_ast::ast::Statement; +use databend_common_ast::ast::TableReference; +use databend_common_catalog::table_context::TableContext; +use databend_common_expression::Scalar; +use databend_common_expression::TableSchemaRef; +use databend_common_settings::ChangeValue; +use databend_storages_common_cache::CacheAccessor; +use databend_storages_common_cache::CacheValue; +use databend_storages_common_cache::InMemoryLruCache; +use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; +use derive_visitor::Drive; +use derive_visitor::Visitor; +use itertools::Itertools; +use sha2::Digest; +use sha2::Sha256; + +use crate::normalize_identifier; +use crate::plans::Plan; +use crate::NameResolutionContext; +use crate::PlanExtras; +use crate::Planner; + +#[derive(Clone)] +pub struct PlanCacheItem { + pub(crate) plan: Plan, + pub(crate) extras: PlanExtras, + pub(crate) setting_changes: Vec<(String, ChangeValue)>, + pub(crate) variables: HashMap<String, Scalar>, +} + +static PLAN_CACHE: LazyLock<InMemoryLruCache<PlanCacheItem>> = + LazyLock::new(|| InMemoryLruCache::with_items_capacity("planner_cache".to_string(), 512)); + +impl From<PlanCacheItem> for CacheValue<PlanCacheItem> { + fn from(val: PlanCacheItem) -> Self { + CacheValue::new(val, 1024) + } +} + +impl Planner { + pub fn planner_cache_key(format_sql: &str) -> String { + // hash the formatted SQL with SHA-256 to build the cache key + format!("{:x}", Sha256::digest(format_sql)) + } + + pub fn get_cache( + &self, + name_resolution_ctx: NameResolutionContext, + key: &str, + stmt: &Statement, + ) -> (bool, Option<PlanCacheItem>) { + if !matches!(stmt, Statement::Query(_)) { + return (false, None); + } + + let mut visitor = TableRefVisitor { + ctx: self.ctx.clone(), + schema_snapshots: vec![], + name_resolution_ctx, + cache_miss: false, + }; + stmt.drive(&mut visitor); + + if visitor.schema_snapshots.is_empty() || visitor.cache_miss { + return (false, None); + } + + let cache = LazyLock::force(&PLAN_CACHE); + if let Some(plan_item) = cache.get(key) { + let settings = self.ctx.get_settings(); + if settings.changes().len() != plan_item.setting_changes.len() { + return (true, None); + } + + let
setting_changes = settings + .changes() + .iter() + .map(|s| (s.key().clone(), s.value().clone())) + .sorted_by(|a, b| Ord::cmp(&a.0, &b.0)) + .collect::<Vec<_>>(); + + if setting_changes != plan_item.setting_changes + || self.ctx.get_all_variables() != plan_item.variables + { + return (true, None); + } + + if let Plan::Query { metadata, .. } = &plan_item.plan { + let metadata = metadata.read(); + if visitor.schema_snapshots.iter().all(|ss| { + metadata.tables().iter().any(|table| { + !table.table().is_temp() + && table.table().options().get(OPT_KEY_SNAPSHOT_LOCATION) == Some(&ss.1) + && table.table().schema().eq(&ss.0) + }) + }) { + return (!visitor.cache_miss, Some(plan_item.as_ref().clone())); + } + } + (!visitor.cache_miss, None) + } else { + (!visitor.cache_miss, None) + } + } + + pub fn set_cache(&self, key: String, plan: Plan, extras: PlanExtras) { + let setting_changes = self + .ctx + .get_settings() + .changes() + .iter() + .map(|s| (s.key().clone(), s.value().clone())) + .sorted_by(|a, b| Ord::cmp(&a.0, &b.0)) + .collect::<Vec<_>>(); + + let variables = self.ctx.get_all_variables(); + + let plan_item = PlanCacheItem { + plan, + extras, + setting_changes, + variables, + }; + let cache = LazyLock::force(&PLAN_CACHE); + cache.insert(key, plan_item); + } +} + +#[derive(Visitor)] +#[visitor(TableReference(enter), FunctionCall(enter))] +struct TableRefVisitor { + ctx: Arc<dyn TableContext>, + schema_snapshots: Vec<(TableSchemaRef, String)>, + name_resolution_ctx: NameResolutionContext, + cache_miss: bool, +} + +impl TableRefVisitor { + fn enter_function_call(&mut self, func: &FunctionCall) { + if self.cache_miss { + return; + } + + // If the function is score, we should not cache the plan + if func.name.name.to_lowercase() == "score" { + self.cache_miss = true; + } + } + + fn enter_table_reference(&mut self, table_ref: &TableReference) { + if self.cache_miss { + return; + } + if let TableReference::Table { + catalog, + database, + table, + temporal, + consume, + ..
+ } = table_ref + { + if temporal.is_some() || *consume { + self.cache_miss = true; + return; + } + + let catalog = catalog.to_owned().unwrap_or(Identifier { + span: None, + name: self.ctx.get_current_catalog(), + quote: None, + ident_type: IdentifierType::None, + }); + let database = database.to_owned().unwrap_or(Identifier { + span: None, + name: self.ctx.get_current_database(), + quote: None, + ident_type: IdentifierType::None, + }); + + let catalog_name = normalize_identifier(&catalog, &self.name_resolution_ctx).name; + let database_name = normalize_identifier(&database, &self.name_resolution_ctx).name; + let table_name = normalize_identifier(table, &self.name_resolution_ctx).name; + + databend_common_base::runtime::block_on(async move { + if let Ok(table_meta) = self + .ctx + .get_table(&catalog_name, &database_name, &table_name) + .await + { + if !table_meta.is_temp() + && !table_meta.is_stage_table() + && !table_meta.is_stream() + && let Some(sn) = table_meta.options().get(OPT_KEY_SNAPSHOT_LOCATION) + { + self.schema_snapshots + .push((table_meta.schema(), sn.clone())); + return; + } + } + self.cache_miss = true; + }); + } + } +} diff --git a/src/query/storages/common/cache/src/caches.rs b/src/query/storages/common/cache/src/caches.rs index f344a436bb3b..3a4f900f9e8a 100644 --- a/src/query/storages/common/cache/src/caches.rs +++ b/src/query/storages/common/cache/src/caches.rs @@ -156,6 +156,13 @@ pub struct CacheValue<T> { } impl<T> CacheValue<T> { + pub fn new(inner: T, mem_bytes: usize) -> Self { + Self { + inner: Arc::new(inner), + mem_bytes, + } + } + pub fn get_inner(&self) -> Arc<T> { self.inner.clone() } diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 688c138ce0bf..f2c893460480 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -60,7 +60,7 @@ parquet = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -sha2 = "0.10.6" +sha2 = { workspace = true } siphasher = "0.3.10" sys-info = "0.9" tantivy = { workspace = true } diff --git a/src/query/storages/result_cache/Cargo.toml b/src/query/storages/result_cache/Cargo.toml index a64104acdfcc..61366250fbfd 100644 --- a/src/query/storages/result_cache/Cargo.toml +++ b/src/query/storages/result_cache/Cargo.toml @@ -31,7 +31,7 @@ opendal = { workspace = true } parquet = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -sha2 = "0.10.6" +sha2 = { workspace = true } tokio = { workspace = true } uuid = { workspace = true } diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs index 04c3597ff734..6bf7013a1652 100644 --- a/src/query/storages/system/src/streams_table.rs +++ b/src/query/storages/system/src/streams_table.rs @@ -193,7 +193,7 @@ impl AsyncSystemTable for StreamsTable { table.name(), db_id, t_id, - ) && table.engine() == "STREAM" + ) && table.is_stream() { let stream_info = table.get_table_info(); let stream_table = StreamTable::try_from_table(table.as_ref())?; diff --git a/src/query/storages/system/src/tables_table.rs b/src/query/storages/system/src/tables_table.rs index ed0db7147436..df6a4d92be30 100644 --- a/src/query/storages/system/src/tables_table.rs +++ b/src/query/storages/system/src/tables_table.rs @@ -444,7 +444,7 @@ where TablesTable<WITHOUT_VIEW>: HistoryAware table.name(), db_id, table_id, - ) && table.engine() != "STREAM" + ) && !table.is_stream() { if !WITHOUT_VIEW && table.get_table_info().engine() == "VIEW" {
catalogs.push(ctl_name.as_str()); diff --git a/tests/sqllogictests/suites/base/20+_others/20_0020_planner_cache.test b/tests/sqllogictests/suites/base/20+_others/20_0020_planner_cache.test new file mode 100644 index 000000000000..fff09d3dcaea --- /dev/null +++ b/tests/sqllogictests/suites/base/20+_others/20_0020_planner_cache.test @@ -0,0 +1,81 @@ +statement ok +DROP DATABASE IF EXISTS db20_20; + +statement ok +CREATE DATABASE db20_20; + +statement ok +USE db20_20; + +statement ok +SET enable_planner_cache = 1; + +statement ok +SET variable a = 'a'; + +statement ok +CREATE TABLE IF NOT EXISTS t1 (a INT not null); + +statement ok +INSERT INTO t1 VALUES (1), (2), (3); + +query I +SELECT *, $a FROM t1 ORDER BY a; +---- +1 a +2 a +3 a + +query I +SELECT *, $a FROM t1 ORDER BY a; +---- +1 a +2 a +3 a + +statement ok +SET variable a = 'b'; + +query I +SELECT *, $a FROM t1 ORDER BY a; +---- +1 b +2 b +3 b + +statement ok +alter table t1 add column b string default 's'; + +query I +SELECT * FROM t1 ORDER BY a; +---- +1 s +2 s +3 s + +query I +SELECT * FROM t1 ORDER BY a; +---- +1 s +2 s +3 s + +statement ok +SET enable_planner_cache = 0; + +query I +SELECT * FROM t1 ORDER BY a; +---- +1 s +2 s +3 s + +query I +SELECT * FROM t1 ORDER BY a; +---- +1 s +2 s +3 s + +statement ok +DROP DATABASE db20_20; diff --git a/tests/sqllogictests/suites/base/issues/issue_10103.test b/tests/sqllogictests/suites/base/issues/issue_10103.test index f617f68c50e8..b493fec0a544 100644 --- a/tests/sqllogictests/suites/base/issues/issue_10103.test +++ b/tests/sqllogictests/suites/base/issues/issue_10103.test @@ -8,13 +8,13 @@ statement ok USE db1 statement ok -CREATE TABLE test_table(id BIGINT, name VARCHAR, age INT) +CREATE TABLE "test-table"(id BIGINT, name VARCHAR, age INT) statement ok CREATE TABLE test_ts_table(ts TIMESTAMP, name VARCHAR, age INT) statement ok -insert into test_table (id,name,age) values (1676805481000000,'2',3), (1676805481000000, '5', 6) +insert into "test-table" (id,name,age) values (1676805481000000,'2',3), (1676805481000000, '5', 6) statement ok DROP STAGE IF EXISTS test_10103 @@ -23,7 +23,7 @@ statement ok CREATE STAGE IF NOT EXISTS test_10103 statement ok -copy into @test_10103 from test_table FILE_FORMAT = (type = CSV) +copy into @test_10103 from "test-table" FILE_FORMAT = (type = CSV) statement ok copy into test_ts_table from @test_10103 FILE_FORMAT = (type = CSV) @@ -34,7 +34,7 @@ SELECT ts FROM test_ts_table LIMIT 1 2023-02-19 11:18:01.000000 statement ok -drop table test_table all +drop table "test-table" all statement ok drop table test_ts_table all
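For reviewers: below is a minimal, self-contained Rust sketch of the lookup flow that Planner::get_cache / Planner::set_cache implement above — hash the formatted statement with SHA-256, look the digest up in a process-wide cache, and honor a hit only while the session's setting changes and variables still match the snapshot taken when the plan was cached. CachedPlan, the mutex-guarded HashMap, and the String plan payload are stand-ins invented for this sketch (the patch uses PlanCacheItem, a 512-item InMemoryLruCache, and the real Plan); only planner_cache_key mirrors the patch, and the real get_cache additionally rejects non-query statements, score() calls, temporary/stage/stream tables, and any table whose schema or OPT_KEY_SNAPSHOT_LOCATION snapshot has changed.

use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::Mutex;

use sha2::Digest;
use sha2::Sha256;

// Stand-in for `PlanCacheItem`: the cached payload plus the session state
// it was planned under.
#[derive(Clone)]
struct CachedPlan {
    plan: String,                           // stand-in for `Plan`
    setting_changes: Vec<(String, String)>, // stand-in for `Vec<(String, ChangeValue)>`
    variables: HashMap<String, String>,     // stand-in for `HashMap<String, Scalar>`
}

// The patch uses a 512-item `InMemoryLruCache`; a mutex-guarded map is
// enough to demonstrate the keying and validation logic.
static PLAN_CACHE: LazyLock<Mutex<HashMap<String, CachedPlan>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

// Same keying scheme as `Planner::planner_cache_key`: the hex-encoded
// SHA-256 digest of the formatted statement text.
fn planner_cache_key(format_sql: &str) -> String {
    format!("{:x}", Sha256::digest(format_sql))
}

// Mirrors the validation in `Planner::get_cache`: a hit counts only when
// the session's setting changes and variables match the snapshot stored
// alongside the plan.
fn lookup(
    sql: &str,
    setting_changes: &[(String, String)],
    variables: &HashMap<String, String>,
) -> Option<String> {
    let cache = PLAN_CACHE.lock().unwrap();
    let item = cache.get(&planner_cache_key(sql))?;
    if item.setting_changes == setting_changes && &item.variables == variables {
        Some(item.plan.clone())
    } else {
        None
    }
}

fn main() {
    let vars = HashMap::new();
    // What `Planner::set_cache` does after optimization succeeds.
    PLAN_CACHE.lock().unwrap().insert(
        planner_cache_key("SELECT * FROM t1"),
        CachedPlan {
            plan: "optimized plan".to_string(),
            setting_changes: vec![],
            variables: vars.clone(),
        },
    );
    // Hit: identical SQL under identical session state.
    assert!(lookup("SELECT * FROM t1", &[], &vars).is_some());
    // Miss: any changed setting invalidates the entry.
    let changed = vec![("max_threads".to_string(), "8".to_string())];
    assert!(lookup("SELECT * FROM t1", &changed, &vars).is_none());
}

The 20_0020_planner_cache.test suite above exercises the same behavior end to end: a repeated query is served from the cache, while changing a variable, altering the table, or setting enable_planner_cache = 0 falls back to full planning.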