[Refactor] refactor hdfs scanner apppend_or_update column (#42248)
Signed-off-by: yanz <[email protected]>
(cherry picked from commit c7f5207)

# Conflicts:
#	be/src/column/chunk.h
#	be/src/exec/jni_scanner.cpp
#	be/src/exec/vectorized/hdfs_scanner.cpp
#	be/src/exec/vectorized/hdfs_scanner.h
#	be/src/exec/vectorized/hdfs_scanner_orc.cpp
#	be/src/formats/parquet/file_reader.cpp
dirtysalt authored and mergify[bot] committed Mar 8, 2024
1 parent ff8abdf commit dd8a23f
Showing 7 changed files with 494 additions and 4 deletions.
10 changes: 10 additions & 0 deletions be/src/column/chunk.cpp
@@ -132,6 +132,16 @@ void Chunk::update_column_by_index(ColumnPtr column, size_t idx) {
check_or_die();
}

void Chunk::append_or_update_column(ColumnPtr column, SlotId slot_id) {
if (_slot_id_to_index.contains(slot_id)) {
_columns[_slot_id_to_index[slot_id]] = std::move(column);
} else {
_slot_id_to_index[slot_id] = _columns.size();
_columns.emplace_back(std::move(column));
}
check_or_die();
}

void Chunk::insert_column(size_t idx, ColumnPtr column, const FieldPtr& field) {
DCHECK_LT(idx, _columns.size());
_columns.emplace(_columns.begin() + idx, std::move(column));
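
The new Chunk::append_or_update_column above upserts by slot id: it overwrites the column in place when the slot is already mapped, and appends it otherwise. A minimal standalone sketch of that behavior (simplified stand-ins, not the StarRocks Column/Chunk types):

#include <cassert>
#include <cstddef>
#include <memory>
#include <unordered_map>
#include <vector>

using SlotId = int;
struct Column {};  // simplified stand-in for starrocks::Column
using ColumnPtr = std::shared_ptr<Column>;

struct MiniChunk {
    std::vector<ColumnPtr> columns;
    std::unordered_map<SlotId, size_t> slot_id_to_index;

    // Mirrors Chunk::append_or_update_column: update the column if the
    // slot is already mapped, otherwise append it and record its index.
    void append_or_update_column(ColumnPtr column, SlotId slot_id) {
        auto it = slot_id_to_index.find(slot_id);
        if (it != slot_id_to_index.end()) {
            columns[it->second] = std::move(column);
        } else {
            slot_id_to_index[slot_id] = columns.size();
            columns.emplace_back(std::move(column));
        }
    }
};

int main() {
    MiniChunk ck;
    ck.append_or_update_column(std::make_shared<Column>(), /*slot_id=*/7); // appends
    ck.append_or_update_column(std::make_shared<Column>(), /*slot_id=*/7); // updates in place
    assert(ck.columns.size() == 1);
    return 0;
}

This upsert is what lets the scanner call one function whether or not the chunk already carries the column, replacing the separate append_*/update_* paths below.
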
8 changes: 8 additions & 0 deletions be/src/column/chunk.h
@@ -83,7 +83,15 @@ class Chunk {
void update_column(ColumnPtr column, SlotId slot_id);
void update_column_by_index(ColumnPtr column, size_t idx);

<<<<<<< HEAD
void append_tuple_column(const ColumnPtr& column, TupleId tuple_id);
=======
void append_or_update_column(ColumnPtr column, SlotId slot_id);

void update_rows(const Chunk& src, const uint32_t* indexes);

void append_default();
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248))

void remove_column_by_index(size_t idx);

423 changes: 423 additions & 0 deletions be/src/exec/jni_scanner.cpp

Large diffs are not rendered by default.

24 changes: 20 additions & 4 deletions be/src/exec/vectorized/hdfs_scanner.cpp
@@ -323,6 +323,7 @@ void HdfsScannerContext::set_columns_from_file(const std::unordered_set<std::str
}
}

<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.cpp
void HdfsScannerContext::update_not_existed_columns_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count) {
if (not_existed_slots.empty() || row_count <= 0) return;

@@ -338,12 +339,19 @@ void HdfsScannerContext::append_not_existed_columns_to_chunk(vectorized::ChunkPt
ChunkPtr& ck = (*chunk);
ck->set_num_rows(row_count);
for (auto* slot_desc : not_existed_slots) {
=======
void HdfsScannerContext::append_or_update_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count) {
if (not_existed_slots.empty() || row_count < 0) return;
ChunkPtr& ck = (*chunk);
for (auto* slot_desc : not_existed_slots) {
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.cpp
auto col = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
if (row_count > 0) {
col->append_default(row_count);
}
ck->append_column(std::move(col), slot_desc->id());
ck->append_or_update_column(std::move(col), slot_desc->id());
}
ck->set_num_rows(row_count);
}

Status HdfsScannerContext::evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter) {
@@ -369,7 +377,7 @@ StatusOr<bool> HdfsScannerContext::should_skip_by_evaluating_not_existed_slots()

// build chunk for evaluation.
ChunkPtr chunk = std::make_shared<Chunk>();
append_not_existed_columns_to_chunk(&chunk, 1);
append_or_update_not_existed_columns_to_chunk(&chunk, 1);
// do evaluation.
{
SCOPED_RAW_TIMER(&stats->expr_filter_ns);
@@ -378,6 +386,7 @@ StatusOr<bool> HdfsScannerContext::should_skip_by_evaluating_not_existed_slots()
return !(chunk->has_rows());
}

<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.cpp
void HdfsScannerContext::update_partition_column_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count) {
if (partition_columns.empty() || row_count <= 0) return;

@@ -401,9 +410,11 @@ void HdfsScannerContext::update_partition_column_of_chunk(vectorized::ChunkPtr*
void HdfsScannerContext::append_partition_column_to_chunk(vectorized::ChunkPtr* chunk, size_t row_count) {
if (partition_columns.size() == 0) return;

=======
void HdfsScannerContext::append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count) {
if (partition_columns.size() == 0 || row_count < 0) return;
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.cpp
ChunkPtr& ck = (*chunk);
ck->set_num_rows(row_count);

for (size_t i = 0; i < partition_columns.size(); i++) {
SlotDescriptor* slot_desc = partition_columns[i].slot_desc;
DCHECK(partition_values[i]->is_constant());
@@ -419,8 +430,13 @@ void HdfsScannerContext::append_partition_column_to_chunk(vectorized::ChunkPtr*
}
chunk_part_column->assign(row_count, 0);
}
<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.cpp
ck->append_column(std::move(chunk_part_column), slot_desc->id());
=======
ck->append_or_update_column(std::move(chunk_part_column), slot_desc->id());
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.cpp
}
ck->set_num_rows(row_count);
}

bool HdfsScannerContext::can_use_dict_filter_on_slot(SlotDescriptor* slot) const {
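
should_skip_by_evaluating_not_existed_slots (context lines above) builds a one-row chunk of default values for the missing columns and runs the conjuncts on it; if that single row is filtered out, no row in the file can pass, so the whole file is skippable. A hedged sketch of the idea outside StarRocks (the lambda stands in for the real conjunct contexts, and the optional stands in for a default-valued, i.e. NULL, column):

#include <functional>
#include <iostream>
#include <optional>
#include <vector>

// Stand-in for a conjunct evaluated against a column that is absent from the
// file: every row of such a column holds the default value (here, NULL).
using Conjunct = std::function<bool(const std::optional<int>&)>;

// Mirrors the idea of should_skip_by_evaluating_not_existed_slots: evaluate
// the conjuncts against a single default-valued row; if the row does not
// survive, no row in the file can, so the whole file is skipped.
bool should_skip_file(const std::vector<Conjunct>& conjuncts) {
    std::optional<int> default_value;  // non-existed column defaults to NULL
    for (const auto& conjunct : conjuncts) {
        if (!conjunct(default_value)) return true;
    }
    return false;
}

int main() {
    // Predicate "D IS NOT NULL" on a column D that the file does not contain:
    std::vector<Conjunct> conjuncts = {
        [](const std::optional<int>& v) { return v.has_value(); }};
    std::cout << std::boolalpha << should_skip_file(conjuncts) << "\n";  // true
    return 0;
}
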
17 changes: 17 additions & 0 deletions be/src/exec/vectorized/hdfs_scanner.h
@@ -194,13 +194,27 @@ struct HdfsScannerContext {
// set column names from the file,
// update not_existed slots and conjuncts,
// and update the `conjunct_ctxs_by_slot` field.
<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.h
void set_columns_from_file(const std::unordered_set<std::string>& names);
=======
void update_materialized_columns(const std::unordered_set<std::string>& names);

>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.h
// "not existed columns" are materialized columns not found in file
// this usually happens when the user changes the schema. for example,
// a user creates a table with 3 fields A, B, C, and there is one file F1,
// but then changes the schema and adds one field like D.
// when the user selects (A, B, C, D), D is the non-existed column in file F1.
<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.h
void update_not_existed_columns_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
=======
void append_or_update_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count);

// If there is no partition column in the chunk, append the partition column to the chunk;
// otherwise update the partition column in the chunk
void append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count);

>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.h
// if we can skip this file by evaluating conjuncts of non-existed columns with default value.
StatusOr<bool> should_skip_by_evaluating_not_existed_slots();
std::vector<SlotDescriptor*> not_existed_slots;
@@ -209,9 +223,12 @@ struct HdfsScannerContext {
// other helper functions.
void update_partition_column_of_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
bool can_use_dict_filter_on_slot(SlotDescriptor* slot) const;
<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner.h

void append_not_existed_columns_to_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
void append_partition_column_to_chunk(vectorized::ChunkPtr* chunk, size_t row_count);
=======
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner.h
Status evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter);
};

6 changes: 6 additions & 0 deletions be/src/exec/vectorized/hdfs_scanner_orc.cpp
@@ -413,11 +413,17 @@ Status HdfsOrcScanner::do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk)
*chunk = std::move(ret.value());
}

<<<<<<< HEAD:be/src/exec/vectorized/hdfs_scanner_orc.cpp
// important to add columns before evaluation,
// because ctxs_by_slot may refer to some non-existed slot or partition slot.
_scanner_ctx.append_not_existed_columns_to_chunk(chunk, ck->num_rows());
_scanner_ctx.append_partition_column_to_chunk(chunk, ck->num_rows());
chunk_size = ck->num_rows();
=======
// we need to append non-existed columns before doing eval, just for the count(*) optimization
_scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, rows_read);

>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248)):be/src/exec/hdfs_scanner_orc.cpp
// do stats before we filter rows which do not match.
_stats.raw_rows_read += chunk_size;
_chunk_filter.assign(chunk_size, 1);
10 changes: 10 additions & 0 deletions be/src/formats/parquet/file_reader.cpp
@@ -484,8 +484,13 @@ Status FileReader::get_next(vectorized::ChunkPtr* chunk) {
Status status = _row_group_readers[_cur_row_group_idx]->get_next(chunk, &row_count);
if (status.ok() || status.is_end_of_file()) {
if (row_count > 0) {
<<<<<<< HEAD
_scanner_ctx->update_not_existed_columns_of_chunk(chunk, row_count);
_scanner_ctx->update_partition_column_of_chunk(chunk, row_count);
=======
_scanner_ctx->append_or_update_not_existed_columns_to_chunk(chunk, row_count);
_scanner_ctx->append_or_update_partition_column_to_chunk(chunk, row_count);
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248))
_scan_row_count += (*chunk)->num_rows();
}
if (status.is_end_of_file()) {
@@ -509,8 +514,13 @@ Status FileReader::get_next(vectorized::ChunkPtr* chunk) {
Status FileReader::_exec_only_partition_scan(vectorized::ChunkPtr* chunk) {
if (_scan_row_count < _total_row_count) {
size_t read_size = std::min(static_cast<size_t>(_chunk_size), _total_row_count - _scan_row_count);
<<<<<<< HEAD
_scanner_ctx->update_not_existed_columns_of_chunk(chunk, read_size);
_scanner_ctx->update_partition_column_of_chunk(chunk, read_size);
=======
_scanner_ctx->append_or_update_not_existed_columns_to_chunk(chunk, read_size);
_scanner_ctx->append_or_update_partition_column_to_chunk(chunk, read_size);
>>>>>>> c7f5207d76 ([Refactor] refactor hdfs scanner apppend_or_update column (#42248))
_scan_row_count += read_size;
return Status::OK();
}
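
_exec_only_partition_scan never touches the data pages: it counts out rows in chunk-sized steps and lets the scanner context fill in partition and default columns. The chunking arithmetic, sketched with made-up sizes (not the FileReader members themselves):

#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
    const size_t chunk_size = 4096;        // max rows emitted per call
    const size_t total_row_count = 10000;  // rows implied by file metadata
    size_t scan_row_count = 0;

    // Each call emits min(chunk_size, remaining) partition/default-filled
    // rows until the counter reaches the file's total row count.
    while (scan_row_count < total_row_count) {
        size_t read_size = std::min(chunk_size, total_row_count - scan_row_count);
        scan_row_count += read_size;
        std::cout << "emit " << read_size << " rows\n";  // 4096, 4096, 1808
    }
    return 0;
}
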
