From dd8a23f24c334b37280f00f9e1235b26c1f03f9b Mon Sep 17 00:00:00 2001
From: dirtysalt
Date: Sat, 9 Mar 2024 07:10:28 +0800
Subject: [PATCH] [Refactor] refactor hdfs scanner apppend_or_update column
 (#42248)

Signed-off-by: yanz
(cherry picked from commit c7f5207d760cf668464a149f540d14a72025d30f)

# Conflicts:
#	be/src/column/chunk.h
#	be/src/exec/jni_scanner.cpp
#	be/src/exec/vectorized/hdfs_scanner.cpp
#	be/src/exec/vectorized/hdfs_scanner.h
#	be/src/exec/vectorized/hdfs_scanner_orc.cpp
#	be/src/formats/parquet/file_reader.cpp
---
 be/src/column/chunk.cpp                     |  10 +
 be/src/column/chunk.h                       |   8 +
 be/src/exec/jni_scanner.cpp                 | 423 ++++++++++++++++++++
 be/src/exec/vectorized/hdfs_scanner.cpp     |  24 +-
 be/src/exec/vectorized/hdfs_scanner.h       |  17 +
 be/src/exec/vectorized/hdfs_scanner_orc.cpp |   6 +
 be/src/formats/parquet/file_reader.cpp      |  10 +
 7 files changed, 494 insertions(+), 4 deletions(-)
 create mode 100644 be/src/exec/jni_scanner.cpp

diff --git a/be/src/column/chunk.cpp b/be/src/column/chunk.cpp
index 86d33b4573e1c..625b3aa62e4f0 100644
--- a/be/src/column/chunk.cpp
+++ b/be/src/column/chunk.cpp
@@ -132,6 +132,16 @@ void Chunk::update_column_by_index(ColumnPtr column, size_t idx) {
     check_or_die();
 }
 
+void Chunk::append_or_update_column(ColumnPtr column, SlotId slot_id) {
+    if (_slot_id_to_index.contains(slot_id)) {
+        _columns[_slot_id_to_index[slot_id]] = std::move(column);
+    } else {
+        _slot_id_to_index[slot_id] = _columns.size();
+        _columns.emplace_back(std::move(column));
+    }
+    check_or_die();
+}
+
 void Chunk::insert_column(size_t idx, ColumnPtr column, const FieldPtr& field) {
     DCHECK_LT(idx, _columns.size());
     _columns.emplace(_columns.begin() + idx, std::move(column));
diff --git a/be/src/column/chunk.h b/be/src/column/chunk.h
index 69daad0980ac9..fc61d8129dcac 100644
--- a/be/src/column/chunk.h
+++ b/be/src/column/chunk.h
@@ -83,7 +83,15 @@ class Chunk {
     void update_column(ColumnPtr column, SlotId slot_id);
     void update_column_by_index(ColumnPtr column, size_t idx);
+<<<<<<< HEAD
     void append_tuple_column(const ColumnPtr& column, TupleId tuple_id);
+=======
+    void append_or_update_column(ColumnPtr column, SlotId slot_id);
+
+    void update_rows(const Chunk& src, const uint32_t* indexes);
+
+    void append_default();
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248))
 
     void remove_column_by_index(size_t idx);
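For readers new to the Chunk API: the append_or_update_column added above is an idempotent variant of append_column, replacing a column in place when its slot id is already mapped. A minimal standalone sketch of that contract, using stand-in types (the MiniChunk struct and ColumnPtr alias are hypothetical, not the real StarRocks classes):

#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using SlotId = int;
using ColumnPtr = std::shared_ptr<std::vector<std::string>>;

struct MiniChunk {
    std::map<SlotId, size_t> slot_id_to_index;
    std::vector<ColumnPtr> columns;

    // Same contract as Chunk::append_or_update_column: replace in place if the
    // slot is already present, otherwise append and record its index.
    void append_or_update_column(ColumnPtr column, SlotId slot_id) {
        auto it = slot_id_to_index.find(slot_id);
        if (it != slot_id_to_index.end()) {
            columns[it->second] = std::move(column);
        } else {
            slot_id_to_index[slot_id] = columns.size();
            columns.emplace_back(std::move(column));
        }
    }
};

int main() {
    MiniChunk chunk;
    chunk.append_or_update_column(std::make_shared<std::vector<std::string>>(1, "v1"), /*slot_id=*/7);
    chunk.append_or_update_column(std::make_shared<std::vector<std::string>>(1, "v2"), /*slot_id=*/7);
    assert(chunk.columns.size() == 1);     // second call updated, did not append
    assert((*chunk.columns[0])[0] == "v2");
    return 0;
}

This is what lets the format readers (Parquet/ORC) and the scanner context both call the helper without double-appending a slot.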
+ +#include "jni_scanner.h" + +#include "column/array_column.h" +#include "column/map_column.h" +#include "column/struct_column.h" +#include "column/type_traits.h" +#include "fmt/core.h" +#include "udf/java/java_udf.h" +#include "util/defer_op.h" + +namespace starrocks { + +Status JniScanner::_check_jni_exception(JNIEnv* env, const std::string& message) { + if (jthrowable thr = env->ExceptionOccurred(); thr) { + std::string jni_error_message = JVMFunctionHelper::getInstance().dumpExceptionString(thr); + env->ExceptionDescribe(); + env->ExceptionClear(); + env->DeleteLocalRef(thr); + return Status::InternalError(message + " java exception details: " + jni_error_message); + } + return Status::OK(); +} + +Status JniScanner::do_init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params) { + RETURN_IF_ERROR(detect_java_runtime()); + _init_profile(scanner_params); + SCOPED_RAW_TIMER(&_app_stats.reader_init_ns); + JNIEnv* env = JVMFunctionHelper::getInstance().getEnv(); + if (env->EnsureLocalCapacity(_jni_scanner_params.size() * 2 + 6) < 0) { + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to ensure the local capacity.")); + } + RETURN_IF_ERROR(_init_jni_table_scanner(env, runtime_state)); + RETURN_IF_ERROR(_init_jni_method(env)); + return Status::OK(); +} + +Status JniScanner::do_open(RuntimeState* state) { + JNIEnv* env = JVMFunctionHelper::getInstance().getEnv(); + SCOPED_RAW_TIMER(&_app_stats.reader_init_ns); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to open the off-heap table scanner.")); + return Status::OK(); +} + +void JniScanner::do_close(RuntimeState* runtime_state) noexcept { + JNIEnv* env = JVMFunctionHelper::getInstance().getEnv(); + if (_jni_scanner_obj != nullptr) { + if (_jni_scanner_close != nullptr) { + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close); + } + env->DeleteLocalRef(_jni_scanner_obj); + _jni_scanner_obj = nullptr; + } + if (_jni_scanner_cls != nullptr) { + env->DeleteLocalRef(_jni_scanner_cls); + _jni_scanner_cls = nullptr; + } +} + +Status JniScanner::_init_jni_method(JNIEnv* env) { + // init jmethod + _jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get `open` jni method")); + + _jni_scanner_get_next_chunk = env->GetMethodID(_jni_scanner_cls, "getNextOffHeapChunk", "()J"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get `getNextOffHeapChunk` jni method")); + + _jni_scanner_close = env->GetMethodID(_jni_scanner_cls, "close", "()V"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get `close` jni method")); + + _jni_scanner_release_column = env->GetMethodID(_jni_scanner_cls, "releaseOffHeapColumnVector", "(I)V"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get `releaseOffHeapColumnVector` jni method")); + + _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseOffHeapTable", "()V"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get `releaseOffHeapTable` jni method")); + return Status::OK(); +} + +Status JniScanner::_init_jni_table_scanner(JNIEnv* env, RuntimeState* runtime_state) { + jclass scanner_factory_class = env->FindClass(_jni_scanner_factory_class.c_str()); + jmethodID scanner_factory_constructor = env->GetMethodID(scanner_factory_class, "", "()V"); + jobject scanner_factory_obj = env->NewObject(scanner_factory_class, scanner_factory_constructor); + jmethodID get_scanner_method = env->GetMethodID(scanner_factory_class, 
"getScannerClass", "()Ljava/lang/Class;"); + _jni_scanner_cls = (jclass)env->CallObjectMethod(scanner_factory_obj, get_scanner_method); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to init the scanner class.")); + env->DeleteLocalRef(scanner_factory_class); + env->DeleteLocalRef(scanner_factory_obj); + + jmethodID scanner_constructor = env->GetMethodID(_jni_scanner_cls, "", "(ILjava/util/Map;)V"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get a scanner class constructor.")); + + jclass hashmap_class = env->FindClass("java/util/HashMap"); + jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "", "(I)V"); + jobject hashmap_object = env->NewObject(hashmap_class, hashmap_constructor, _jni_scanner_params.size()); + jmethodID hashmap_put = + env->GetMethodID(hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to get the HashMap methods.")); + + std::string message = "Initialize a scanner with parameters: "; + for (const auto& it : _jni_scanner_params) { + jstring key = env->NewStringUTF(it.first.c_str()); + jstring value = env->NewStringUTF(it.second.c_str()); + // skip encoded object + if (_skipped_log_jni_scanner_params.find(it.first) == _skipped_log_jni_scanner_params.end()) { + message.append(it.first); + message.append("->"); + message.append(it.second); + message.append(", "); + } + + env->CallObjectMethod(hashmap_object, hashmap_put, key, value); + env->DeleteLocalRef(key); + env->DeleteLocalRef(value); + } + env->DeleteLocalRef(hashmap_class); + LOG(INFO) << message; + + int fetch_size = runtime_state->chunk_size(); + _jni_scanner_obj = env->NewObject(_jni_scanner_cls, scanner_constructor, fetch_size, hashmap_object); + env->DeleteLocalRef(hashmap_object); + DCHECK(_jni_scanner_obj != nullptr); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to initialize a scanner instance.")); + + return Status::OK(); +} + +Status JniScanner::_get_next_chunk(JNIEnv* env, long* chunk_meta) { + SCOPED_RAW_TIMER(&_app_stats.column_read_ns); + SCOPED_RAW_TIMER(&_app_stats.io_ns); + _app_stats.io_count += 1; + *chunk_meta = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_chunk); + RETURN_IF_ERROR(_check_jni_exception(env, "Failed to call the nextChunkOffHeap method of off-heap table scanner.")); + return Status::OK(); +} + +template +Status JniScanner::_append_primitive_data(const FillColumnArgs& args) { + char* column_ptr = static_cast(next_chunk_meta_as_ptr()); + using ColumnType = typename starrocks::RunTimeColumnType; + using CppType = typename starrocks::RunTimeCppType; + auto* runtime_column = down_cast(args.column); + runtime_column->resize_uninitialized(args.num_rows); + memcpy(runtime_column->get_data().data(), column_ptr, args.num_rows * sizeof(CppType)); + return Status::OK(); +} + +template +Status JniScanner::_append_string_data(const FillColumnArgs& args) { + int* offset_ptr = static_cast(next_chunk_meta_as_ptr()); + char* column_ptr = static_cast(next_chunk_meta_as_ptr()); + + auto* data_column = args.column; + using ColumnType = typename starrocks::RunTimeColumnType; + auto* runtime_column = down_cast(data_column); + Bytes& bytes = runtime_column->get_bytes(); + Offsets& offsets = runtime_column->get_offset(); + + int total_length = offset_ptr[args.num_rows]; + bytes.resize(total_length); + offsets.resize(args.num_rows + 1); + + memcpy(offsets.data(), offset_ptr, (args.num_rows + 1) * sizeof(uint32_t)); + memcpy(bytes.data(), column_ptr, total_length); + return 
Status::OK(); +} + +Status JniScanner::_append_array_data(const FillColumnArgs& args) { + DCHECK(args.slot_type.is_array_type()); + + auto* array_column = down_cast(args.column); + int* offset_ptr = static_cast(next_chunk_meta_as_ptr()); + + auto* offsets = array_column->offsets_column().get(); + offsets->resize_uninitialized(args.num_rows + 1); + memcpy(offsets->get_data().data(), offset_ptr, (args.num_rows + 1) * sizeof(uint32_t)); + + int total_length = offset_ptr[args.num_rows]; + Column* elements = array_column->elements_column().get(); + std::string name = args.slot_name + ".$0"; + FillColumnArgs sub_args = {.num_rows = total_length, + .slot_name = name, + .slot_type = args.slot_type.children[0], + .nulls = nullptr, + .column = elements, + .must_nullable = false}; + RETURN_IF_ERROR(_fill_column(&sub_args)); + return Status::OK(); +} + +Status JniScanner::_append_map_data(const FillColumnArgs& args) { + DCHECK(args.slot_type.is_map_type()); + + auto* map_column = down_cast(args.column); + int* offset_ptr = static_cast(next_chunk_meta_as_ptr()); + + auto* offsets = map_column->offsets_column().get(); + offsets->resize_uninitialized(args.num_rows + 1); + memcpy(offsets->get_data().data(), offset_ptr, (args.num_rows + 1) * sizeof(uint32_t)); + + int total_length = offset_ptr[args.num_rows]; + { + Column* keys = map_column->keys_column().get(); + if (!args.slot_type.children[0].is_unknown_type()) { + std::string name = args.slot_name + ".$0"; + FillColumnArgs sub_args = {.num_rows = total_length, + .slot_name = name, + .slot_type = args.slot_type.children[0], + .nulls = nullptr, + .column = keys, + .must_nullable = false}; + RETURN_IF_ERROR(_fill_column(&sub_args)); + } else { + keys->append_default(total_length); + } + } + + { + Column* values = map_column->values_column().get(); + if (!args.slot_type.children[1].is_unknown_type()) { + std::string name = args.slot_name + ".$1"; + FillColumnArgs sub_args = {.num_rows = total_length, + .slot_name = name, + .slot_type = args.slot_type.children[1], + .nulls = nullptr, + .column = values, + .must_nullable = true}; + RETURN_IF_ERROR(_fill_column(&sub_args)); + } else { + values->append_default(total_length); + } + } + return Status::OK(); +} + +Status JniScanner::_append_struct_data(const FillColumnArgs& args) { + DCHECK(args.slot_type.is_struct_type()); + + auto* struct_column = down_cast(args.column); + const TypeDescriptor& type = args.slot_type; + for (int i = 0; i < type.children.size(); i++) { + Column* column = struct_column->fields_column()[i].get(); + std::string name = args.slot_name + "." + type.field_names[i]; + FillColumnArgs sub_args = {.num_rows = args.num_rows, + .slot_name = name, + .slot_type = type.children[i], + .nulls = nullptr, + .column = column, + .must_nullable = true}; + RETURN_IF_ERROR(_fill_column(&sub_args)); + } + return Status::OK(); +} + +Status JniScanner::_fill_column(FillColumnArgs* pargs) { + FillColumnArgs& args = *pargs; + if (args.must_nullable && !args.column->is_nullable()) { + return Status::DataQualityError(fmt::format("NOT NULL column[{}] is not supported.", args.slot_name)); + } + + void* ptr = next_chunk_meta_as_ptr(); + if (ptr == nullptr) { + // struct field mismatch. 
+Status JniScanner::_fill_column(FillColumnArgs* pargs) {
+    FillColumnArgs& args = *pargs;
+    if (args.must_nullable && !args.column->is_nullable()) {
+        return Status::DataQualityError(fmt::format("NOT NULL column[{}] is not supported.", args.slot_name));
+    }
+
+    void* ptr = next_chunk_meta_as_ptr();
+    if (ptr == nullptr) {
+        // struct field mismatch.
+        args.column->append_default(args.num_rows);
+        return Status::OK();
+    }
+
+    if (args.column->is_nullable()) {
+        // if the column is nullable, we parse the `null_column`,
+        // update `args.nulls`, and point `args.column` at the underlying data column.
+        bool* null_column_ptr = static_cast<bool*>(ptr);
+        auto* nullable_column = down_cast<NullableColumn*>(args.column);
+
+        NullData& null_data = nullable_column->null_column_data();
+        null_data.resize(args.num_rows);
+        memcpy(null_data.data(), null_column_ptr, args.num_rows);
+        nullable_column->update_has_null();
+
+        auto* data_column = nullable_column->data_column().get();
+        pargs->column = data_column;
+        pargs->nulls = null_data.data();
+    } else {
+        // otherwise we have already skipped this chunk meta, because on the Java side
+        // we assume every column starts with a `null_column`.
+    }
+
+    LogicalType column_type = args.slot_type.type;
+    if (column_type == LogicalType::TYPE_BOOLEAN) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_BOOLEAN>(args)));
+    } else if (column_type == LogicalType::TYPE_TINYINT) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_TINYINT>(args)));
+    } else if (column_type == LogicalType::TYPE_SMALLINT) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_SMALLINT>(args)));
+    } else if (column_type == LogicalType::TYPE_INT) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_INT>(args)));
+    } else if (column_type == LogicalType::TYPE_FLOAT) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_FLOAT>(args)));
+    } else if (column_type == LogicalType::TYPE_BIGINT) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_BIGINT>(args)));
+    } else if (column_type == LogicalType::TYPE_DOUBLE) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_DOUBLE>(args)));
+    } else if (column_type == LogicalType::TYPE_VARCHAR) {
+        RETURN_IF_ERROR((_append_string_data<TYPE_VARCHAR>(args)));
+    } else if (column_type == LogicalType::TYPE_CHAR) {
+        RETURN_IF_ERROR((_append_string_data<TYPE_CHAR>(args)));
+    } else if (column_type == LogicalType::TYPE_VARBINARY) {
+        RETURN_IF_ERROR((_append_string_data<TYPE_VARBINARY>(args)));
+    } else if (column_type == LogicalType::TYPE_DATE) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_DATE>(args)));
+    } else if (column_type == LogicalType::TYPE_DATETIME) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_DATETIME>(args)));
+    } else if (column_type == LogicalType::TYPE_DECIMAL32) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_DECIMAL32>(args)));
+    } else if (column_type == LogicalType::TYPE_DECIMAL64) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_DECIMAL64>(args)));
+    } else if (column_type == LogicalType::TYPE_DECIMAL128) {
+        RETURN_IF_ERROR((_append_primitive_data<TYPE_DECIMAL128>(args)));
+    } else if (column_type == LogicalType::TYPE_ARRAY) {
+        RETURN_IF_ERROR((_append_array_data(args)));
+    } else if (column_type == LogicalType::TYPE_MAP) {
+        RETURN_IF_ERROR((_append_map_data(args)));
+    } else if (column_type == LogicalType::TYPE_STRUCT) {
+        RETURN_IF_ERROR((_append_struct_data(args)));
+    } else {
+        return Status::InternalError(fmt::format("Type {} is not supported for off-heap table scanner", column_type));
+    }
+    return Status::OK();
+}
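The convention _fill_column relies on is that every column's chunk meta begins with a boolean null buffer; for a nullable destination it becomes the null map, otherwise it has already been consumed and is ignored. A standalone sketch of that first decoding step, with hypothetical buffers:

#include <cstdint>
#include <vector>

struct DecodedNulls {
    std::vector<uint8_t> is_null; // 1 = NULL at that row
    bool has_null = false;
};

// Copy the per-row null flags and compute has_null, the moral equivalent of
// the memcpy into null_column_data() plus update_has_null() above.
DecodedNulls decode_nulls(const bool* null_buf, size_t num_rows) {
    DecodedNulls out;
    const auto* bytes = reinterpret_cast<const uint8_t*>(null_buf);
    out.is_null.assign(bytes, bytes + num_rows);
    for (uint8_t v : out.is_null) out.has_null |= (v != 0);
    return out;
}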
+Status JniScanner::_fill_chunk(JNIEnv* env, ChunkPtr* chunk, const std::vector<SlotDescriptor*>& slot_desc_list) {
+    SCOPED_RAW_TIMER(&_app_stats.column_convert_ns);
+
+    long num_rows = next_chunk_meta_as_long();
+    if (num_rows == 0) {
+        return Status::EndOfFile("");
+    }
+    _app_stats.raw_rows_read += num_rows;
+
+    for (size_t col_idx = 0; col_idx < slot_desc_list.size(); col_idx++) {
+        SlotDescriptor* slot_desc = slot_desc_list[col_idx];
+        const std::string& slot_name = slot_desc->col_name();
+        const TypeDescriptor& slot_type = slot_desc->type();
+        ColumnPtr& column = (*chunk)->get_column_by_slot_id(slot_desc->id());
+        FillColumnArgs args{.num_rows = num_rows,
+                            .slot_name = slot_name,
+                            .slot_type = slot_type,
+                            .nulls = nullptr,
+                            .column = column.get(),
+                            .must_nullable = true};
+        RETURN_IF_ERROR(_fill_column(&args));
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, col_idx);
+        RETURN_IF_ERROR(_check_jni_exception(
+                env, "Failed to call the releaseOffHeapColumnVector method of off-heap table scanner."));
+    }
+    return Status::OK();
+}
+
+Status JniScanner::_release_off_heap_table(JNIEnv* env) {
+    env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+    RETURN_IF_ERROR(
+            _check_jni_exception(env, "Failed to call the releaseOffHeapTable method of off-heap table scanner."));
+    return Status::OK();
+}
+
+Status JniScanner::do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk) {
+    // fill chunk with all wanted columns (including partition columns)
+    Status status = fill_empty_chunk(chunk, _scanner_params.tuple_desc->slots());
+
+    // ====== conjunct evaluation ======
+    // it is important to add columns before evaluation,
+    // because ctxs_by_slot may refer to a non-existed slot or a partition slot.
+    size_t chunk_size = (*chunk)->num_rows();
+    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, chunk_size);
+
+    RETURN_IF_ERROR(_scanner_ctx.evaluate_on_conjunct_ctxs_by_slot(chunk, &_chunk_filter));
+    return status;
+}
+
+Status JniScanner::fill_empty_chunk(ChunkPtr* chunk, const std::vector<SlotDescriptor*>& slot_desc_list) {
+    JNIEnv* env = JVMFunctionHelper::getInstance().getEnv();
+    long chunk_meta;
+    RETURN_IF_ERROR(_get_next_chunk(env, &chunk_meta));
+    reset_chunk_meta(chunk_meta);
+    Status status = _fill_chunk(env, chunk, slot_desc_list);
+    RETURN_IF_ERROR(_release_off_heap_table(env));
+
+    return status;
+}
+
+Status HiveJniScanner::do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk) {
+    // fill chunk with all wanted columns excluding partition columns
+    Status status = fill_empty_chunk(chunk, _scanner_params.materialize_slots);
+    size_t chunk_size = (*chunk)->num_rows();
+    if (!_scanner_params.materialize_slots.empty()) {
+        // when the chunk has both partition and non-partition columns,
+        // fill_empty_chunk only fills the non-partition columns for HiveJniScanner.
+        // in this situation, Chunk::num_rows() is temporarily not reliable,
+        // so take the row count from the first non-partition column.
+        auto slot_desc = _scanner_params.materialize_slots[0];
+        ColumnPtr& first_non_partition_column = (*chunk)->get_column_by_slot_id(slot_desc->id());
+        chunk_size = first_non_partition_column->size();
+    }
+
+    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, chunk_size);
+    // right now only Hive tables need to append partition columns explicitly;
+    // the Paimon and Hudi readers append partition columns on the Java side.
+    _scanner_ctx.append_or_update_partition_column_to_chunk(chunk, chunk_size);
+    RETURN_IF_ERROR(_scanner_ctx.evaluate_on_conjunct_ctxs_by_slot(chunk, &_chunk_filter));
+    return status;
+}
+
+} // namespace starrocks
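The HiveJniScanner row-count rule above is subtle: while only the materialized (non-partition) columns have been filled, the chunk's own num_rows() is stale, so the size of the first materialized column is authoritative. A standalone sketch of just that rule, with a plain vector-of-vectors standing in for columns:

#include <cstddef>
#include <vector>

// Returns the effective row count: the first materialized column's size when
// one exists, otherwise whatever the chunk currently reports.
size_t effective_rows(const std::vector<std::vector<int>>& materialized_columns,
                      size_t chunk_num_rows) {
    if (!materialized_columns.empty()) {
        return materialized_columns.front().size();
    }
    return chunk_num_rows;
}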
diff --git a/be/src/exec/vectorized/hdfs_scanner.cpp b/be/src/exec/vectorized/hdfs_scanner.cpp
index 87bc89155020c..648005f287f5f 100644
--- a/be/src/exec/vectorized/hdfs_scanner.cpp
+++ b/be/src/exec/vectorized/hdfs_scanner.cpp
@@ -323,6 +323,7 @@ void HdfsScannerContext::set_columns_from_file(const std::unordered_set
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.cpp
 void HdfsScannerContext::update_not_existed_columns_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count) {
     if (not_existed_slots.empty()) return;
     ChunkPtr& ck = (*chunk);
     ck->set_num_rows(row_count);
     for (auto* slot_desc : not_existed_slots) {
+=======
+void HdfsScannerContext::append_or_update_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count) {
+    if (not_existed_slots.empty() || row_count < 0) return;
+    ChunkPtr& ck = (*chunk);
+    for (auto* slot_desc : not_existed_slots) {
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.cpp
         auto col = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
         if (row_count > 0) {
             col->append_default(row_count);
         }
-        ck->append_column(std::move(col), slot_desc->id());
+        ck->append_or_update_column(std::move(col), slot_desc->id());
     }
+    ck->set_num_rows(row_count);
 }
 
 Status HdfsScannerContext::evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter) {
@@ -369,7 +377,7 @@ StatusOr<bool> HdfsScannerContext::should_skip_by_evaluating_not_existed_slots()
 
     // build chunk for evaluation.
     ChunkPtr chunk = std::make_shared<Chunk>();
-    append_not_existed_columns_to_chunk(&chunk, 1);
+    append_or_update_not_existed_columns_to_chunk(&chunk, 1);
     // do evaluation.
     {
         SCOPED_RAW_TIMER(&stats->expr_filter_ns);
@@ -378,6 +386,7 @@ StatusOr<bool> HdfsScannerContext::should_skip_by_evaluating_not_existed_slots()
     return !(chunk->has_rows());
 }
 
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.cpp
 void HdfsScannerContext::update_partition_column_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count) {
     if (partition_columns.empty() || row_count <= 0) return;
 
@@ -401,9 +410,11 @@ void HdfsScannerContext::update_partition_column_of_chunk(vectorized::ChunkPtr*
 
 void HdfsScannerContext::append_partition_column_to_chunk(vectorized::ChunkPtr* chunk, size_t row_count) {
     if (partition_columns.size() == 0) return;
+=======
+void HdfsScannerContext::append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count) {
+    if (partition_columns.size() == 0 || row_count < 0) return;
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.cpp
     ChunkPtr& ck = (*chunk);
-    ck->set_num_rows(row_count);
-
     for (size_t i = 0; i < partition_columns.size(); i++) {
         SlotDescriptor* slot_desc = partition_columns[i].slot_desc;
         DCHECK(partition_values[i]->is_constant());
@@ -419,8 +430,13 @@ void HdfsScannerContext::append_partition_column_to_chunk(vectorized::ChunkPtr*
             }
             chunk_part_column->assign(row_count, 0);
         }
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.cpp
         ck->append_column(std::move(chunk_part_column), slot_desc->id());
+=======
+        ck->append_or_update_column(std::move(chunk_part_column), slot_desc->id());
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.cpp
     }
+    ck->set_num_rows(row_count);
 }
 
 bool HdfsScannerContext::can_use_dict_filter_on_slot(SlotDescriptor* slot) const {
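should_skip_by_evaluating_not_existed_slots, touched above, embodies a neat pruning trick: columns missing from the file are constant (default/NULL) for every row of that file, so evaluating the conjuncts once on a single default row decides the whole file. A hedged standalone sketch of the idea, with std::optional modeling NULL and std::function modeling a conjunct context:

#include <functional>
#include <optional>
#include <vector>

using Value = std::optional<int>;               // nullopt = the column's default/NULL
using Conjunct = std::function<bool(const Value&)>;

// If any conjunct over a missing column rejects the default value, no row in
// the file can ever pass, so the whole file can be skipped.
bool should_skip_file(const std::vector<Conjunct>& conjuncts_on_missing_cols) {
    const Value default_value = std::nullopt;
    for (const auto& conjunct : conjuncts_on_missing_cols) {
        if (!conjunct(default_value)) return true;
    }
    return false;
}

// usage: should_skip_file({[](const Value& v) { return v.has_value(); }}) == true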
diff --git a/be/src/exec/vectorized/hdfs_scanner.h b/be/src/exec/vectorized/hdfs_scanner.h
index 83bc5173fcf64..63e6087bebc48 100644
--- a/be/src/exec/vectorized/hdfs_scanner.h
+++ b/be/src/exec/vectorized/hdfs_scanner.h
@@ -194,13 +194,27 @@ struct HdfsScannerContext {
     // set column names from file,
     // to update not_existed slots and conjuncts,
     // and to update the `conjunct_ctxs_by_slot` field.
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.h
     void set_columns_from_file(const std::unordered_set<std::string>& names);
+=======
+    void update_materialized_columns(const std::unordered_set<std::string>& names);
+
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.h
     // "not existed columns" are materialized columns not found in the file.
     // this usually happens when the user changes the schema. for example,
    // the user creates a table with 3 fields A, B, C, and there is one file F1,
     // but the user changes the schema and adds one field D.
     // when the user selects (A, B, C, D), D is the non-existed column in file F1.
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.h
     void update_not_existed_columns_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
+=======
+    void append_or_update_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count);
+
+    // If there is no partition column in the chunk, append the partition column to the chunk;
+    // otherwise update the partition column in the chunk.
+    void append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count);
+
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.h
     // if we can skip this file by evaluating conjuncts of non-existed columns with default values.
     StatusOr<bool> should_skip_by_evaluating_not_existed_slots();
     std::vector<SlotDescriptor*> not_existed_slots;
@@ -209,9 +223,12 @@ struct HdfsScannerContext {
     // other helper functions.
     void update_partition_column_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
     bool can_use_dict_filter_on_slot(SlotDescriptor* slot) const;
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.h
     void append_not_existed_columns_to_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
     void append_partition_column_to_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
+=======
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.h
     Status evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter);
 };
diff --git a/be/src/exec/vectorized/hdfs_scanner_orc.cpp b/be/src/exec/vectorized/hdfs_scanner_orc.cpp
index 02c4f0bdf674f..d27d7d7f44d0a 100644
--- a/be/src/exec/vectorized/hdfs_scanner_orc.cpp
+++ b/be/src/exec/vectorized/hdfs_scanner_orc.cpp
@@ -413,11 +413,17 @@ Status HdfsOrcScanner::do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk)
         *chunk = std::move(ret.value());
     }
 
+<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner_orc.cpp
     // important to add columns before evaluation,
     // because ctxs_by_slot may refer to a non-existed slot or a partition slot.
     _scanner_ctx.append_not_existed_columns_to_chunk(chunk, ck->num_rows());
     _scanner_ctx.append_partition_column_to_chunk(chunk, ck->num_rows());
     chunk_size = ck->num_rows();
+=======
+    // we need to append non-existed columns before evaluation, just for the count(*) optimization
+    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, rows_read);
+
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner_orc.cpp
     // do stats before we filter rows which do not match.
     _stats.raw_rows_read += chunk_size;
     _chunk_filter.assign(chunk_size, 1);
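The count(*) optimization the ORC comment refers to explains why set_num_rows moved to the end of the append_or_update helpers: when every column is pruned away, the row count cannot be derived from any column and must be carried by the chunk itself. A standalone sketch of that corner case, with a hypothetical stand-in struct:

#include <cassert>
#include <cstddef>

// When num_columns == 0 (e.g. SELECT count(*)), num_rows is the only record
// of how many rows the chunk represents and has to be set explicitly.
struct RowCountOnlyChunk {
    size_t num_columns = 0;
    size_t num_rows = 0;
};

int main() {
    RowCountOnlyChunk chunk;
    chunk.num_rows = 4096; // rows read from the file, no columns materialized
    assert(chunk.num_columns == 0 && chunk.num_rows == 4096);
    return 0;
}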
diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp
index 0c89d34ffe643..fc27124cd621c 100644
--- a/be/src/formats/parquet/file_reader.cpp
+++ b/be/src/formats/parquet/file_reader.cpp
@@ -484,8 +484,13 @@ Status FileReader::get_next(vectorized::ChunkPtr* chunk) {
     Status status = _row_group_readers[_cur_row_group_idx]->get_next(chunk, &row_count);
     if (status.ok() || status.is_end_of_file()) {
         if (row_count > 0) {
+<<<<<<< HEAD
             _scanner_ctx->update_not_existed_columns_of_chunk(chunk, row_count);
             _scanner_ctx->update_partition_column_of_chunk(chunk, row_count);
+=======
+            _scanner_ctx->append_or_update_not_existed_columns_to_chunk(chunk, row_count);
+            _scanner_ctx->append_or_update_partition_column_to_chunk(chunk, row_count);
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248))
             _scan_row_count += (*chunk)->num_rows();
         }
         if (status.is_end_of_file()) {
@@ -509,8 +514,13 @@ Status FileReader::get_next(vectorized::ChunkPtr* chunk) {
 Status FileReader::_exec_only_partition_scan(vectorized::ChunkPtr* chunk) {
     if (_scan_row_count < _total_row_count) {
         size_t read_size = std::min(static_cast<size_t>(_chunk_size), _total_row_count - _scan_row_count);
+<<<<<<< HEAD
         _scanner_ctx->update_not_existed_columns_of_chunk(chunk, read_size);
         _scanner_ctx->update_partition_column_of_chunk(chunk, read_size);
+=======
+        _scanner_ctx->append_or_update_not_existed_columns_to_chunk(chunk, read_size);
+        _scanner_ctx->append_or_update_partition_column_to_chunk(chunk, read_size);
+>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248))
         _scan_row_count += read_size;
         return Status::OK();
     }
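For the partition-only scan above, no file bytes are read at all: the reader just emits batches of partition/default values until the file's recorded row count is exhausted. A standalone sketch of that read-size arithmetic:

#include <algorithm>
#include <cstddef>

// Each pass emits at most one chunk's worth of rows; the final pass emits the
// remainder. Mirrors the std::min in _exec_only_partition_scan.
size_t next_read_size(size_t chunk_size, size_t total_rows, size_t scanned_rows) {
    return std::min(chunk_size, total_rows - scanned_rows);
}

// e.g. chunk_size = 4096, total_rows = 10000: successive reads of 4096, 4096, 1808.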