diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 1d58528cf7059..c0ec785b45046 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -49,6 +49,7 @@ endif()
 
 set(ARROW_VERSION "4.0.0")
 
 #add_compile_options(-g -O0)
+add_compile_options(-g -march=native)
 
 string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}")
diff --git a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
index 2ab95e1c380d0..d1fab5b2e13a1 100644
--- a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
+++ b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
@@ -130,12 +130,16 @@ class GoogleBenchmarkColumnarToRow_CacheScan_Benchmark
                        properties, &parquet_reader));
 
     std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+    // ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
+    //     row_group_indices, local_column_indices, &record_batch_reader));
+    // need to verify complex types, so remove local_column_indices
     ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
-        row_group_indices, local_column_indices, &record_batch_reader));
+        row_group_indices, &record_batch_reader));
 
     do {
       TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
       if (record_batch) {
+        // std::cout << " record_batch->ToString(): " << record_batch->ToString() << std::endl;
         // batches.push_back(record_batch);
         num_batches += 1;
         num_rows += record_batch->num_rows();
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 55208e503e723..d886c956bc877 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -142,10 +142,6 @@ class FileReaderImpl : public FileReader {
       : pool_(pool),
         reader_(std::move(reader)),
         reader_properties_(std::move(properties)) {}
-
-  ~FileReaderImpl() {
-    std::cout << "Patch version-0830" << std::endl;
-  }
 
   Status Init() {
     return SchemaManifest::Make(reader_->metadata()->schema(),
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 22d19a4c899b0..f1c31357b578c 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -704,7 +704,10 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
     case ::arrow::Type::DATE64:
       RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
       break;
-    case ::arrow::Type::FIXED_SIZE_BINARY:
+    case ::arrow::Type::FIXED_SIZE_BINARY: {
+      RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
+      result = chunked_result;
+    } break;
     case ::arrow::Type::BINARY:
     case ::arrow::Type::STRING:
     case ::arrow::Type::LARGE_BINARY:
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index f4979d2d97f57..401eeca5bb5db 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1570,31 +1570,19 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
   }
 
   void ReadValuesDense(int64_t values_to_read) override {
-    // int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
-    //     static_cast<int>(values_to_read), &accumulator_);
     int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
        static_cast<int>(values_to_read), 0, NULLPTR,
        (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
-        values_, 0, &accumulator_, &bianry_length_);
+        values_, 0, &bianry_length_);
     DCHECK_EQ(num_decoded, values_to_read);
-    // ResetValues();
   }
 
-  // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
-  //   int64_t num_decoded = this->current_decoder_->DecodeArrow(
-  //       static_cast<int>(values_to_read), static_cast<int>(null_count),
-  //       valid_bits_->mutable_data(), values_written_, &accumulator_);
-  //   DCHECK_EQ(num_decoded, values_to_read - null_count);
-  //   ResetValues();
-  // }
-
   void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
     int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
        static_cast<int>(values_to_read), static_cast<int>(null_count),
        valid_bits_->mutable_data(),
        (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
-        values_, values_written_, &accumulator_, &bianry_length_);
+        values_, values_written_, &bianry_length_);
     DCHECK_EQ(num_decoded, values_to_read - null_count);
-    // ResetValues();
   }
 
   void ReserveValues(int64_t extra_values) {
@@ -1602,7 +1590,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
        UpdateCapacity(values_capacity_, values_written_, extra_values);
     if (new_values_capacity > values_capacity_) {
       PARQUET_THROW_NOT_OK(
-          values_->Resize(new_values_capacity * 20, false));
+          values_->Resize(new_values_capacity * binary_per_row_length_, false));
 
       PARQUET_THROW_NOT_OK(
          offset_->Resize((new_values_capacity+1) * 4, false));
@@ -1626,7 +1614,6 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
 
   std::shared_ptr<Buffer> ReleaseValues() override {
     auto result = values_;
-    // PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
     values_ = AllocateBuffer(this->pool_);
     values_capacity_ = 0;
     return result;
@@ -1639,8 +1626,13 @@
     const auto first_offset = offsetArr[0];
     const auto last_offset = offsetArr[values_written_];
     int64_t binary_length = last_offset - first_offset;
-    // std::cout << "binary_length:" << binary_length << std::endl;
     values_->SetSize(binary_length);
+
+    if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
+      binary_per_row_length_ = binary_length / values_written_ + 1;
+      // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
+      hasCal_average_len_ = true;
+    }
 
     offset_ = AllocateBuffer(this->pool_);
     bianry_length_ = 0;
@@ -1667,9 +1659,7 @@
   int32_t bianry_length_ = 0;
 
-  // std::shared_ptr<::arrow::ResizableBuffer> values_;
   std::shared_ptr<::arrow::ResizableBuffer> offset_;
-  // std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
 };
 
 class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 535c885f53852..e5d635753999d 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -54,6 +54,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+static constexpr int32_t kDefaultBinaryPerRowSize = 20;
+
 class PARQUET_EXPORT LevelDecoder {
  public:
   LevelDecoder();
@@ -301,6 +303,9 @@
   int64_t levels_position_;
   int64_t levels_capacity_;
 
+  bool hasCal_average_len_ = false;
+  int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize;
+
   std::shared_ptr<::arrow::ResizableBuffer> values_;
   // In the case of false, don't allocate the values buffer (when we directly read into
   // builder classes).
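
Note: the column_reader.cc/.h changes above replace the fixed 20-bytes-per-row sizing guess (kDefaultBinaryPerRowSize) with an average measured once from the first completed batch, then reuse that average for later reservations. A minimal standalone sketch of the same heuristic, with illustrative names (BinarySizeEstimator, Observe, BytesToReserve) that are not part of the patch:

#include <cstdint>
#include <iostream>

// Sketch: start from a fixed bytes-per-row guess, calibrate it once from the
// first batch, and size later value-buffer reservations from the average.
struct BinarySizeEstimator {
  static constexpr int64_t kDefaultBytesPerRow = 20;  // same default as the patch
  int64_t bytes_per_row = kDefaultBytesPerRow;
  bool calibrated = false;

  // Called after a batch finishes decoding; only the first batch calibrates.
  void Observe(int64_t batch_bytes, int64_t batch_rows) {
    if (!calibrated && batch_rows > 0) {
      bytes_per_row = batch_bytes / batch_rows + 1;  // round up, as in the patch
      calibrated = true;
    }
  }

  // Capacity to reserve for `rows` upcoming variable-length values.
  int64_t BytesToReserve(int64_t rows) const { return rows * bytes_per_row; }
};

int main() {
  BinarySizeEstimator est;
  std::cout << est.BytesToReserve(1024) << "\n";  // 20480: default guess
  est.Observe(51200, 1024);                       // first batch averaged 50 B/row
  std::cout << est.BytesToReserve(1024) << "\n";  // 52224: calibrated to 51 B/row
  return 0;
}

The one-shot calibration trades accuracy for cheapness: it avoids re-measuring on every batch, at the cost of over- or under-reserving when later row groups have very different value lengths.
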
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 824fb655620bf..76080f87ce440 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -26,6 +26,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <immintrin.h>
 
 #include "arrow/array.h"
 #include "arrow/array/builder_dict.h"
@@ -61,6 +62,31 @@ using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;
 
 namespace parquet {
 namespace {
+
+inline __attribute__((always_inline)) void memcpy_avx512(void* dest, const void* src, size_t length) {
+  auto* d = static_cast<uint8_t*>(dest);
+  const auto* s = static_cast<const uint8_t*>(src);
+  size_t k;
+  // Bulk of the buffer: unmasked 32-byte loads/stores.
+  for (k = 0; k + 32 < length; k += 32) {
+    __m256i v = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + k));
+    _mm256_storeu_si256(reinterpret_cast<__m256i*>(d + k), v);
+  }
+  // Remaining 1..32 bytes: masked load/store (AVX-512BW/VL), so the tail
+  // never reads or writes past the end of either buffer.
+  const __mmask32 mask = static_cast<__mmask32>((1ULL << (length - k)) - 1);
+  __m256i v = _mm256_maskz_loadu_epi8(mask, s + k);
+  _mm256_mask_storeu_epi8(d + k, mask, v);
+}
+
+// inline __attribute__((always_inline)) void memcpy_opt(void* dest, const void* src, size_t length) {
+//   int nchunks = length / sizeof(uint64_t);
+//   int slice = length % sizeof(uint64_t);
+
+//   uint64_t* s = (uint64_t*)src;
+//   uint64_t* d = (uint64_t*)dest;
+
+//   while (nchunks--)
+//     *d++ = *s++;
+
+//   while (slice--)
+//     *((uint8_t*)d++) = *((uint8_t*)s++);
+// }
+
 constexpr int64_t kInMemoryDefaultCapacity = 1024;
 // The Parquet spec isn't very clear whether ByteArray lengths are signed or
 // unsigned, but the Java implementation uses signed ints.
@@ -1356,15 +1382,11 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
                     int32_t* offset,
                     std::shared_ptr<::arrow::ResizableBuffer>& values,
                     int64_t valid_bits_offset,
-                    typename EncodingTraits<ByteArrayType>::Accumulator* out,
                     int32_t* bianry_length) {
     int result = 0;
     PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
                                               offset, values,
-                                              valid_bits_offset, out, &result, bianry_length));
-
-    // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
-    //                                       valid_bits_offset, out, &result));
+                                              valid_bits_offset, &result, bianry_length));
     return result;
   }
@@ -1428,18 +1450,9 @@
                            int32_t* offset,
                            std::shared_ptr<::arrow::ResizableBuffer>& values,
                            int64_t valid_bits_offset,
-                           typename EncodingTraits<ByteArrayType>::Accumulator* out,
                            int* out_values_decoded, int32_t* bianry_length) {
-    // ArrowBinaryHelper helper(out);
     int values_decoded = 0;
-
-
-
-    // RETURN_NOT_OK(helper.builder->Reserve(num_values));
-    // RETURN_NOT_OK(helper.builder->ReserveData(
-    //     std::min(len_, helper.chunk_space_remaining)));
-
     auto dst_value = values->mutable_data() + (*bianry_length);
     int capacity = values->capacity();
     if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) {
       values->Reserve(len_ + *bianry_length);
       dst_value = values->mutable_data() + (*bianry_length);
     }
-
-
     int i = 0;
     RETURN_NOT_OK(VisitNullBitmapInline(
        valid_bits, valid_bits_offset, num_values, null_count,
@@ -1464,37 +1475,24 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
           if (ARROW_PREDICT_FALSE(len_ < increment)) {
             ParquetException::EofException();
           }
-          // if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
-          //   // This element would exceed the capacity of a chunk
-          //   RETURN_NOT_OK(helper.PushChunk());
-          //   RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
-          //   RETURN_NOT_OK(helper.builder->ReserveData(
-          //       std::min(len_, helper.chunk_space_remaining)));
-          // }
-          // helper.UnsafeAppend(data_ + 4, value_len);
           (*bianry_length) += value_len;
           offset[i+1] = offset[i] + value_len;
+#ifdef __AVX512BW__
+          memcpy_avx512(dst_value, data_ + 4, value_len);
+#else
           memcpy(dst_value, data_ + 4, value_len);
+#endif
+
           dst_value = dst_value + value_len;
-          // std::cout << "*(data_ + 4) :" << *(data_ + 4) << std::endl;
-          // std::cout << "*(data_ + 5) " << *(data_ + 5) << std::endl;
           data_ += increment;
           len_ -= increment;
-
-          // uint8_t* address = values->mutable_data();
-          // for(int i=0; i< 10; i++) {
-          //   std::cout << "*(address+" << i << ")" << *(address+i) << std::endl;
-          // }
           ++values_decoded;
           ++i;
           return Status::OK();
         },
         [&]() {
-          // helper.UnsafeAppendNull();
           offset[i+1] = offset[i];
           ++i;
           return Status::OK();
@@ -1962,23 +1960,18 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
                     int32_t* offset,
                     std::shared_ptr<::arrow::ResizableBuffer>& values,
                     int64_t valid_bits_offset,
-                    typename EncodingTraits<ByteArrayType>::Accumulator* out,
                     int32_t* bianry_length) {
     int result = 0;
     if (null_count == 0) {
       PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values,
-                                                       offset, values,
-                                                       out, &result, bianry_length));
+                                                       offset, values, &result, bianry_length));
     } else {
       PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
                                                 offset, values,
-                                                valid_bits_offset, out, &result, bianry_length));
+                                                valid_bits_offset, &result, bianry_length));
     }
-
-    // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
-    //                                       valid_bits_offset, out, &result));
-
     return result;
   }
@@ -2051,23 +2044,18 @@
                            int32_t* offset,
                            std::shared_ptr<::arrow::ResizableBuffer>& values,
                            int64_t valid_bits_offset,
-                           typename EncodingTraits<ByteArrayType>::Accumulator* out,
                            int* out_num_values, int32_t* bianry_length) {
     constexpr int32_t kBufferSize = 1024;
     int32_t indices[kBufferSize];
-
-    // ArrowBinaryHelper helper(out);
-
     auto dst_value = values->mutable_data() + (*bianry_length);
-
-
     ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
 
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
     int values_decoded = 0;
     int num_appended = 0;
+    uint64_t capacity = values->capacity();
     while (num_appended < num_values) {
       bool is_valid = bit_reader.IsSet();
       bit_reader.Next();
@@ -2086,30 +2074,28 @@
       // Consume all indices
       if (is_valid) {
         auto idx = indices[i];
-        RETURN_NOT_OK(IndexInBounds(idx));
-        const auto& val = dict_values[idx];
-        // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
-        //   RETURN_NOT_OK(helper.PushChunk());
-        // }
-        // RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
-
+        // RETURN_NOT_OK(IndexInBounds(idx));
+        const auto& val = dict_values[idx];
         auto value_len = val.len;
         auto value_offset = offset[num_appended+1] = offset[num_appended] + value_len;
-        uint64_t capacity = values->capacity();
+
         if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
           capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
           values->Reserve(capacity);
           dst_value = values->mutable_data() + (*bianry_length);
         }
         (*bianry_length) += value_len;
+#ifdef __AVX512BW__
+        memcpy_avx512(dst_value, val.ptr, static_cast<size_t>(value_len));
+#else
         memcpy(dst_value, val.ptr, static_cast<size_t>(value_len));
+#endif
         dst_value = dst_value + value_len;
         ++i;
         ++values_decoded;
       } else {
-        // RETURN_NOT_OK(helper.AppendNull());
         offset[num_appended+1] = offset[num_appended];
         --null_count;
       }
@@ -2123,7 +2109,6 @@
           bit_reader.Next();
         }
       } else {
-        // RETURN_NOT_OK(helper.AppendNull());
         offset[num_appended+1] = offset[num_appended];
         --null_count;
         ++num_appended;
@@ -2165,13 +2150,13 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   Status DecodeArrowDenseNonNull_opt(int num_values,
                                      int32_t* offset,
                                      std::shared_ptr<::arrow::ResizableBuffer>& values,
-                                     typename EncodingTraits<ByteArrayType>::Accumulator* out,
                                      int* out_num_values, int32_t* bianry_length) {
     constexpr int32_t kBufferSize = 2048;
     int32_t indices[kBufferSize];
     int values_decoded = 0;
+    uint64_t capacity = values->capacity();
     // ArrowBinaryHelper helper(out);
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
@@ -2185,23 +2170,21 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
       if (num_indices == 0) ParquetException::EofException();
       for (int i = 0; i < num_indices; ++i) {
         auto idx = indices[i];
-        RETURN_NOT_OK(IndexInBounds(idx));
+        // RETURN_NOT_OK(IndexInBounds(idx));
         const auto& val = dict_values[idx];
-        // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
-        //   RETURN_NOT_OK(helper.PushChunk());
-        // }
-        // RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
-
         auto value_len = val.len;
         auto value_offset = offset[num_appended+1] = offset[num_appended] + value_len;
-        uint64_t capacity = values->capacity();
         if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
           capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
           values->Reserve(capacity);
           dst_value = values->mutable_data() + (*bianry_length);
         }
         (*bianry_length) += value_len;
+#ifdef __AVX512BW__
+        memcpy_avx512(dst_value, val.ptr, static_cast<size_t>(value_len));
+#else
         memcpy(dst_value, val.ptr, static_cast<size_t>(value_len));
+#endif
         dst_value = dst_value + value_len;
         num_appended++;
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 4902da1a9a395..b424ef6826122 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -321,7 +321,6 @@ class TypedDecoder : virtual public Decoder {
                     int32_t* offset,
                     std::shared_ptr<::arrow::ResizableBuffer>& values,
                     int64_t valid_bits_offset,
-                    typename EncodingTraits<DType>::Accumulator* out,
                     int32_t* bianry_length) {
     return 0;
   }
diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt
index 1d86e1c8d15dc..67bdfaba07e2f 100644
--- a/cpp/thirdparty/versions.txt
+++ b/cpp/thirdparty/versions.txt
@@ -33,7 +33,7 @@ ARROW_BOOST_BUILD_VERSION=1.75.0
 ARROW_BROTLI_BUILD_VERSION=v1.0.9
 ARROW_BZIP2_BUILD_VERSION=1.0.8
 ARROW_CARES_BUILD_VERSION=1.17.1
-ARROW_GBENCHMARK_BUILD_VERSION=v1.5.2
+ARROW_GBENCHMARK_BUILD_VERSION=v1.6.0
 ARROW_GFLAGS_BUILD_VERSION=v2.2.2
 ARROW_GLOG_BUILD_VERSION=v0.4.0
 ARROW_GRPC_BUILD_VERSION=v1.35.0
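
Note: the memcpy_avx512 routine added in encoding.cc avoids a scalar byte loop for the copy tail by using AVX-512BW/VL masked loads and stores. A self-contained sketch of the same masked-tail technique, with illustrative names that are not part of the patch (the sketch guards on both __AVX512BW__ and __AVX512VL__, which these 256-bit masked intrinsics require; the patch checks only __AVX512BW__, relying on -march=native to imply both). Compile with -march=native or -mavx512bw -mavx512vl; otherwise it falls back to plain memcpy:

#include <cstdint>
#include <cstdio>
#include <cstring>

#if defined(__AVX512BW__) && defined(__AVX512VL__)
#include <immintrin.h>
#endif

// Copy `length` bytes: full 32-byte chunks first, then one masked load/store
// for the 1..32-byte tail, so nothing is read or written out of bounds.
static void copy_bytes(void* dest, const void* src, size_t length) {
#if defined(__AVX512BW__) && defined(__AVX512VL__)
  auto* d = static_cast<uint8_t*>(dest);
  const auto* s = static_cast<const uint8_t*>(src);
  size_t k;
  for (k = 0; k + 32 < length; k += 32) {
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(d + k),
                        _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + k)));
  }
  // Low (length - k) bits set; masked-off lanes are neither loaded nor stored.
  const __mmask32 mask = static_cast<__mmask32>((1ULL << (length - k)) - 1);
  _mm256_mask_storeu_epi8(d + k, mask, _mm256_maskz_loadu_epi8(mask, s + k));
#else
  std::memcpy(dest, src, length);
#endif
}

int main() {
  uint8_t src[40], dst[40] = {0};
  for (int i = 0; i < 40; ++i) src[i] = static_cast<uint8_t>(i);
  copy_bytes(dst, src, sizeof(src));  // one 32-byte chunk + an 8-byte masked tail
  std::printf("equal: %d\n", std::memcmp(src, dst, sizeof(src)) == 0);
  return 0;
}

The masked tail is what makes this attractive for the decode loops above: Parquet byte-array values are short and variable-length, so for anything up to 32 bytes the whole copy is a single masked load/store pair with no per-byte branching.
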