Commit: fix
zhyass committed Oct 20, 2024
1 parent 4557131 commit 6526f4e
Showing 10 changed files with 199 additions and 165 deletions.
83 changes: 76 additions & 7 deletions src/query/service/src/interpreters/interpreter.rs
@@ -16,7 +16,10 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::SystemTime;

use databend_common_ast::ast::AlterTableAction;
use databend_common_ast::ast::AlterTableStmt;
use databend_common_ast::ast::Literal;
use databend_common_ast::ast::ModifyColumnAction;
use databend_common_ast::ast::Statement;
use databend_common_base::base::short_sql;
use databend_common_base::runtime::profile::get_statistics_desc;
@@ -54,7 +57,10 @@ use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::ServiceQueryExecutor;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::SessionManager;
use crate::stream::DataBlockStream;
use crate::stream::ProgressStream;
@@ -202,17 +208,18 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
/// 2. Execute the plan -- interpreter
///
/// This function is used to plan the SQL. If an error occurs, we will log the query start and finish.
pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(Plan, PlanExtras)> {
let mut planner = Planner::new_with_query_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);
let result = planner.plan_sql(sql).await;
pub async fn interpreter_plan_sql(
ctx: Arc<QueryContext>,
sql: &str,
acquire_queue: bool,
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let result = plan_sql(ctx.clone(), sql, acquire_queue).await;

let short_sql = short_sql(
sql.to_string(),
ctx.get_settings().get_short_sql_max_length()?,
);
let mut stmt = if let Ok((_, extras)) = &result {
let mut stmt = if let Ok((_, extras, _)) = &result {
Some(extras.statement.clone())
} else {
// Only log if there's an error
@@ -227,6 +234,42 @@ pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(
result
}

async fn plan_sql(
ctx: Arc<QueryContext>,
sql: &str,
acquire_queue: bool,
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let mut planner = Planner::new_with_query_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);

// Parse the SQL query and extract additional information.
let extras = planner.parse_sql(sql)?;
if !acquire_queue {
// If the queue guard is not required, plan the statement directly.
let plan = planner.plan_stmt(&extras.statement).await?;
return Ok((plan, extras, AcquireQueueGuard::create(None)));
}

let need_acquire_lock = need_acquire_lock(ctx.clone(), &extras.statement);
if need_acquire_lock {
// If a lock is required, acquire the queue guard before
// planning the statement, to avoid potential deadlocks.
// See PR https://github.com/databendlabs/databend/pull/16632
let query_entry = QueryEntry::create_entry(&ctx, &extras, true)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
let plan = planner.plan_stmt(&extras.statement).await?;
Ok((plan, extras, guard))
} else {
// No lock is needed: plan the statement first, then acquire the queue guard.
let plan = planner.plan_stmt(&extras.statement).await?;
let query_entry = QueryEntry::create(&ctx, &plan, &extras)?;
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
Ok((plan, extras, guard))
}
}

fn attach_query_hash(ctx: &Arc<QueryContext>, stmt: &mut Option<Statement>, sql: &str) {
let (query_hash, query_parameterized_hash) = if let Some(stmt) = stmt {
let query_hash = format!("{:x}", Md5::digest(stmt.to_string()));
@@ -299,3 +342,29 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
Err(error) => Err(error.clone()),
}
}

/// Check if the statement needs to acquire a table lock.
fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
if !ctx.get_settings().get_enable_table_lock().unwrap_or(false) {
return false;
}

match stmt {
Statement::Replace(_)
| Statement::MergeInto(_)
| Statement::Update(_)
| Statement::Delete(_)
| Statement::OptimizeTable(_)
| Statement::TruncateTable(_) => true,

Statement::AlterTable(AlterTableStmt { action, .. }) => matches!(
action,
AlterTableAction::ReclusterTable { .. }
| AlterTableAction::ModifyColumn {
action: ModifyColumnAction::SetDataType(_),
}
),

_ => false,
}
}
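For orientation, a call site under the new signature looks roughly like the following sketch. The function name run_query_sketch and the import paths are assumptions for illustration; the pattern mirrors the handler changes further down in this commit.

use std::sync::Arc;

use databend_common_exception::Result;

use crate::interpreters::interpreter_plan_sql;
use crate::interpreters::InterpreterFactory;
use crate::sessions::QueryContext;

// Sketch only: plan a query and let plan_sql decide when to enter the queue.
async fn run_query_sketch(ctx: Arc<QueryContext>, sql: &str) -> Result<()> {
    // Passing `true` requests the queue guard; for lock-taking statements it is
    // acquired before planning, otherwise after planning.
    let (plan, _extras, _guard) = interpreter_plan_sql(ctx.clone(), sql, true).await?;

    // The guard is bound to a named variable so the queue slot stays held until
    // this scope ends, i.e. for the lifetime of the interpreter built below.
    let _interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
    // ... execute the interpreter while `_guard` is still alive.
    Ok(())
}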
20 changes: 2 additions & 18 deletions src/query/service/src/servers/http/clickhouse_handler.rs
@@ -27,7 +27,6 @@ use databend_common_expression::DataSchemaRef;
use databend_common_formats::ClickhouseFormatType;
use databend_common_formats::FileFormatOptionsExt;
use databend_common_formats::FileFormatTypeExt;
use databend_common_sql::Planner;
use fastrace::func_path;
use fastrace::prelude::*;
use futures::StreamExt;
@@ -56,9 +55,7 @@ use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterPtr;
use crate::servers::http::middleware::sanitize_request_headers;
use crate::servers::http::v1::HttpQueryContext;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::SessionType;
use crate::sessions::TableContext;

@@ -256,16 +253,11 @@ pub async fn clickhouse_handler_get(
let default_format = get_default_format(&params, headers).map_err(BadRequest)?;
let sql = params.query();
// Use interpreter_plan_sql so that the query log is written if an error occurs.
let (plan, extras) = interpreter_plan_sql(context.clone(), &sql)
let (plan, extras, _guard) = interpreter_plan_sql(context.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;

let query_entry = QueryEntry::create(&context, &plan, &extras).map_err(BadRequest)?;
let _guard = QueriesQueueManager::instance()
.acquire(query_entry)
.await
.map_err(BadRequest)?;
let format = get_format_with_default(extras.format, default_format)?;
let interpreter = InterpreterFactory::get(context.clone(), &plan)
.await
@@ -345,19 +337,11 @@ pub async fn clickhouse_handler_post(
};
info!("receive clickhouse http post, (query + body) = {}", &msg);

let mut planner = Planner::new(ctx.clone());
let (mut plan, extras) = planner
.plan_sql(&sql)
let (mut plan, extras, _guard) = interpreter_plan_sql(ctx.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;

let entry = QueryEntry::create(&ctx, &plan, &extras).map_err(BadRequest)?;
let _guard = QueriesQueueManager::instance()
.acquire(entry)
.await
.map_err(BadRequest)?;

let mut handle = None;
let output_schema = plan.schema();

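One detail worth noting in both handlers above: the returned queue guard is bound to _guard rather than _. In Rust, `let _ = value;` drops the value at the end of that statement, which here would release the queue slot immediately, while a named binding such as _guard keeps it alive until the end of the scope. A minimal, self-contained sketch with a stand-in guard type (not the real AcquireQueueGuard):

struct SlotGuard;

impl Drop for SlotGuard {
    fn drop(&mut self) {
        println!("queue slot released");
    }
}

fn main() {
    // Dropped immediately: the slot would be released before any work happens.
    let _ = SlotGuard;
    println!("work after the `_` binding runs without the slot");

    // Held until the end of the scope, which is what the handlers rely on.
    let _guard = SlotGuard;
    println!("work after the `_guard` binding runs while the slot is held");
}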
21 changes: 1 addition & 20 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
@@ -45,10 +45,8 @@ use crate::servers::http::v1::http_query_handlers::QueryResponseField;
use crate::servers::http::v1::query::http_query::ResponseState;
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryAffect;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::Session;
use crate::sessions::TableContext;

@@ -346,32 +344,15 @@ impl ExecuteState {
info!("http query prepare to plan sql");

// Use interpreter_plan_sql so that the query log is written if an error occurs.
let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql)
let (plan, _, queue_guard) = interpreter_plan_sql(ctx.clone(), &sql, true)
.await
.map_err(|err| err.display_with_sql(&sql))
.with_context(make_error)?;

let query_queue_manager = QueriesQueueManager::instance();

info!(
"http query preparing to acquire from query queue, length: {}",
query_queue_manager.length()
);

let entry = QueryEntry::create(&ctx, &plan, &extras).with_context(make_error)?;
let queue_guard = query_queue_manager
.acquire(entry)
.await
.with_context(make_error)?;
{
// set_var may change settings
let mut guard = format_settings.write();
*guard = Some(ctx.get_format_settings().with_context(make_error)?);
}
info!(
"http query finished acquiring from queue, length: {}",
query_queue_manager.length()
);

let interpreter = InterpreterFactory::get(ctx.clone(), &plan)
.await
@@ -56,9 +56,7 @@ use crate::servers::mysql::writers::ProgressReporter;
use crate::servers::mysql::writers::QueryResult;
use crate::servers::mysql::MySQLFederated;
use crate::servers::mysql::MYSQL_VERSION;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::Session;
use crate::sessions::TableContext;
use crate::stream::DataBlockStream;
@@ -377,10 +375,7 @@ impl InteractiveWorkerBase {
context.set_id(query_id);

// Use interpreter_plan_sql so that the query log is written if an error occurs.
let (plan, extras) = interpreter_plan_sql(context.clone(), query).await?;

let entry = QueryEntry::create(&context, &plan, &extras)?;
let _guard = QueriesQueueManager::instance().acquire(entry).await?;
let (plan, _, _guard) = interpreter_plan_sql(context.clone(), query, true).await?;

let interpreter = InterpreterFactory::get(context.clone(), &plan).await?;
let has_result_set = plan.has_result_set();
3 changes: 2 additions & 1 deletion src/query/service/src/sessions/queue_mgr.rs
@@ -316,7 +316,7 @@ pub struct QueryEntry {
}

impl QueryEntry {
fn create_entry(
pub fn create_entry(
ctx: &Arc<QueryContext>,
plan_extras: &PlanExtras,
need_acquire_to_queue: bool,
@@ -402,6 +402,7 @@ impl QueryEntry {
| Plan::VacuumTable(_)
| Plan::VacuumTemporaryFiles(_)
| Plan::RefreshIndex(_)
| Plan::ReclusterTable { .. }
| Plan::TruncateTable(_) => {
return true;
}
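The visibility change above exists because plan_sql in interpreter.rs now has to build a QueryEntry before any Plan exists on the lock-first path. A condensed sketch of the two admission paths used in this commit follows; the function name admit_sketch and the import paths are assumptions.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_sql::plans::Plan;
use databend_common_sql::PlanExtras;

use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;

// Sketch only: admit a query to the queue with or without a finished plan.
async fn admit_sketch(
    ctx: Arc<QueryContext>,
    extras: &PlanExtras,
    plan: Option<&Plan>,
) -> Result<AcquireQueueGuard> {
    let entry = match plan {
        // Plan-first path: the finished Plan is available when the entry is built.
        Some(plan) => QueryEntry::create(&ctx, plan, extras)?,
        // Lock-first path: no Plan yet, so the entry comes from the parsed
        // statement extras alone; this is why `create_entry` is now `pub`.
        None => QueryEntry::create_entry(&ctx, extras, true)?,
    };
    QueriesQueueManager::instance().acquire(entry).await
}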
@@ -160,7 +160,7 @@ impl SuggestedBackgroundTasksSource {
sql: String,
) -> Result<Option<RecordBatch>> {
// Use interpreter_plan_sql so that the query log is written if an error occurs.
let (plan, _) = interpreter_plan_sql(ctx.clone(), sql.as_str()).await?;
let (plan, _, _) = interpreter_plan_sql(ctx.clone(), sql.as_str(), false).await?;

let data_schema = plan.schema();
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
2 changes: 1 addition & 1 deletion src/query/service/tests/it/servers/admin/v1/status.rs
@@ -64,7 +64,7 @@ async fn run_query(query_ctx: &Arc<QueryContext>) -> Result<Arc<dyn Interpreter>
.get_current_session()
.set_authed_user(user, None)
.await?;
let (plan, _) = interpreter_plan_sql(query_ctx.clone(), sql).await?;
let (plan, _, _) = interpreter_plan_sql(query_ctx.clone(), sql, false).await?;

InterpreterFactory::get(query_ctx.clone(), &plan).await
}