Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 19, 2024
1 parent 294e8a3 commit 55a602a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 33 deletions.
72 changes: 42 additions & 30 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,35 +203,6 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
}
}

pub async fn plan_sql(
ctx: Arc<QueryContext>,
sql: &str,
acquire_queue: bool,
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let mut planner = Planner::new_with_sample_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);
let extras = planner.parse_sql(sql)?;
if !acquire_queue {
let plan = planner.plan_stmt(&extras.statement).await?;
return Ok((plan, extras, AcquireQueueGuard::create(None)));
}

let need_acquire_lock = need_acquire_lock(&extras.statement);
if need_acquire_lock {
let query_entry = QueryEntry::create_entry(&ctx, &extras, true)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
let plan = planner.plan_stmt(&extras.statement).await?;
Ok((plan, extras, guard))
} else {
let plan = planner.plan_stmt(&extras.statement).await?;
let query_entry = QueryEntry::create(&ctx, &plan, &extras)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
Ok((plan, extras, guard))
}
}

/// There are two steps to execute a query:
/// 1. Plan the SQL
/// 2. Execute the plan -- interpreter
Expand Down Expand Up @@ -263,6 +234,42 @@ pub async fn interpreter_plan_sql(
result
}

async fn plan_sql(
ctx: Arc<QueryContext>,
sql: &str,
acquire_queue: bool,
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let mut planner = Planner::new_with_sample_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);

// Parse the SQL query, get extract additional information.
let extras = planner.parse_sql(sql)?;
if !acquire_queue {
// If queue guard is not required, plan the statement directly.
let plan = planner.plan_stmt(&extras.statement).await?;
return Ok((plan, extras, AcquireQueueGuard::create(None)));
}

let need_acquire_lock = need_acquire_lock(ctx.clone(), &extras.statement);
if need_acquire_lock {
// If a lock is required, acquire the queue guard before
// planning the statement, to avoid potential deadlocks.
// See PR https://github.com/databendlabs/databend/pull/16632
let query_entry = QueryEntry::create_entry(&ctx, &extras, true)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
let plan = planner.plan_stmt(&extras.statement).await?;
Ok((plan, extras, guard))
} else {
// No lock is needed, plan the statement first, then acquire the queue guard.
let plan = planner.plan_stmt(&extras.statement).await?;
let query_entry = QueryEntry::create(&ctx, &plan, &extras)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
Ok((plan, extras, guard))
}
}

fn attach_query_hash(ctx: &Arc<QueryContext>, stmt: &mut Option<Statement>, sql: &str) {
let (query_hash, query_parameterized_hash) = if let Some(stmt) = stmt {
let query_hash = format!("{:x}", Md5::digest(stmt.to_string()));
Expand Down Expand Up @@ -336,7 +343,12 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
}
}

fn need_acquire_lock(stmt: &Statement) -> bool {
/// Check if the statement need acquire a table lock.
fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
if !ctx.get_settings().get_enable_table_lock().unwrap_or(false) {
return false;
}

match stmt {
Statement::Replace(_)
| Statement::MergeInto(_)
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ pub use access::ManagementModeAccess;
pub use common::InterpreterQueryLog;
pub use hook::HookOperator;
pub use interpreter::interpreter_plan_sql;
pub use interpreter::plan_sql;
pub use interpreter::Interpreter;
pub use interpreter::InterpreterPtr;
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
Expand Down
3 changes: 1 addition & 2 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use serde::Deserialize;
use serde::Serialize;

use crate::interpreters::interpreter_plan_sql;
use crate::interpreters::plan_sql;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterPtr;
use crate::servers::http::middleware::sanitize_request_headers;
Expand Down Expand Up @@ -338,7 +337,7 @@ pub async fn clickhouse_handler_post(
};
info!("receive clickhouse http post, (query + body) = {}", &msg);

let (mut plan, extras, _guard) = plan_sql(ctx.clone(), &sql, true)
let (mut plan, extras, _guard) = interpreter_plan_sql(ctx.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;
Expand Down

0 comments on commit 55a602a

Please sign in to comment.