[scan-opt-3] Custom implementation for memcpy #159

Status: Open. Wants to merge 27 commits into base: arrow-4.0.0-oap.

Commits (27, changes shown from all commits):
baee5ad  Add parquet scan benchmark (zhixingheyi-tian, Jul 19, 2022)
7fdf914  Add Usage (zhixingheyi-tian, Jul 22, 2022)
3b8ffa3  perf report (zhixingheyi-tian, Jul 22, 2022)
a7c3879  Add Optimize append (zhixingheyi-tian, Aug 11, 2022)
642bfa4  Complete plaindecoder code and passed test (zhixingheyi-tian, Aug 16, 2022)
25ad799  Add code for DictDecoder (zhixingheyi-tian, Aug 18, 2022)
f3dce6b  Resume CMakeLists.txt (zhixingheyi-tian, Aug 19, 2022)
e6da89e  Merge remote-tracking branch 'upstream/arrow-4.0.0-oap' into parquet_… (zhixingheyi-tian, Aug 19, 2022)
5c4d253  Fix offset validate (zhixingheyi-tian, Aug 22, 2022)
64123b0  reduce buffer capacity (zhixingheyi-tian, Aug 23, 2022)
3bf58f8  Add Patch version (zhixingheyi-tian, Aug 29, 2022)
86cc395  Add Fix for write validate (zhixingheyi-tian, Aug 30, 2022)
145a562  Set false for resize (zhixingheyi-tian, Aug 30, 2022)
2b8aabf  Fix customer case issue (zhixingheyi-tian, Aug 30, 2022)
510acb0  Add patch version (zhixingheyi-tian, Aug 30, 2022)
972cc4e  Remove cout (zhixingheyi-tian, Aug 30, 2022)
1433cde  Add Function opt (zhixingheyi-tian, Sep 2, 2022)
f37f44a  Add to DecodeArrowDense_opt (zhixingheyi-tian, Sep 2, 2022)
2e2d39d  Merge remote-tracking branch 'upstream/arrow-4.0.0-oap' into Decode_f… (zhixingheyi-tian, Sep 6, 2022)
b9c2644  clean comment (zhixingheyi-tian, Sep 6, 2022)
710edb4  Clean unnecessary paras (zhixingheyi-tian, Sep 6, 2022)
a5e4080  Delete Patch version (zhixingheyi-tian, Sep 6, 2022)
b9e52c3  Modify Gbenchmark version to avoid conflict (zhixingheyi-tian, Sep 7, 2022)
541dd29  custom memcpy (zhixingheyi-tian, Sep 7, 2022)
3a08a63  add -march=native (zhixingheyi-tian, Sep 7, 2022)
2fbcdd3  Refine avx512 relation code (zhixingheyi-tian, Sep 14, 2022)
46a9666  comment cout (zhixingheyi-tian, Sep 15, 2022)
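
Taken together, the commits describe the shape of the change: ByteArray decoding writes offsets and values directly into preallocated buffers, the value buffer is sized from an observed per-row average instead of a fixed constant, and the hot copy path uses a hand-written memcpy built with -march=native so AVX-512 instructions are available. The kernel below is only a sketch of what such a "custom memcpy" can look like; the function name and structure are illustrative, not the PR's actual implementation, and it assumes a CPU with AVX-512F.

#include <immintrin.h>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Copy 64-byte blocks with 512-bit unaligned loads/stores, then fall back
// to std::memcpy for the tail. Requires AVX-512F (-mavx512f, or
// -march=native on a capable build host).
static inline void CustomMemcpyAvx512(void* dst, const void* src, size_t n) {
  auto* d = static_cast<uint8_t*>(dst);
  const auto* s = static_cast<const uint8_t*>(src);
  size_t i = 0;
  for (; i + 64 <= n; i += 64) {
    __m512i v = _mm512_loadu_si512(s + i);
    _mm512_storeu_si512(d + i, v);
  }
  if (i < n) std::memcpy(d + i, s + i, n - i);
}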
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -49,6 +49,7 @@ endif()
 
 set(ARROW_VERSION "4.0.0")
 #add_compile_options(-g -O0)
+add_compile_options(-g -march=native)
 
 string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}")
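
One caveat for the flag above: -march=native tunes code generation to the build host, which is what lets the AVX-512 paths in this PR compile, but the resulting binary may not run on older CPUs. A hedged sketch of keeping such a path optional (CopyRow is a hypothetical wrapper; __AVX512F__ is the standard GCC/Clang predefine, and the vectorized call reuses the sketch shown earlier):

#include <cstddef>
#include <cstring>

void CopyRow(void* dst, const void* src, size_t n) {
#if defined(__AVX512F__)
  CustomMemcpyAvx512(dst, src, n);  // vectorized path, see the sketch above
#else
  std::memcpy(dst, src, n);  // portable fallback
#endif
}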
6 changes: 5 additions & 1 deletion cpp/src/parquet/arrow/parquet_scan_benchmark.cc
@@ -130,12 +130,16 @@ class GoogleBenchmarkColumnarToRow_CacheScan_Benchmark
         properties, &parquet_reader));
 
     std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+    // ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
+    //     row_group_indices, local_column_indices, &record_batch_reader));
+    // need varify complex type, so remove local_column_indices
     ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
-        row_group_indices, local_column_indices, &record_batch_reader));
+        row_group_indices, &record_batch_reader));
     do {
       TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
 
       if (record_batch) {
+        // std::cout << " record_batch->ToString(): " << record_batch->ToString() << std::endl;
         // batches.push_back(record_batch);
         num_batches += 1;
         num_rows += record_batch->num_rows();
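
With local_column_indices dropped, the GetRecordBatchReader overload that takes only row-group indices is used, so every column (including complex types, per the added comment) is materialized. The loop below is a self-contained rendering of the benchmark's read pattern; a sketch only, since the real benchmark wraps ReadNext in TIME_NANO_OR_THROW to accumulate elapse_read:

#include <cstdint>
#include <memory>
#include "arrow/record_batch.h"
#include "arrow/status.h"

// Drain a RecordBatchReader, counting batches and rows as the benchmark does.
arrow::Status CountBatches(arrow::RecordBatchReader* reader,
                           int64_t* num_batches, int64_t* num_rows) {
  std::shared_ptr<arrow::RecordBatch> batch;
  do {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));  // batch == nullptr at end
    if (batch) {
      *num_batches += 1;
      *num_rows += batch->num_rows();
    }
  } while (batch);
  return arrow::Status::OK();
}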
4 changes: 0 additions & 4 deletions cpp/src/parquet/arrow/reader.cc
@@ -142,10 +142,6 @@ class FileReaderImpl : public FileReader {
       : pool_(pool),
         reader_(std::move(reader)),
         reader_properties_(std::move(properties)) {}
-
-  ~FileReaderImpl() {
-    std::cout << "Patch version-0830" << std::endl;
-  }
 
   Status Init() {
     return SchemaManifest::Make(reader_->metadata()->schema(),
5 changes: 4 additions & 1 deletion cpp/src/parquet/arrow/reader_internal.cc
@@ -704,7 +704,10 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
     case ::arrow::Type::DATE64:
       RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
       break;
-    case ::arrow::Type::FIXED_SIZE_BINARY:
+    case ::arrow::Type::FIXED_SIZE_BINARY: {
+      RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
+      result = chunked_result;
+    } break;
     case ::arrow::Type::BINARY:
     case ::arrow::Type::STRING:
     case ::arrow::Type::LARGE_BINARY:
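
FIXED_SIZE_BINARY now gets its own case block instead of falling through to the variable-length BINARY/STRING cases below: it is routed through TransferBinary, which yields a ChunkedArray (byte-array data can arrive in several chunks), and the chunked result is assigned to the Datum result directly. A minimal sketch of that wrap-up step (WrapChunks is illustrative, not a helper in the PR):

#include <memory>
#include "arrow/chunked_array.h"
#include "arrow/datum.h"

arrow::Datum WrapChunks(std::shared_ptr<arrow::ChunkedArray> chunks) {
  return arrow::Datum(std::move(chunks));  // Datum has a ChunkedArray overload
}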
28 changes: 9 additions & 19 deletions cpp/src/parquet/column_reader.cc
@@ -1570,39 +1570,27 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
   }
 
   void ReadValuesDense(int64_t values_to_read) override {
-    // int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
-    //     static_cast<int>(values_to_read), &accumulator_);
     int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
         static_cast<int>(values_to_read), 0,
         NULLPTR, (reinterpret_cast<int32_t *>(offset_->mutable_data()) + values_written_),
-        values_, 0, &accumulator_, &bianry_length_);
+        values_, 0, &bianry_length_);
     DCHECK_EQ(num_decoded, values_to_read);
-    // ResetValues();
   }
 
-  // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
-  //   int64_t num_decoded = this->current_decoder_->DecodeArrow(
-  //       static_cast<int>(values_to_read), static_cast<int>(null_count),
-  //       valid_bits_->mutable_data(), values_written_, &accumulator_);
-  //   DCHECK_EQ(num_decoded, values_to_read - null_count);
-  //   ResetValues();
-  // }
-
   void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
     int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
         static_cast<int>(values_to_read), static_cast<int>(null_count),
         valid_bits_->mutable_data(), (reinterpret_cast<int32_t *>(offset_->mutable_data()) + values_written_),
-        values_, values_written_, &accumulator_, &bianry_length_);
+        values_, values_written_, &bianry_length_);
     DCHECK_EQ(num_decoded, values_to_read - null_count);
-    // ResetValues();
   }
 
   void ReserveValues(int64_t extra_values) {
     const int64_t new_values_capacity =
         UpdateCapacity(values_capacity_, values_written_, extra_values);
     if (new_values_capacity > values_capacity_) {
       PARQUET_THROW_NOT_OK(
-          values_->Resize(new_values_capacity * 20, false));
+          values_->Resize(new_values_capacity * binary_per_row_length_, false));
       PARQUET_THROW_NOT_OK(
           offset_->Resize((new_values_capacity+1) * 4, false));
 
@@ -1626,7 +1614,6 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
 
   std::shared_ptr<ResizableBuffer> ReleaseValues() override {
     auto result = values_;
-    // PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
     values_ = AllocateBuffer(this->pool_);
     values_capacity_ = 0;
     return result;
@@ -1639,8 +1626,13 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
     const auto first_offset = offsetArr[0];
     const auto last_offset = offsetArr[values_written_];
     int64_t binary_length = last_offset - first_offset;
-    // std::cout << "binary_length:" << binary_length << std::endl;
     values_->SetSize(binary_length);
 
+    if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
+      binary_per_row_length_ = binary_length / values_written_ + 1;
+      // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
+      hasCal_average_len_ = true;
+    }
+
     offset_ = AllocateBuffer(this->pool_);
     bianry_length_ = 0;
@@ -1667,9 +1659,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
 
   int32_t bianry_length_ = 0;
 
-  // std::shared_ptr<::arrow::ResizableBuffer> values_;
   std::shared_ptr<::arrow::ResizableBuffer> offset_;
-  // std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
 };
 
 class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
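
The key change in this file replaces the hard-coded 20-bytes-per-value reservation with binary_per_row_length_: once the first batch's offsets are finalized, the average value length is computed a single time (binary_length / values_written_ + 1) and reused for later ReserveValues calls. Below is a standalone sketch of that heuristic with illustrative names; the PR's version lives inside ByteArrayChunkedRecordReader and defaults to kDefaultBinaryPerRowSzie, declared in the header diff that follows.

#include <cstdint>

class BinaryReserveHint {
 public:
  // Record the observed average once, after the first batch is released.
  void Observe(int64_t total_binary_bytes, int64_t rows) {
    if (!has_average_ && rows > 0) {
      per_row_ = total_binary_bytes / rows + 1;  // round up
      has_average_ = true;
    }
  }

  // Bytes to reserve for `rows` values; stays at the default until an
  // average has been observed.
  int64_t BytesFor(int64_t rows) const { return rows * per_row_; }

 private:
  bool has_average_ = false;
  int64_t per_row_ = 20;  // mirrors kDefaultBinaryPerRowSzie
};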
5 changes: 5 additions & 0 deletions cpp/src/parquet/column_reader.h
@@ -54,6 +54,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+static constexpr int32_t kDefaultBinaryPerRowSzie = 20;
+
 class PARQUET_EXPORT LevelDecoder {
  public:
   LevelDecoder();
@@ -301,6 +303,9 @@ class RecordReader {
   int64_t levels_position_;
   int64_t levels_capacity_;
 
+  bool hasCal_average_len_ = false;
+  int64_t binary_per_row_length_ = kDefaultBinaryPerRowSzie;
+
   std::shared_ptr<::arrow::ResizableBuffer> values_;
   // In the case of false, don't allocate the values buffer (when we directly read into
   // builder classes).