Skip to content

Commit

Permalink
[CH] Fix issues due to ClickHouse/ClickHouse#71539. (#7952)
Browse files Browse the repository at this point in the history
* Fix issues due to ClickHouse/ClickHouse#71539.

Issue 1
BuildQueryPipelineSettings is created manually instead of calling  BuildQueryPipelineSettings::fromContext(); so even ClickHouse/ClickHouse#71890 disable 'query_plan_merge_filters', UTs are still failed.

To fix  this issue, we need set correct default parameters in CHUtil.cpp

Issue 2
If we set query_plan_merge_filters to true, then ClickHouse/ClickHouse#71539 will try to split the left most AND atom to a separate DAG and hence create FilterTransformer for each And atom, which cause collecting metrics failed.

I am not sure the benefits of setting it to true, let's keep it to false.

* Calling `QueryPlan::explainPlan` after building pipeline is not correct, due to `action_dag`  is [moved](https://github.com/ClickHouse/ClickHouse/blob/22d2c856a70dfb8b6e4c506fcb22ac03d59df9be/src/Processors/QueryPlan/FilterStep.cpp#L161).
  • Loading branch information
baibaichen authored Nov 14, 2024
1 parent 7b40050 commit f65087e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
11 changes: 11 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ namespace Setting
{
extern const SettingsUInt64 prefer_external_sort_block_bytes;
extern const SettingsUInt64 max_bytes_before_external_sort;
extern const SettingsBool query_plan_merge_filters;
extern const SettingsBool compile_expressions;
extern const SettingsShortCircuitFunctionEvaluation short_circuit_function_evaluation;
}
namespace ErrorCodes
{
Expand Down Expand Up @@ -722,6 +725,14 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
settings.set("max_download_threads", 1);
settings.set("input_format_parquet_enable_row_group_prefetch", false);

/// update per https://github.com/ClickHouse/ClickHouse/pull/71539
/// if true, we can't get correct metrics for the query
settings[Setting::query_plan_merge_filters] = false;
/// We now set BuildQueryPipelineSettings according to config.
settings[Setting::compile_expressions] = true;
settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE;
///

for (const auto & [key, value] : spark_conf_map)
{
// Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings
Expand Down
16 changes: 10 additions & 6 deletions cpp-ch/local-engine/Common/DebugUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace pb_util = google::protobuf::util;
namespace debug
{

void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
void dumpPlan(DB::QueryPlan & plan, const char * type, bool force, LoggerPtr logger)
{
if (!logger)
{
Expand All @@ -51,10 +51,12 @@ void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
return;

auto out = local_engine::PlanUtil::explainPlan(plan);
auto task_id = local_engine::QueryContext::instance().currentTaskIdOrEmpty();
task_id = task_id.empty() ? "" : "(" + task_id + ")";
if (force) // force
LOG_ERROR(logger, "clickhouse plan({}):\n{}", local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, out);
else
LOG_DEBUG(logger, "clickhouse plan({}):\n{}", local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, out);
}

void dumpMessage(const google::protobuf::Message & message, const char * type, bool force, LoggerPtr logger)
Expand All @@ -70,13 +72,15 @@ void dumpMessage(const google::protobuf::Message & message, const char * type, b
return;
pb_util::JsonOptions options;
std::string json;
if (auto s = google::protobuf::json::MessageToJsonString(message, &json, options); !s.ok())
if (auto s = MessageToJsonString(message, &json, options); !s.ok())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);

auto task_id = local_engine::QueryContext::instance().currentTaskIdOrEmpty();
task_id = task_id.empty() ? "" : "(" + task_id + ")";
if (force) // force
LOG_ERROR(logger, "{}({}):\n{}", type, local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, json);
else
LOG_DEBUG(logger, "{}({}):\n{}", type, local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, json);
}

void headBlock(const DB::Block & block, size_t count)
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/DebugUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class QueryPlan;
namespace debug
{

void dumpPlan(DB::QueryPlan & plan, bool force = false, LoggerPtr = nullptr);
void dumpPlan(DB::QueryPlan & plan, const char * type = "clickhouse plan", bool force = false, LoggerPtr = nullptr);
void dumpMessage(const google::protobuf::Message & message, const char * type, bool force = false, LoggerPtr = nullptr);

void headBlock(const DB::Block & block, size_t count = 10);
Expand Down
33 changes: 13 additions & 20 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,18 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c)
return res;
}

void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const
void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan)
{
const substrait::PlanRel & root_rel = plan.relations().at(0);
if (root_rel.root().names_size())
{
ActionsDAG actions_dag{blockToNameAndTypeList(query_plan->getCurrentHeader())};
NamesWithAliases aliases;
const auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
{
debug::dumpPlan(*query_plan, true);
debug::dumpMessage(root_rel, "substrait::PlanRel", true);
debug::dumpPlan(*query_plan, "clickhouse plan", true);
debug::dumpMessage(plan, "substrait::Plan", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait plan name size {}.",
Expand All @@ -134,8 +135,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
const auto & original_cols = original_header.getColumnsWithTypeAndName();
if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
{
debug::dumpPlan(*query_plan, true);
debug::dumpMessage(root_rel, "substrait::PlanRel", true);
debug::dumpPlan(*query_plan, "clickhouse plan", true);
debug::dumpMessage(plan, "substrait::Plan", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait plan output schema size {}, subtrait plan name size {}.",
Expand Down Expand Up @@ -198,7 +199,7 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
std::list<const substrait::Rel *> rel_stack;
auto query_plan = parseOp(first_read_rel, rel_stack);
if (!writePipeline)
adjustOutput(query_plan, root_rel);
adjustOutput(query_plan, plan);

#ifndef NDEBUG
PlanUtil::checkOuputType(*query_plan);
Expand Down Expand Up @@ -297,12 +298,10 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl
settings,
0);
const QueryPlanOptimizationSettings optimization_settings{.optimize_plan = settings[Setting::query_plan_enable_optimizations]};
return query_plan.buildQueryPipeline(
optimization_settings,
BuildQueryPipelineSettings{
.actions_settings
= ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3, .compile_expressions = CompileExpressions::yes},
.process_list_element = query_status});
BuildQueryPipelineSettings build_settings = BuildQueryPipelineSettings::fromContext(context);
build_settings.process_list_element = query_status;
build_settings.progress_callback = nullptr;
return query_plan.buildQueryPipeline(optimization_settings,build_settings);
}

std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::string_view plan)
Expand All @@ -311,19 +310,18 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::s
return createExecutor(parse(s_plan), s_plan);
}

std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan)
std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const
{
Stopwatch stopwatch;

const Settings & settings = parser_context->queryContext()->getSettingsRef();
DB::QueryPipelineBuilderPtr builder = nullptr;
try
{
builder = buildQueryPipeline(*query_plan);
}
catch (...)
{
LOG_ERROR(getLogger("SerializedPlanParser"), "Invalid plan:\n{}", PlanUtil::explainPlan(*query_plan));
debug::dumpPlan(*query_plan, "Invalid clickhouse plan", true);
throw;
}

Expand All @@ -333,11 +331,6 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
if (root_rel.root().input().has_write())
addSinkTransform(parser_context->queryContext(), root_rel.root().input().write(), builder);
LOG_INFO(getLogger("SerializedPlanParser"), "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
LOG_DEBUG(
getLogger("SerializedPlanParser"),
"clickhouse plan [optimization={}]:\n{}",
settings[Setting::query_plan_enable_optimizations],
PlanUtil::explainPlan(*query_plan));

auto config = ExecutorConfig::loadFromContext(parser_context->queryContext());
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class NonNullableColumnsResolver
class SerializedPlanParser
{
private:
std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan);
std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const;

public:
explicit SerializedPlanParser(std::shared_ptr<const ParserContext> parser_context_);
Expand Down Expand Up @@ -118,7 +118,7 @@ class SerializedPlanParser

private:
DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack);
void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const;
static void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan);

std::vector<jobject> input_iters;
std::vector<std::string> split_infos;
Expand Down

0 comments on commit f65087e

Please sign in to comment.