feat(query): add plan cache (#16333)
* feat(query): add plan cache

* feat(query): add plan cache

* feat(query): add plan cache

* feat(query): add plan cache

* feat(query): add plan cache for query only

* feat(query): add plan cache for query only

* feat(query): disable temp table in plan cache

* update

* update

* update

* update

* update

* update

* fix tests

* update

* update
sundy-li authored Aug 28, 2024
1 parent a055124 commit 9d904de
Showing 33 changed files with 429 additions and 33 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

(generated file; diff not rendered)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -285,6 +285,7 @@ ordq = "0.2.0"
 parking_lot = "0.12.1"
 parquet = { version = "52", features = ["async"] }
 paste = "1.0.15"
+sha2 = "0.10.8"
 # TODO: let's use native tls instead.
 iceberg = { version = "0.3.0" }
 iceberg-catalog-hms = { version = "0.3.0" }
2 changes: 1 addition & 1 deletion src/meta/app/Cargo.toml
@@ -37,7 +37,7 @@ prost = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha1 = "0.10.5"
-sha2 = "0.10.6"
+sha2 = { workspace = true }
 thiserror = { workspace = true }

 [dev-dependencies]
2 changes: 1 addition & 1 deletion src/query/catalog/Cargo.toml
@@ -40,7 +40,7 @@ parquet = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
-sha2 = "0.10.6"
+sha2 = { workspace = true }
 thrift = "0.17.0"
 typetag = { workspace = true }
 xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
4 changes: 4 additions & 0 deletions src/query/catalog/src/table.rs
@@ -438,6 +438,10 @@ pub trait Table: Sync + Send {
         is_temp
     }

+    fn is_stream(&self) -> bool {
+        self.engine() == "STREAM"
+    }
+
     fn use_own_sample_block(&self) -> bool {
         false
     }
1 change: 1 addition & 0 deletions src/query/catalog/src/table_context.rs
@@ -336,6 +336,7 @@ pub trait TableContext: Send + Sync {
     fn set_variable(&self, key: String, value: Scalar);
     fn unset_variable(&self, key: &str);
     fn get_variable(&self, key: &str) -> Option<Scalar>;
+    fn get_all_variables(&self) -> HashMap<String, Scalar>;

     async fn load_datalake_schema(
         &self,
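
The plan cache makes session state part of plan validity: a plan bound while a session variable held one value must not be reused after that variable changes, which is presumably why `get_all_variables` was added alongside `get_variable`. A minimal sketch of such a validity check (hypothetical helper, not part of this commit; assumes `Scalar: PartialEq`):

use std::collections::HashMap;

// Hypothetical: a cached plan records the variables visible at plan time
// and is reused only when the current session still matches.
fn variables_unchanged(
    at_plan_time: &HashMap<String, Scalar>,
    ctx: &dyn TableContext,
) -> bool {
    ctx.get_all_variables() == *at_plan_time
}
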
2 changes: 1 addition & 1 deletion src/query/functions/Cargo.toml
@@ -62,7 +62,7 @@ roaring = "0.10.1"
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha1 = "0.10.5"
-sha2 = "0.10.6"
+sha2 = { workspace = true }
 simdutf8 = "0.1.4"
 simple_hll = { version = "0.0.1", features = ["serde_borsh"] }
 siphasher = "0.3"
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
@@ -162,7 +162,7 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 serde_stacker = { workspace = true }
 serde_urlencoded = "0.7.1"
-sha2 = "0.10.8"
+sha2 = { workspace = true }
 socket2 = "0.5.3"
 strength_reduce = "0.2.4"
 sysinfo = "0.30"
2 changes: 1 addition & 1 deletion src/query/service/src/catalogs/default/session_catalog.rs
@@ -357,7 +357,7 @@ impl Catalog for SessionCatalog {
             return self.get_table_by_info(&table);
         }
         let table = self.inner.get_table(tenant, db_name, table_name).await?;
-        if table.engine() == "STREAM" && is_active {
+        if table.is_stream() && is_active {
             self.txn_mgr
                 .lock()
                 .upsert_table_desc_to_id(table.get_table_info().clone());
5 changes: 4 additions & 1 deletion src/query/service/src/interpreters/interpreter_explain.rs
@@ -273,7 +273,10 @@ impl ExplainInterpreter {
         metadata: &MetadataRef,
         formatted_ast: &Option<String>,
     ) -> Result<Vec<DataBlock>> {
-        if self.ctx.get_settings().get_enable_query_result_cache()? && self.ctx.get_cacheable() {
+        if self.ctx.get_settings().get_enable_query_result_cache()?
+            && self.ctx.get_cacheable()
+            && formatted_ast.is_some()
+        {
             let key = gen_result_cache_key(formatted_ast.as_ref().unwrap());
             let kv_store = UserApiProvider::instance().get_meta_store_client();
             let cache_reader = ResultCacheReader::create(
5 changes: 4 additions & 1 deletion src/query/service/src/interpreters/interpreter_select.rs
@@ -290,7 +290,10 @@ impl Interpreter for SelectInterpreter {

         info!("Query physical plan: \n{}", query_plan);

-        if self.ctx.get_settings().get_enable_query_result_cache()? && self.ctx.get_cacheable() {
+        if self.ctx.get_settings().get_enable_query_result_cache()?
+            && self.ctx.get_cacheable()
+            && self.formatted_ast.is_some()
+        {
             let key = gen_result_cache_key(self.formatted_ast.as_ref().unwrap());
             // 1. Try to get result from cache.
             let kv_store = UserApiProvider::instance().get_meta_store_client();
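
This guard and the matching one in interpreter_explain.rs above exist because `formatted_ast` is populated only when `enable_query_result_cache` was on at bind time (see binder.rs below): a plan can reach execution with `formatted_ast == None`, for instance one served from the new planner cache, and the `unwrap` would then panic without the `is_some()` check. A hedged alternative that avoids the unwrap entirely (hypothetical refactor, not part of this commit):

// Hypothetical: derive the result-cache key only when the AST text was
// captured at plan time; otherwise skip the result cache altogether.
fn try_result_cache_key(formatted_ast: Option<&String>) -> Option<String> {
    formatted_ast.map(|ast| gen_result_cache_key(ast))
}
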
4 changes: 4 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -1173,6 +1173,10 @@ impl TableContext for QueryContext {
         self.shared.session.session_ctx.get_variable(key)
     }

+    fn get_all_variables(&self) -> HashMap<String, Scalar> {
+        self.shared.session.session_ctx.get_all_variables()
+    }
+
     #[async_backtrace::framed]
     async fn load_datalake_schema(
         &self,
2 changes: 1 addition & 1 deletion src/query/service/src/sessions/query_ctx_shared.rs
@@ -379,7 +379,7 @@ impl QueryContextShared {
         table: Arc<dyn Table>,
         catalog_name: &str,
     ) -> Result<Arc<dyn Table>> {
-        if table.engine() == "STREAM" {
+        if table.is_stream() {
             let tenant = self.get_tenant();
             let catalog = self
                 .catalog_manager
12 changes: 9 additions & 3 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -880,6 +880,10 @@ impl TableContext for CtxDelegation {
         None
     }

+    fn get_all_variables(&self) -> HashMap<String, Scalar> {
+        HashMap::new()
+    }
+
     fn get_license_key(&self) -> String {
         self.ctx.get_license_key()
     }
@@ -1018,10 +1022,12 @@ async fn test_get_same_table_once() -> Result<()> {
             .load(std::sync::atomic::Ordering::SeqCst),
         1
     );
-    assert_eq!(
+
+    // The plan cache needs to fetch the table again.
+    assert!(
         ctx.table_from_cache
-            .load(std::sync::atomic::Ordering::SeqCst),
-        2
+            .load(std::sync::atomic::Ordering::SeqCst)
+            >= 2
     );

     Ok(())
4 changes: 4 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -726,6 +726,10 @@ impl TableContext for CtxDelegation {
         None
     }

+    fn get_all_variables(&self) -> HashMap<String, Scalar> {
+        HashMap::new()
+    }
+
     fn set_materialized_cte(
         &self,
         _idx: (usize, usize),
4 changes: 2 additions & 2 deletions src/query/settings/src/settings.rs
@@ -31,7 +31,7 @@ use crate::settings_default::DefaultSettings;
 use crate::settings_default::SettingRange;
 use crate::SettingMode;

-#[derive(serde::Serialize, serde::Deserialize, Clone)]
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)]
 pub enum ScopeLevel {
     Default,
     Local,
@@ -58,7 +58,7 @@ impl Debug for ScopeLevel {
 }

-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Debug)]
 pub struct ChangeValue {
     pub level: ScopeLevel,
     pub value: UserSettingValue,
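
The new `PartialEq` derives make settings state comparable, which a planner cache plausibly needs: a cached plan is only safe to reuse if the setting overrides captured at plan time still match the current session. A sketch under that assumption (hypothetical helper; also assumes `UserSettingValue: PartialEq`):

use std::collections::HashMap;

// Hypothetical: compare per-query setting overrides captured at plan time
// against the current ones, relying on the PartialEq derived above.
fn settings_unchanged(
    cached: &HashMap<String, ChangeValue>,
    current: &HashMap<String, ChangeValue>,
) -> bool {
    cached == current
}
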
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -380,6 +380,12 @@ impl DefaultSettings {
             mode: SettingMode::Both,
             range: None,
         }),
+        ("enable_planner_cache", DefaultSettingValue {
+            value: UserSettingValue::UInt64(1),
+            desc: "Enables caching logical plans of identical queries.",
+            mode: SettingMode::Both,
+            range: Some(SettingRange::Numeric(0..=1)),
+        }),
         ("enable_query_result_cache", DefaultSettingValue {
             value: UserSettingValue::UInt64(0),
             desc: "Enables caching query results to improve performance for identical queries.",
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -351,6 +351,10 @@ impl Settings {
         Ok(self.try_get_u64("hide_options_in_show_create_table")? != 0)
     }

+    pub fn get_enable_planner_cache(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_planner_cache")? != 0)
+    }
+
     pub fn get_enable_query_result_cache(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_query_result_cache")? != 0)
     }
2 changes: 2 additions & 0 deletions src/query/sql/Cargo.toml
@@ -47,6 +47,7 @@ databend-common-storages-result-cache = { workspace = true }
 databend-common-storages-view = { workspace = true }
 databend-common-users = { workspace = true }
 databend-enterprise-data-mask-feature = { workspace = true }
+databend-storages-common-cache = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 derive-visitor = { workspace = true }
 educe = "0.4"
@@ -69,6 +70,7 @@ recursive = "0.1.1"
 regex = { workspace = true }
 roaring = "0.10.1"
 serde = { workspace = true }
+sha2 = { workspace = true }
 simsearch = "0.2"
 time = "0.3.14"
 url = "2.3.1"
(file name not shown in this view)
@@ -129,14 +129,14 @@ impl Binder {
            }
        };

-        if consume && table_meta.engine() != "STREAM" {
+        if consume && !table_meta.is_stream() {
            return Err(ErrorCode::StorageUnsupported(
                "WITH CONSUME only support in STREAM",
            ));
        }

        if navigation.is_some_and(|n| matches!(n, TimeNavigation::Changes { .. }))
-            || table_meta.engine() == "STREAM"
+            || table_meta.is_stream()
        {
            let change_type = get_change_type(&table_name_alias);
            if change_type.is_some() {
8 changes: 7 additions & 1 deletion src/query/sql/src/planner/binder/binder.rs
@@ -104,6 +104,8 @@ pub struct Binder {
     /// For the recursive cte, the cte table name occurs in the recursive cte definition and main query
     /// if meet recursive cte table name in cte definition, set `bind_recursive_cte` true and treat it as `CteScan`.
     pub bind_recursive_cte: bool,
+
+    pub enable_result_cache: bool,
 }

 impl<'a> Binder {
@@ -114,6 +116,10 @@
         metadata: MetadataRef,
     ) -> Self {
         let dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
+        let enable_result_cache = ctx
+            .get_settings()
+            .get_enable_query_result_cache()
+            .unwrap_or_default();
         Binder {
             ctx,
             dialect,
@@ -125,6 +131,7 @@
             ctes_map: Box::default(),
             expression_scan_context: ExpressionScanContext::new(),
             bind_recursive_cte: false,
+            enable_result_cache,
         }
     }

@@ -169,7 +176,6 @@

         // Remove unused cache columns and join conditions and construct ExpressionScan's child.
         (s_expr, _) = self.construct_expression_scan(&s_expr, self.metadata.clone())?;
-
         let formatted_ast = if self.ctx.get_settings().get_enable_query_result_cache()? {
             Some(stmt.to_string())
         } else {
4 changes: 3 additions & 1 deletion src/query/sql/src/planner/binder/copy_into_location.rs
@@ -42,7 +42,9 @@ impl<'a> Binder {
             &table.database,
             &table.table,
         );
-        let subquery = format!("SELECT * FROM {catalog_name}.{database_name}.{table_name}");
+        let subquery = format!(
+            "SELECT * FROM \"{catalog_name}\".\"{database_name}\".\"{table_name}\""
+        );
         let tokens = tokenize_sql(&subquery)?;
         let sub_stmt_msg = parse_sql(&tokens, self.dialect)?;
         let sub_stmt = sub_stmt_msg.0;
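
Quoting the normalized identifiers keeps the generated subquery round-trippable through the `tokenize_sql`/`parse_sql` calls that follow: unquoted, a case-sensitive or keyword-like name would be re-normalized or rejected on re-parse. A self-contained illustration (identifier values invented):

fn main() {
    // Invented identifiers: "db-1" would not tokenize as one unquoted
    // identifier, and "Order" collides with the ORDER keyword.
    let (catalog_name, database_name, table_name) = ("default", "db-1", "Order");
    let subquery =
        format!("SELECT * FROM \"{catalog_name}\".\"{database_name}\".\"{table_name}\"");
    assert_eq!(subquery, r#"SELECT * FROM "default"."db-1"."Order""#);
}
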
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/table.rs
@@ -666,7 +666,7 @@ impl Binder {
             self.normalize_object_identifier_triple(catalog, database, name);
         databend_common_base::runtime::block_on(async move {
             let stream = self.ctx.get_table(&catalog, &database, &name).await?;
-            if stream.engine() != "STREAM" {
+            if !stream.is_stream() {
                 return Err(ErrorCode::TableEngineNotSupported(format!(
                     "{database}.{name} is not STREAM",
                 )));
1 change: 1 addition & 0 deletions src/query/sql/src/planner/mod.rs
@@ -23,6 +23,7 @@ pub mod binder;
 pub mod dataframe;
 mod expression_parser;
 pub mod optimizer;
+mod planner_cache;
 pub mod plans;
 mod stream_column;
 mod udf_validator;
47 changes: 40 additions & 7 deletions src/query/sql/src/planner/planner.rs
@@ -45,20 +45,18 @@
 use crate::plans::Plan;
 use crate::Binder;
 use crate::CountSetOps;
 use crate::Metadata;
-use crate::MetadataRef;
 use crate::NameResolutionContext;
 use crate::VariableNormalizer;

 const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
 const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;

 pub struct Planner {
-    ctx: Arc<dyn TableContext>,
+    pub(crate) ctx: Arc<dyn TableContext>,
 }

 #[derive(Debug, Clone)]
 pub struct PlanExtras {
-    pub metadata: MetadataRef,
     pub format: Option<String>,
     pub statement: Statement,
 }
@@ -152,8 +150,33 @@
         self.replace_stmt(&mut stmt)?;

         // Step 3: Bind AST with catalog, and generate a pure logical SExpr
-        let metadata = Arc::new(RwLock::new(Metadata::default()));
         let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
+        let mut enable_planner_cache =
+            self.ctx.get_settings().get_enable_planner_cache()?;
+        let planner_cache_key = if enable_planner_cache {
+            Some(Self::planner_cache_key(&stmt.to_string()))
+        } else {
+            None
+        };
+
+        if enable_planner_cache {
+            let (c, plan) = self.get_cache(
+                name_resolution_ctx.clone(),
+                planner_cache_key.as_ref().unwrap(),
+                &stmt,
+            );
+            if let Some(mut plan) = plan {
+                info!("logical plan from cache, time used: {:?}", start.elapsed());
+                // update for clickhouse handler
+                plan.extras.format = format;
+                self.ctx
+                    .attach_query_str(get_query_kind(&stmt), stmt.to_mask_sql());
+                return Ok((plan.plan, plan.extras));
+            }
+            enable_planner_cache = c;
+        }
+
+        let metadata = Arc::new(RwLock::new(Metadata::default()));
         let binder = Binder::new(
             self.ctx.clone(),
             CatalogManager::instance(),
@@ -176,11 +199,21 @@
             .with_enable_dphyp(settings.get_enable_dphyp()?);

         let optimized_plan = optimize(opt_ctx, plan).await?;
-        Ok((optimized_plan, PlanExtras {
-            metadata,
+        let result = (optimized_plan, PlanExtras {
             format,
             statement: stmt,
-        }))
+        });
+
+        if enable_planner_cache {
+            self.set_cache(
+                planner_cache_key.clone().unwrap(),
+                result.0.clone(),
+                result.1.clone(),
+            );
+            Ok(result)
+        } else {
+            Ok(result)
+        }
     }
     .await;
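
`planner_cache_key`, `get_cache`, and `set_cache` come from the new `planner_cache` module, which is not rendered in this view. Given the `sha2` dependency added to src/query/sql/Cargo.toml, the key is presumably a SHA-2 digest of the statement text; a minimal sketch under that assumption (the real implementation may differ, e.g. in how temp tables and non-query statements are excluded per the commit message):

use sha2::{Digest, Sha256};

impl Planner {
    // Sketch only: hash the full statement text so two textually identical
    // queries map to the same cache slot.
    pub(crate) fn planner_cache_key(format_sql: &str) -> String {
        Sha256::digest(format_sql.as_bytes())
            .iter()
            .map(|b| format!("{b:02x}"))
            .collect()
    }
}
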
(diff truncated; remaining changed files not rendered)
