[Refactor] refactor hdfs scanner append_or_update column
Signed-off-by: yanz <[email protected]>
dirtysalt committed Mar 8, 2024
1 parent 8336bc3 commit 9f29c42
Showing 7 changed files with 32 additions and 36 deletions.
10 changes: 10 additions & 0 deletions be/src/column/chunk.cpp
@@ -148,6 +148,16 @@ void Chunk::update_column_by_index(ColumnPtr column, size_t idx) {
    check_or_die();
}

void Chunk::append_or_update_column(ColumnPtr column, SlotId slot_id) {
    if (_slot_id_to_index.contains(slot_id)) {
        _columns[_slot_id_to_index[slot_id]] = std::move(column);
    } else {
        _slot_id_to_index[slot_id] = _columns.size();
        _columns.emplace_back(std::move(column));
    }
    check_or_die();
}

void Chunk::insert_column(size_t idx, ColumnPtr column, const FieldPtr& field) {
    DCHECK_LT(idx, _columns.size());
    _columns.emplace(_columns.begin() + idx, std::move(column));
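The new Chunk::append_or_update_column either binds the column to a fresh slot or replaces the column already registered under that slot id. A minimal usage sketch, assuming the ColumnHelper API shown elsewhere in this commit; the slot id and INT type are illustrative, not from the diff:

    ChunkPtr chunk = std::make_shared<Chunk>();
    auto col = ColumnHelper::create_column(TypeDescriptor(TYPE_INT), /*nullable=*/true);
    chunk->append_or_update_column(col, /*slot_id=*/5); // slot 5 absent: appended, slot-to-index mapping recorded
    chunk->append_or_update_column(col, /*slot_id=*/5); // slot 5 present: replaced in place, no duplicate column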
2 changes: 2 additions & 0 deletions be/src/column/chunk.h
@@ -111,6 +111,8 @@ class Chunk {
    void update_column(ColumnPtr column, SlotId slot_id);
    void update_column_by_index(ColumnPtr column, size_t idx);

    void append_or_update_column(ColumnPtr column, SlotId slot_id);

    void update_rows(const Chunk& src, const uint32_t* indexes);

    void append_default();
33 changes: 8 additions & 25 deletions be/src/exec/hdfs_scanner.cpp
@@ -377,27 +377,17 @@ void HdfsScannerContext::update_materialized_columns(const std::unordered_set<st
    materialized_columns.swap(updated_columns);
}

void HdfsScannerContext::update_not_existed_columns_of_chunk(ChunkPtr* chunk, size_t row_count) {
    if (not_existed_slots.empty() || row_count <= 0) return;

    ChunkPtr& ck = (*chunk);
    for (auto* slot_desc : not_existed_slots) {
        ck->get_column_by_slot_id(slot_desc->id())->append_default(row_count);
    }
}

void HdfsScannerContext::append_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count) {
    if (not_existed_slots.size() == 0) return;

void HdfsScannerContext::append_or_update_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count) {
    if (not_existed_slots.empty() || row_count < 0) return;
    ChunkPtr& ck = (*chunk);
    ck->set_num_rows(row_count);
    for (auto* slot_desc : not_existed_slots) {
        auto col = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
        if (row_count > 0) {
            col->append_default(row_count);
        }
        ck->append_column(std::move(col), slot_desc->id());
        ck->append_or_update_column(std::move(col), slot_desc->id());
    }
    ck->set_num_rows(row_count);
}
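The merged helper supersedes two entry points with different preconditions: update_not_existed_columns_of_chunk assumed the columns were already in the chunk (the Parquet path), while append_not_existed_columns_to_chunk assumed they were not (the ORC and JNI paths). A hedged sketch of the unified call; ctx and rows are placeholder names:

    ChunkPtr chunk = std::make_shared<Chunk>();
    size_t rows = 1024; // placeholder row count
    // safe whether or not the not-existed slots are already materialized in the chunk
    ctx.append_or_update_not_existed_columns_to_chunk(&chunk, rows);
    // every slot in ctx.not_existed_slots now holds `rows` default (NULL) values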

Status HdfsScannerContext::evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter) {
@@ -423,7 +413,7 @@ StatusOr<bool> HdfsScannerContext::should_skip_by_evaluating_not_existed_slots()

    // build chunk for evaluation.
    ChunkPtr chunk = std::make_shared<Chunk>();
    append_not_existed_columns_to_chunk(&chunk, 1);
    append_or_update_not_existed_columns_to_chunk(&chunk, 1);
    // do evaluation.
    {
        SCOPED_RAW_TIMER(&stats->expr_filter_ns);
@@ -433,11 +423,8 @@ StatusOr<bool> HdfsScannerContext::should_skip_by_evaluating_not_existed_slots()
}

void HdfsScannerContext::append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count) {
    if (partition_columns.size() == 0 || row_count <= 0) return;

    if (partition_columns.size() == 0 || row_count < 0) return;
    ChunkPtr& ck = (*chunk);
    ck->set_num_rows(row_count);

    for (size_t i = 0; i < partition_columns.size(); i++) {
        SlotDescriptor* slot_desc = partition_columns[i].slot_desc;
        DCHECK(partition_values[i]->is_constant());
@@ -453,13 +440,9 @@ void HdfsScannerContext::append_or_update_partition_column_to_chunk(ChunkPtr* ch
            }
            chunk_part_column->assign(row_count, 0);
        }

        if (ck->is_slot_exist(slot_desc->id())) {
            ck->update_column(std::move(chunk_part_column), slot_desc->id());
        } else {
            ck->append_column(std::move(chunk_part_column), slot_desc->id());
        }
        ck->append_or_update_column(std::move(chunk_part_column), slot_desc->id());
    }
    ck->set_num_rows(row_count);
}

bool HdfsScannerContext::can_use_dict_filter_on_slot(SlotDescriptor* slot) const {
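Because Chunk::append_or_update_column replaces a column whose slot id already exists, the partition-column helper keeps the tolerance the old is_slot_exist branch provided, now through a single call. A hedged sketch; ctx, chunk, and rows are placeholder names:

    ctx.append_or_update_partition_column_to_chunk(&chunk, rows);
    // a repeated call updates the existing partition columns instead of appending duplicates
    ctx.append_or_update_partition_column_to_chunk(&chunk, rows);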
13 changes: 7 additions & 6 deletions be/src/exec/hdfs_scanner.h
@@ -272,24 +272,25 @@ struct HdfsScannerContext {
    // and to update not_existed slots and conjuncts.
    // and to update `conjunct_ctxs_by_slot` field.
    void update_materialized_columns(const std::unordered_set<std::string>& names);

    // "not existed columns" are materialized columns not found in file
    // this usually happens when use changes schema. for example
    // user create table with 3 fields A, B, C, and there is one file F1
    // but user change schema and add one field like D.
    // when user select(A, B, C, D), then D is the non-existed column in file F1.
    void update_not_existed_columns_of_chunk(ChunkPtr* chunk, size_t row_count);
    void append_or_update_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count);

    // If there is no partition column in the chunk,append partition column to chunk,
    // otherwise update partition column in chunk
    void append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count);

    // if we can skip this file by evaluating conjuncts of non-existed columns with default value.
    StatusOr<bool> should_skip_by_evaluating_not_existed_slots();
    std::vector<SlotDescriptor*> not_existed_slots;
    std::vector<ExprContext*> conjunct_ctxs_of_non_existed_slots;

    // other helper functions.
    bool can_use_dict_filter_on_slot(SlotDescriptor* slot) const;

    void append_not_existed_columns_to_chunk(ChunkPtr* chunk, size_t row_count);

    // If there is no partition column in the chunk,append partition column to chunk,otherwise update partition column in chunk
    void append_or_update_partition_column_to_chunk(ChunkPtr* chunk, size_t row_count);
    Status evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter);
};
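A hedged illustration of the schema-evolution case in the comment above: file F1 was written with columns A, B, C, the table later gained D, so the slot for D has no backing data in F1 and is filled with defaults. This mirrors the implementation in hdfs_scanner.cpp; ctx, chunk, and row_count are placeholder names:

    for (auto* slot_desc : ctx.not_existed_slots) { // e.g. the slot for column D
        auto col = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
        col->append_default(row_count);             // row_count default/NULL values
        chunk->append_or_update_column(std::move(col), slot_desc->id());
    }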

2 changes: 1 addition & 1 deletion be/src/exec/hdfs_scanner_orc.cpp
@@ -585,7 +585,7 @@ StatusOr<size_t> HdfsOrcScanner::_do_get_next(ChunkPtr* chunk) {
    }

    // we need to append none existed column before do eval, just for count(*) optimization
    _scanner_ctx.append_not_existed_columns_to_chunk(chunk, rows_read);
    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, rows_read);

    // do stats before we filter rows which does not match.
    _app_stats.raw_rows_read += rows_read;
4 changes: 2 additions & 2 deletions be/src/exec/jni_scanner.cpp
@@ -383,7 +383,7 @@ Status JniScanner::do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk) {
    // important to add columns before evaluation
    // because ctxs_by_slot maybe refers to some non-existed slot or partition slot.
    size_t chunk_size = (*chunk)->num_rows();
    _scanner_ctx.append_not_existed_columns_to_chunk(chunk, chunk_size);
    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, chunk_size);

    RETURN_IF_ERROR(_scanner_ctx.evaluate_on_conjunct_ctxs_by_slot(chunk, &_chunk_filter));
    return status;
@@ -413,7 +413,7 @@ Status HiveJniScanner::do_get_next(RuntimeState* runtime_state, ChunkPtr* chunk)
        chunk_size = first_non_partition_column->size();
    }

    _scanner_ctx.append_not_existed_columns_to_chunk(chunk, chunk_size);
    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, chunk_size);
    // right now only hive table need append partition columns explictly, paimon and hudi reader will append partition columns in Java side
    _scanner_ctx.append_or_update_partition_column_to_chunk(chunk, chunk_size);
    RETURN_IF_ERROR(_scanner_ctx.evaluate_on_conjunct_ctxs_by_slot(chunk, &_chunk_filter));
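Both JNI paths keep the ordering the comment above calls important: conjunct contexts may reference slots that only exist once the not-existed and partition columns are materialized. A hedged restatement of that ordering with the names used in the diff:

    _scanner_ctx.append_or_update_not_existed_columns_to_chunk(chunk, chunk_size); // 1. materialize missing slots
    _scanner_ctx.append_or_update_partition_column_to_chunk(chunk, chunk_size);    // 2. materialize partition slots
    RETURN_IF_ERROR(_scanner_ctx.evaluate_on_conjunct_ctxs_by_slot(chunk, &_chunk_filter)); // 3. every referenced slot now exists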
4 changes: 2 additions & 2 deletions be/src/formats/parquet/file_reader.cpp
@@ -613,7 +613,7 @@ Status FileReader::get_next(ChunkPtr* chunk) {
    Status status = _row_group_readers[_cur_row_group_idx]->get_next(chunk, &row_count);
    if (status.ok() || status.is_end_of_file()) {
        if (row_count > 0) {
            _scanner_ctx->update_not_existed_columns_of_chunk(chunk, row_count);
            _scanner_ctx->append_or_update_not_existed_columns_to_chunk(chunk, row_count);
            _scanner_ctx->append_or_update_partition_column_to_chunk(chunk, row_count);
            _scan_row_count += (*chunk)->num_rows();
        }
@@ -642,7 +642,7 @@ Status FileReader::get_next(ChunkPtr* chunk) {
Status FileReader::_exec_no_materialized_column_scan(ChunkPtr* chunk) {
    if (_scan_row_count < _total_row_count) {
        size_t read_size = std::min(static_cast<size_t>(_chunk_size), _total_row_count - _scan_row_count);
        _scanner_ctx->update_not_existed_columns_of_chunk(chunk, read_size);
        _scanner_ctx->append_or_update_not_existed_columns_to_chunk(chunk, read_size);
        _scanner_ctx->append_or_update_partition_column_to_chunk(chunk, read_size);
        _scan_row_count += read_size;
        return Status::OK();
