diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 778c4f257b19..2413fae9e350 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -80,6 +80,9 @@ namespace Setting
 {
 extern const SettingsUInt64 prefer_external_sort_block_bytes;
 extern const SettingsUInt64 max_bytes_before_external_sort;
+extern const SettingsBool query_plan_merge_filters;
+extern const SettingsBool compile_expressions;
+extern const SettingsShortCircuitFunctionEvaluation short_circuit_function_evaluation;
 }
 namespace ErrorCodes
 {
@@ -722,6 +725,14 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
     settings.set("max_download_threads", 1);
     settings.set("input_format_parquet_enable_row_group_prefetch", false);
 
+    /// update per https://github.com/ClickHouse/ClickHouse/pull/71539
+    /// if true, we can't get correct metrics for the query
+    settings[Setting::query_plan_merge_filters] = false;
+    /// We now set BuildQueryPipelineSettings according to config.
+    settings[Setting::compile_expressions] = true;
+    settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE;
+    ///
+
     for (const auto & [key, value] : spark_conf_map)
     {
         // Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings
diff --git a/cpp-ch/local-engine/Common/DebugUtils.cpp b/cpp-ch/local-engine/Common/DebugUtils.cpp
index 2fcab59bf856..8a4323cb1c13 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.cpp
+++ b/cpp-ch/local-engine/Common/DebugUtils.cpp
@@ -38,7 +38,7 @@ namespace pb_util = google::protobuf::util;
 
 namespace debug
 {
-void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
+void dumpPlan(DB::QueryPlan & plan, const char * type, bool force, LoggerPtr logger)
 {
     if (!logger)
     {
@@ -51,10 +51,12 @@ void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
         return;
 
     auto out = local_engine::PlanUtil::explainPlan(plan);
+    auto task_id = local_engine::QueryContext::instance().currentTaskIdOrEmpty();
+    task_id = task_id.empty() ? "" : "(" + task_id + ")";
     if (force) // force
-        LOG_ERROR(logger, "clickhouse plan({}):\n{}", local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
+        LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, out);
     else
-        LOG_DEBUG(logger, "clickhouse plan({}):\n{}", local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
+        LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, out);
 }
 
 void dumpMessage(const google::protobuf::Message & message, const char * type, bool force, LoggerPtr logger)
@@ -70,13 +72,15 @@ void dumpMessage(const google::protobuf::Message & message, const char * type, b
         return;
 
     pb_util::JsonOptions options;
     std::string json;
-    if (auto s = google::protobuf::json::MessageToJsonString(message, &json, options); !s.ok())
+    if (auto s = MessageToJsonString(message, &json, options); !s.ok())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);
+    auto task_id = local_engine::QueryContext::instance().currentTaskIdOrEmpty();
+    task_id = task_id.empty() ? "" : "(" + task_id + ")";
     if (force) // force
-        LOG_ERROR(logger, "{}({}):\n{}", type, local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
+        LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, json);
     else
-        LOG_DEBUG(logger, "{}({}):\n{}", type, local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
+        LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, json);
 }
 
 void headBlock(const DB::Block & block, size_t count)
diff --git a/cpp-ch/local-engine/Common/DebugUtils.h b/cpp-ch/local-engine/Common/DebugUtils.h
index 55a0be5140c5..338326b05e0e 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.h
+++ b/cpp-ch/local-engine/Common/DebugUtils.h
@@ -29,7 +29,7 @@ class QueryPlan;
 
 namespace debug
 {
-void dumpPlan(DB::QueryPlan & plan, bool force = false, LoggerPtr = nullptr);
+void dumpPlan(DB::QueryPlan & plan, const char * type = "clickhouse plan", bool force = false, LoggerPtr = nullptr);
 void dumpMessage(const google::protobuf::Message & message, const char * type, bool force = false, LoggerPtr = nullptr);
 
 void headBlock(const DB::Block & block, size_t count = 10);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 748ff88acbd1..4e461a5c4954 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -101,8 +101,9 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c)
     return res;
 }
 
-void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const
+void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan)
 {
+    const substrait::PlanRel & root_rel = plan.relations().at(0);
     if (root_rel.root().names_size())
     {
         ActionsDAG actions_dag{blockToNameAndTypeList(query_plan->getCurrentHeader())};
@@ -110,8 +111,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
         const auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
         if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
         {
-            debug::dumpPlan(*query_plan, true);
-            debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+            debug::dumpPlan(*query_plan, "clickhouse plan", true);
+            debug::dumpMessage(plan, "substrait::Plan", true);
             throw Exception(
                 ErrorCodes::LOGICAL_ERROR,
                 "Missmatch result columns size. plan column size {}, subtrait plan name size {}.",
@@ -134,8 +135,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
         const auto & original_cols = original_header.getColumnsWithTypeAndName();
         if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
         {
-            debug::dumpPlan(*query_plan, true);
-            debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+            debug::dumpPlan(*query_plan, "clickhouse plan", true);
+            debug::dumpMessage(plan, "substrait::Plan", true);
             throw Exception(
                 ErrorCodes::LOGICAL_ERROR,
                 "Missmatch result columns size. plan column size {}, subtrait plan output schema size {}, subtrait plan name size {}.",
@@ -198,7 +199,7 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
     std::list<const substrait::Rel *> rel_stack;
     auto query_plan = parseOp(first_read_rel, rel_stack);
     if (!writePipeline)
-        adjustOutput(query_plan, root_rel);
+        adjustOutput(query_plan, plan);
 
 #ifndef NDEBUG
     PlanUtil::checkOuputType(*query_plan);
@@ -297,12 +298,10 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl
         settings,
         0);
     const QueryPlanOptimizationSettings optimization_settings{.optimize_plan = settings[Setting::query_plan_enable_optimizations]};
-    return query_plan.buildQueryPipeline(
-        optimization_settings,
-        BuildQueryPipelineSettings{
-            .actions_settings
-            = ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3, .compile_expressions = CompileExpressions::yes},
-            .process_list_element = query_status});
+    BuildQueryPipelineSettings build_settings = BuildQueryPipelineSettings::fromContext(context);
+    build_settings.process_list_element = query_status;
+    build_settings.progress_callback = nullptr;
+    return query_plan.buildQueryPipeline(optimization_settings, build_settings);
 }
 
 std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::string_view plan)
@@ -311,11 +310,10 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::s
     return createExecutor(parse(s_plan), s_plan);
 }
 
-std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan)
+std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const
 {
     Stopwatch stopwatch;
-    const Settings & settings = parser_context->queryContext()->getSettingsRef();
 
     DB::QueryPipelineBuilderPtr builder = nullptr;
     try
     {
@@ -323,7 +321,7 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
     }
     catch (...)
     {
-        LOG_ERROR(getLogger("SerializedPlanParser"), "Invalid plan:\n{}", PlanUtil::explainPlan(*query_plan));
+        debug::dumpPlan(*query_plan, "Invalid clickhouse plan", true);
        throw;
     }
 
@@ -333,11 +331,6 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
     if (root_rel.root().input().has_write())
         addSinkTransform(parser_context->queryContext(), root_rel.root().input().write(), builder);
     LOG_INFO(getLogger("SerializedPlanParser"), "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
-    LOG_DEBUG(
-        getLogger("SerializedPlanParser"),
-        "clickhouse plan [optimization={}]:\n{}",
-        settings[Setting::query_plan_enable_optimizations],
-        PlanUtil::explainPlan(*query_plan));
 
     auto config = ExecutorConfig::loadFromContext(parser_context->queryContext());
     return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index f0ec608a330f..eadc7112c266 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -68,7 +68,7 @@ class NonNullableColumnsResolver
 class SerializedPlanParser
 {
 private:
-    std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan);
+    std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const;
 
 public:
     explicit SerializedPlanParser(std::shared_ptr<const ParserContext> parser_context_);
@@ -118,7 +118,7 @@ class SerializedPlanParser
 private:
     DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack);
-    void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const;
+    static void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan);
 
     std::vector<jobject> input_iters;
     std::vector<std::string> split_infos;
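
Reviewer note (not part of the patch): `debug::dumpPlan` and `debug::dumpMessage` now share one log-line convention, a caller-supplied tag, an optional `(task_id)` suffix, then ` =>` and the dumped payload. Below is a minimal standalone sketch of just that formatting rule, assuming nothing from ClickHouse or Gluten; `formatDump` and the sample task id are invented for illustration.

```cpp
#include <iostream>
#include <string>

// Mirrors the patch's prefix logic: the "(task_id)" part appears only
// when a task id is present, so logs outside a Spark task stay clean.
static std::string formatDump(const std::string & type, const std::string & task_id, const std::string & payload)
{
    const std::string id = task_id.empty() ? "" : "(" + task_id + ")";
    return type + id + " =>\n" + payload;
}

int main()
{
    // Prints: clickhouse plan(stage_1_task_42) =>\n<plan text>
    std::cout << formatDump("clickhouse plan", "stage_1_task_42", "<plan text>") << "\n\n";
    // No task id, so the parentheses are omitted entirely: substrait::Plan =>\n{ "relations": [] }
    std::cout << formatDump("substrait::Plan", "", "{ \"relations\": [] }") << '\n';
}
```

The same tag parameter is what lets `createExecutor` reuse the helper on its failure path (`"Invalid clickhouse plan"`) instead of hand-rolling a second `LOG_ERROR`, and why the duplicate `"clickhouse plan [optimization=...]"` debug dump could be deleted.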