From baee5ad71820aa72b6a2a880afa05a6d26aef57a Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Tue, 19 Jul 2022 14:16:28 +0800
Subject: [PATCH 01/25] Add parquet scan benchmark

---
 cpp/CMakeLists.txt                               |   5 +
 cpp/src/parquet/CMakeLists.txt                   |   2 +
 .../parquet/arrow/parquet_scan_benchmark.cc      | 213 +++++++++++++++++
 .../arrow/parquet_scan_string_benchmark.cc       | 223 ++++++++++++++++++
 cpp/src/parquet/arrow/test_utils.h               | 132 +++++++++++
 cpp/src/parquet/arrow/utils/exception.h          |  25 ++
 cpp/src/parquet/arrow/utils/macros.h             | 104 ++++++++
 7 files changed, 704 insertions(+)
 create mode 100644 cpp/src/parquet/arrow/parquet_scan_benchmark.cc
 create mode 100644 cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc
 create mode 100644 cpp/src/parquet/arrow/test_utils.h
 create mode 100644 cpp/src/parquet/arrow/utils/exception.h
 create mode 100644 cpp/src/parquet/arrow/utils/macros.h

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 234a66c00d875..0f112af489c88 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -48,6 +48,7 @@ if(POLICY CMP0074)
 endif()

 set(ARROW_VERSION "4.0.0")
+add_compile_options(-g)

 string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}")

@@ -937,3 +938,7 @@ config_summary_message()
 if(${ARROW_BUILD_CONFIG_SUMMARY_JSON})
   config_summary_json()
 endif()
+
+
+
+
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 3f3ca5a529917..0aceae25fc98b 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -399,6 +399,8 @@ add_parquet_benchmark(column_io_benchmark)
 add_parquet_benchmark(encoding_benchmark)
 add_parquet_benchmark(level_conversion_benchmark)
 add_parquet_benchmark(arrow/reader_writer_benchmark PREFIX "parquet-arrow")
+add_parquet_benchmark(arrow/parquet_scan_benchmark PREFIX "parquet-arrow")
+add_parquet_benchmark(arrow/parquet_scan_string_benchmark PREFIX "parquet-arrow")

 if(ARROW_WITH_BROTLI)
   add_definitions(-DARROW_WITH_BROTLI)
diff --git a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
new file mode 100644
index 0000000000000..2ab95e1c380d0
--- /dev/null
+++ b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
@@ -0,0 +1,213 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <arrow/filesystem/filesystem.h>
#include <arrow/io/interfaces.h>
#include <arrow/memory_pool.h>
#include <arrow/type.h>
#include <arrow/util/compression.h>
#include <benchmark/benchmark.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>
#include <sched.h>
#include <cstring>

#include <iostream>

#include "arrow/record_batch.h"
#include "parquet/arrow/utils/macros.h"
#include "parquet/arrow/test_utils.h"


// namespace parquet {
// namespace benchmark {

const int batch_buffer_size = 32768;

class GoogleBenchmarkColumnarToRow {
 public:
  GoogleBenchmarkColumnarToRow(std::string file_name) { GetRecordBatchReader(file_name); }

  void GetRecordBatchReader(const std::string& input_file) {
    std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
    std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;

    std::shared_ptr<arrow::fs::FileSystem> fs;
    std::string file_name;
    ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name))

    ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name));

    properties.set_batch_size(batch_buffer_size);
    properties.set_pre_buffer(false);
    properties.set_use_threads(false);

    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file),
        properties, &parquet_reader));

    ASSERT_NOT_OK(parquet_reader->GetSchema(&schema));

    auto num_rowgroups = parquet_reader->num_row_groups();

    for (int i = 0; i < num_rowgroups; ++i) {
      row_group_indices.push_back(i);
    }

    auto num_columns = schema->num_fields();
    for (int i = 0; i < num_columns; ++i) {
      column_indices.push_back(i);
    }
  }

  virtual void operator()(benchmark::State& state) {}

 protected:
  long SetCPU(uint32_t cpuindex) {
    cpu_set_t cs;
    CPU_ZERO(&cs);
    CPU_SET(cpuindex, &cs);
    return sched_setaffinity(0, sizeof(cs), &cs);
  }

 protected:
  std::string file_name;
  std::shared_ptr<arrow::io::RandomAccessFile> file;
  std::vector<int> row_group_indices;
  std::vector<int> column_indices;
  std::shared_ptr<arrow::Schema> schema;
  parquet::ArrowReaderProperties properties;
};

class GoogleBenchmarkColumnarToRow_CacheScan_Benchmark
    : public GoogleBenchmarkColumnarToRow {
 public:
  GoogleBenchmarkColumnarToRow_CacheScan_Benchmark(std::string filename)
      : GoogleBenchmarkColumnarToRow(filename) {}
  void operator()(benchmark::State& state) {
    if (state.range(0) == 0xffffffff) {
      SetCPU(state.thread_index());
    } else {
      SetCPU(state.range(0));
    }

    arrow::Compression::type compression_type = (arrow::Compression::type)1;

    std::shared_ptr<arrow::RecordBatch> record_batch;
    int64_t elapse_read = 0;
    int64_t num_batches = 0;
    int64_t num_rows = 0;
    int64_t init_time = 0;
    int64_t write_time = 0;

    std::vector<int> local_column_indices = column_indices;

    std::shared_ptr<arrow::Schema> local_schema;
    local_schema = std::make_shared<arrow::Schema>(*schema.get());

    if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl;

    for (auto _ : state) {
      std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
      std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
      ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
          ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file),
          properties, &parquet_reader));

      std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
      ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
          row_group_indices, local_column_indices, &record_batch_reader));
      do {
        TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));

        if (record_batch) {
          // batches.push_back(record_batch);
          num_batches += 1;
          num_rows += record_batch->num_rows();
        }
      } while (record_batch);

      std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000
                << " rows = " << num_rows << std::endl;
    }

    state.counters["rowgroups"] =
        benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads,
                           benchmark::Counter::OneK::kIs1000);
    state.counters["columns"] =
        benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads,
                           benchmark::Counter::OneK::kIs1000);
    state.counters["batches"] = benchmark::Counter(
        num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["num_rows"] = benchmark::Counter(
        num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["batch_buffer_size"] =
        benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads,
                           benchmark::Counter::OneK::kIs1024);

    state.counters["parquet_parse"] = benchmark::Counter(
        elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["init_time"] = benchmark::Counter(
        init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["write_time"] = benchmark::Counter(
        write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
  }
};

// } // namespace columnartorow
// } // namespace sparkcolumnarplugin

int main(int argc, char** argv) {
  uint32_t iterations = 1;
  uint32_t threads = 1;
  std::string datafile;
  uint32_t cpu = 0xffffffff;

  for (int i = 0; i < argc; i++) {
    if (strcmp(argv[i], "--iterations") == 0) {
      iterations = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--threads") == 0) {
      threads = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--file") == 0) {
      datafile = argv[i + 1];
    } else if (strcmp(argv[i], "--cpu") == 0) {
      cpu = atol(argv[i + 1]);
    }
  }
  std::cout << "iterations = " << iterations << std::endl;
  std::cout << "threads = " << threads << std::endl;
  std::cout << "datafile = " << datafile << std::endl;
  std::cout << "cpu = " << cpu << std::endl;

  GoogleBenchmarkColumnarToRow_CacheScan_Benchmark bck(datafile);

  benchmark::RegisterBenchmark("GoogleBenchmarkColumnarToRow::CacheScan", bck)
      ->Args({
          cpu,
      })
      ->Iterations(iterations)
      ->Threads(threads)
      ->ReportAggregatesOnly(false)
      ->MeasureProcessCPUTime()
      ->Unit(benchmark::kSecond);

  benchmark::Initialize(&argc, argv);
  benchmark::RunSpecifiedBenchmarks();
  benchmark::Shutdown();
}
diff --git a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc
new file mode 100644
index 0000000000000..58763e2edfee0
--- /dev/null
+++ b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc
@@ -0,0 +1,223 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <arrow/filesystem/filesystem.h>
#include <arrow/io/interfaces.h>
#include <arrow/memory_pool.h>
#include <arrow/type.h>
#include <arrow/util/compression.h>
#include <benchmark/benchmark.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>
#include <sched.h>
#include <cstring>

#include <iostream>

#include "arrow/record_batch.h"
#include "parquet/arrow/utils/macros.h"
#include "parquet/arrow/test_utils.h"


// namespace parquet {
// namespace benchmark {

const int batch_buffer_size = 32768;

class GoogleBenchmarkParquetStringScan {
 public:
  GoogleBenchmarkParquetStringScan(std::string file_name) { GetRecordBatchReader(file_name); }

  void GetRecordBatchReader(const std::string& input_file) {
    std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
    std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;

    std::shared_ptr<arrow::fs::FileSystem> fs;
    std::string file_name;
    ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name))

    ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name));

    properties.set_batch_size(batch_buffer_size);
    properties.set_pre_buffer(false);
    properties.set_use_threads(false);

    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file),
        properties, &parquet_reader));

    ASSERT_NOT_OK(parquet_reader->GetSchema(&schema));

    auto num_rowgroups = parquet_reader->num_row_groups();

    for (int i = 0; i < num_rowgroups; ++i) {
      row_group_indices.push_back(i);
    }

    auto num_columns = schema->num_fields();
    std::cout << "Enter Is_binary_like Check: " << std::endl;
    for (int i = 0; i < num_columns; ++i) {
      auto field = schema->field(i);
      auto type = field->type();
      if (arrow::is_binary_like(type->id())) {
        std::cout << "Is_binary_like colIndex: " << i << std::endl;
        column_indices.push_back(i);
      }
    }
  }

  virtual void operator()(benchmark::State& state) {}

 protected:
  long SetCPU(uint32_t cpuindex) {
    cpu_set_t cs;
    CPU_ZERO(&cs);
    CPU_SET(cpuindex, &cs);
    return sched_setaffinity(0, sizeof(cs), &cs);
  }

 protected:
  std::string file_name;
  std::shared_ptr<arrow::io::RandomAccessFile> file;
  std::vector<int> row_group_indices;
  std::vector<int> column_indices;
  std::shared_ptr<arrow::Schema> schema;
  parquet::ArrowReaderProperties properties;
};

class GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark
    : public GoogleBenchmarkParquetStringScan {
 public:
  GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark(std::string filename)
      : GoogleBenchmarkParquetStringScan(filename) {}
  void operator()(benchmark::State& state) {
    if (state.range(0) == 0xffffffff) {
      SetCPU(state.thread_index());
    } else {
      SetCPU(state.range(0));
    }

    arrow::Compression::type compression_type = (arrow::Compression::type)1;

    std::shared_ptr<arrow::RecordBatch> record_batch;
    int64_t elapse_read = 0;
    int64_t num_batches = 0;
    int64_t num_rows = 0;
    int64_t init_time = 0;
    int64_t write_time = 0;

    std::vector<int> local_column_indices = column_indices;

    for (auto val : local_column_indices) {
      std::cout << "local_column_indices: is_binary_like colIndex: " << val << std::endl;
    }

    std::shared_ptr<arrow::Schema> local_schema;
    local_schema = std::make_shared<arrow::Schema>(*schema.get());

    if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl;

    for (auto _ : state) {
      std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
      std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
      ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
          ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file),
          properties, &parquet_reader));

      std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
      ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
          row_group_indices, local_column_indices, &record_batch_reader));
      do {
        TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));

        if (record_batch) {
          // batches.push_back(record_batch);
          num_batches += 1;
          num_rows += record_batch->num_rows();
        }
      } while (record_batch);

      std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000
                << " rows = " << num_rows << std::endl;
    }

    state.counters["rowgroups"] =
        benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads,
                           benchmark::Counter::OneK::kIs1000);
    state.counters["columns"] =
        benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads,
                           benchmark::Counter::OneK::kIs1000);
    state.counters["batches"] = benchmark::Counter(
        num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["num_rows"] = benchmark::Counter(
        num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["batch_buffer_size"] =
        benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads,
                           benchmark::Counter::OneK::kIs1024);

    state.counters["parquet_parse"] = benchmark::Counter(
        elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["init_time"] = benchmark::Counter(
        init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["write_time"] = benchmark::Counter(
        write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
  }
};

// } // namespace ParquetStringScan
// } // namespace sparkcolumnarplugin

int main(int argc, char** argv) {
  uint32_t iterations = 1;
  uint32_t threads = 1;
  std::string datafile;
  uint32_t cpu = 0xffffffff;

  for (int i = 0; i < argc; i++) {
    if (strcmp(argv[i], "--iterations") == 0) {
      iterations = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--threads") == 0) {
      threads = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--file") == 0) {
      datafile = argv[i + 1];
    } else if (strcmp(argv[i], "--cpu") == 0) {
      cpu = atol(argv[i + 1]);
    }
  }
  std::cout << "iterations = " << iterations << std::endl;
  std::cout << "threads = " << threads << std::endl;
  std::cout << "datafile = " << datafile << std::endl;
  std::cout << "cpu = " << cpu << std::endl;

  GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark bck(datafile);

  benchmark::RegisterBenchmark("GoogleBenchmarkParquetStringScan::IteratorScan", bck)
      ->Args({
          cpu,
      })
      ->Iterations(iterations)
      ->Threads(threads)
      ->ReportAggregatesOnly(false)
      ->MeasureProcessCPUTime()
      ->Unit(benchmark::kSecond);

  benchmark::Initialize(&argc, argv);
  benchmark::RunSpecifiedBenchmarks();
  benchmark::Shutdown();
}
diff --git a/cpp/src/parquet/arrow/test_utils.h b/cpp/src/parquet/arrow/test_utils.h
new file mode 100644
index 0000000000000..d3afa459dfdf6
--- /dev/null
+++ b/cpp/src/parquet/arrow/test_utils.h
@@ -0,0 +1,132 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/ipc/json_simple.h>
#include <arrow/pretty_print.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <arrow/util/macros.h>
#include <gandiva/node.h>
#include <gandiva/tree_expr_builder.h>

#include <iostream>
#include <memory>
#include <sstream>

#include "utils/macros.h"

using namespace arrow;

using TreeExprBuilder = gandiva::TreeExprBuilder;
using FunctionNode = gandiva::FunctionNode;

#define ASSERT_NOT_OK(status)                  \
  do {                                         \
    ::arrow::Status __s = (status);            \
    if (!__s.ok()) {                           \
      throw std::runtime_error(__s.message()); \
    }                                          \
  } while (false);

#define ARROW_ASSIGN_OR_THROW_IMPL(status_name, lhs, rexpr) \
  do {                                                      \
    auto status_name = (rexpr);                             \
    auto __s = status_name.status();                        \
    if (!__s.ok()) {                                        \
      throw std::runtime_error(__s.message());              \
    }                                                       \
    lhs = std::move(status_name).ValueOrDie();              \
  } while (false);

#define ARROW_ASSIGN_OR_THROW_NAME(x, y) ARROW_CONCAT(x, y)

#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)                                              \
  ARROW_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_THROW_NAME(_error_or_value, __COUNTER__), \
                             lhs, rexpr);

template <typename T>
Status Equals(const T& expected, const T& actual) {
  if (expected.Equals(actual)) {
    return arrow::Status::OK();
  }
  std::stringstream pp_expected;
  std::stringstream pp_actual;
  ::arrow::PrettyPrintOptions options(/*indent=*/2);
  options.window = 50;
  ASSERT_NOT_OK(PrettyPrint(expected, options, &pp_expected));
  ASSERT_NOT_OK(PrettyPrint(actual, options, &pp_actual));
  if (pp_expected.str() == pp_actual.str()) {
    return arrow::Status::OK();
  }
  return Status::Invalid("Expected RecordBatch is ", pp_expected.str(), " with schema ",
                         expected.schema()->ToString(), ", while actual is ",
                         pp_actual.str(), " with schema ", actual.schema()->ToString());
}

void MakeInputBatch(std::vector<std::string> input_data,
                    std::shared_ptr<arrow::Schema> sch,
                    std::shared_ptr<arrow::RecordBatch>* input_batch) {
  // prepare input record Batch
  std::vector<std::shared_ptr<arrow::Array>> array_list;
  int length = -1;
  int i = 0;
  for (auto data : input_data) {
    std::shared_ptr<arrow::Array> a0;
    ASSERT_NOT_OK(arrow::ipc::internal::json::ArrayFromJSON(sch->field(i++)->type(),
                                                            data.c_str(), &a0));
    if (length == -1) {
      length = a0->length();
    }
    assert(length == a0->length());
    array_list.push_back(a0);
  }

  *input_batch = RecordBatch::Make(sch, length, array_list);
  return;
}

void ConstructNullInputBatch(std::shared_ptr<arrow::RecordBatch>* null_batch) {
  std::vector<std::shared_ptr<arrow::Array>> columns;
  arrow::Int64Builder builder1;
  builder1.AppendNull();
  builder1.Append(1);

  arrow::Int64Builder builder2;
  builder2.Append(1);
  builder2.AppendNull();

  std::shared_ptr<arrow::Array> array1;
  builder1.Finish(&array1);
  std::shared_ptr<arrow::Array> array2;
  builder2.Finish(&array2);

  columns.push_back(array1);
  columns.push_back(array2);

  std::vector<std::shared_ptr<arrow::Field>> schema_vec{
      arrow::field("col1", arrow::int64()),
      arrow::field("col2", arrow::int64()),
  };

  std::shared_ptr<arrow::Schema> schema{std::make_shared<arrow::Schema>(schema_vec)};
  *null_batch = arrow::RecordBatch::Make(schema, 2, columns);
  return;
}
diff --git a/cpp/src/parquet/arrow/utils/exception.h b/cpp/src/parquet/arrow/utils/exception.h
new file mode 100644
index 0000000000000..582903d0ef0fa
--- /dev/null
+++ b/cpp/src/parquet/arrow/utils/exception.h
@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache
 Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <stdexcept>

class JniPendingException : public std::runtime_error {
 public:
  explicit JniPendingException(const std::string& arg) : runtime_error(arg) {}
};
\ No newline at end of file
diff --git a/cpp/src/parquet/arrow/utils/macros.h b/cpp/src/parquet/arrow/utils/macros.h
new file mode 100644
index 0000000000000..e123d46f82854
--- /dev/null
+++ b/cpp/src/parquet/arrow/utils/macros.h
@@ -0,0 +1,104 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <time.h>

#include <chrono>

#include "parquet/arrow/utils/exception.h"

#define TIME_NANO_DIFF(finish, start) \
  (finish.tv_sec - start.tv_sec) * 1000000000 + (finish.tv_nsec - start.tv_nsec)

#define TIME_MICRO_OR_RAISE(time, expr)                                     \
  do {                                                                      \
    auto start = std::chrono::steady_clock::now();                          \
    auto __s = (expr);                                                      \
    if (!__s.ok()) {                                                        \
      return __s;                                                           \
    }                                                                       \
    auto end = std::chrono::steady_clock::now();                            \
    time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); \
  } while (false);

#define TIME_MICRO_OR_THROW(time, expr)                                     \
  do {                                                                      \
    auto start = std::chrono::steady_clock::now();                          \
    auto __s = (expr);                                                      \
    if (!__s.ok()) {                                                        \
      throw JniPendingException(__s.message());                             \
    }                                                                       \
    auto end = std::chrono::steady_clock::now();                            \
    time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); \
  } while (false);

#define TIME_MICRO(time, res, expr)                                         \
  do {                                                                      \
    auto start = std::chrono::steady_clock::now();                          \
    res = (expr);                                                           \
    auto end = std::chrono::steady_clock::now();                            \
    time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); \
  } while (false);

#define TIME_NANO_OR_RAISE(time, expr)                                      \
  do {                                                                      \
    auto start = std::chrono::steady_clock::now();                          \
    auto __s = (expr);                                                      \
    if (!__s.ok()) {                                                        \
      return __s;                                                           \
    }                                                                       \
    auto end = std::chrono::steady_clock::now();                            \
    time += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); \
  } while (false);

#define TIME_NANO_OR_THROW(time, expr)                                      \
  do {                                                                      \
    auto start = std::chrono::steady_clock::now();                          \
    auto __s = (expr);                                                      \
    if (!__s.ok()) {                                                        \
      throw JniPendingException(__s.message());                             \
    }                                                                       \
    auto end = std::chrono::steady_clock::now();                            \
    time += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); \
  } while (false);

#define VECTOR_PRINT(v, name)          \
  std::cout << "[" << name << "]:";    \
  for (int i = 0; i < v.size(); i++) { \
    if (i != v.size() - 1)             \
      std::cout << v[i] << ",";        \
    else                               \
      std::cout << v[i];               \
  }                                    \
  std::cout << std::endl;

#define THROW_NOT_OK(expr)                      \
  do {                                          \
    auto __s = (expr);                          \
    if (!__s.ok()) {                            \
      throw JniPendingException(__s.message()); \
    }                                           \
  } while (false);

#define TIME_TO_STRING(time) \
  (time > 10000 ? time / 1000 : time) << (time > 10000 ? " ms" : " us")

#define TIME_NANO_TO_STRING(time)                                    \
  (time > 1e7 ? time / 1e6 : ((time > 1e4) ? time / 1e3 : time))     \
      << (time > 1e7 ? "ms" : (time > 1e4 ? "us" : "ns"))
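The macros above throw `JniPendingException` on a failed `arrow::Status` and accumulate elapsed time into a caller-supplied counter. A minimal usage sketch follows; this is an editorial illustration, not part of the patch, and `DoWork` is a hypothetical stand-in workload:

```cpp
#include <cstdint>
#include <iostream>

#include "arrow/status.h"
#include "parquet/arrow/utils/macros.h"

// Hypothetical workload standing in for a real Status-returning call.
arrow::Status DoWork() { return arrow::Status::OK(); }

int main() {
  int64_t elapse_ns = 0;
  // Runs DoWork(), adds its wall-clock duration (in nanoseconds) to
  // elapse_ns, and throws JniPendingException if the Status is not OK.
  TIME_NANO_OR_THROW(elapse_ns, DoWork());
  std::cout << "DoWork took " << TIME_NANO_TO_STRING(elapse_ns) << std::endl;
  return 0;
}
```

This is the same pattern the scan benchmarks wrap around `record_batch_reader->ReadNext(&record_batch)`.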
From 7fdf91400f11036980d002da9b720c531d0db606 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Fri, 22 Jul 2022 15:29:47 +0800
Subject: [PATCH 02/25] Add Usage

---
 cpp/README.md | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/cpp/README.md b/cpp/README.md
index b083f3fe78e74..ec6b136aa83a3 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -32,3 +32,16 @@ to install pre-compiled binary versions of the library.
 Please refer to our latest [C++ Development Documentation][1].

 [1]: https://github.com/apache/arrow/blob/master/docs/source/developers/cpp
+
+## Run parquet string scan benchmark
+#### Minimal benchmark build
+cd arrow
+mkdir -p cpp/debug
+cd cpp/debug
+cmake -DCMAKE_BUILD_TYPE=Release -DARROW_BUILD_BENCHMARKS=ON -DARROW_WITH_ZLIB=ON -DARROW_JEMALLOC=OFF -DARROW_PARQUET=ON -DARROW_COMPUTE=ON -DARROW_DATASET=ON -DARROW_WITH_SNAPPY=ON -DARROW_FILESYSTEM=ON ..
+
+#### Run benchmark and collect perf data
+cd cpp/debug
+./release/parquet-arrow-parquet-scan-string-benchmark --iterations 10 --threads 1 --file {parquet_path} --cpu 0 &
+perf record -e cycles:ppp -C 0 sleep 10

From 3b8ffa3eb3383eb43726e63b7f402ecec2efa53f Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Fri, 22 Jul 2022 15:30:28 +0800
Subject: [PATCH 03/25] perf report

---
 cpp/README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cpp/README.md b/cpp/README.md
index ec6b136aa83a3..9f563149beb77 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -44,4 +44,5 @@ cmake -DCMAKE_BUILD_TYPE=Release -DARROW_BUILD_BENCHMARKS=ON -DARROW_WITH_ZLIB=O
 cd cpp/debug
 ./release/parquet-arrow-parquet-scan-string-benchmark --iterations 10 --threads 1 --file {parquet_path} --cpu 0 &
 perf record -e cycles:ppp -C 0 sleep 10
+perf report

From a7c3879ebe44e4cacae390e053e7798d18468aee Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Thu, 11 Aug 2022 09:16:00 +0800
Subject: [PATCH 04/25] Add Optimize append

---
 cpp/CMakeLists.txt                               |   2 +-
 .../arrow/parquet_scan_string_benchmark.cc       |   1 +
 cpp/src/parquet/arrow/reader_internal.cc         |  17 ++-
 cpp/src/parquet/column_reader.cc                 |  85 ++++++++++++++-
 cpp/src/parquet/column_reader.h                  |   6 ++
 cpp/src/parquet/encoding.cc                      | 102 ++++++++++++++++++
 cpp/src/parquet/encoding.h                       |   9 ++
 7 files changed, 215 insertions(+), 7 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0f112af489c88..822e4de7ed826 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -48,7 +48,7 @@ if(POLICY CMP0074)
 endif()

 set(ARROW_VERSION "4.0.0")
-add_compile_options(-g)
+add_compile_options(-g -O0)

 string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}")

diff --git a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc
index 58763e2edfee0..79db29bf1b08b 100644
--- a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc
+++ b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc
@@ -38,6 +38,7 @@
 // namespace benchmark {

 const int batch_buffer_size = 32768;
+// const int batch_buffer_size = 2;

 class GoogleBenchmarkParquetStringScan {
  public:
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index ccd05d3c38cbe..386b3e42ef04c 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <iostream>

 #include "arrow/array.h"
 #include "arrow/compute/api.h"
@@ -330,6 +331,17 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
  return ::arrow::MakeArray(data);
}

std::shared_ptr<Array> TransferBinaryZeroCopy(RecordReader* reader,
                                              const std::shared_ptr<DataType>& type) {
  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
                                                  reader->ReleaseOffsets(),
                                                  reader->ReleaseValues()};
  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
                                                   buffers, reader->null_count());
  std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl;
  return ::arrow::MakeArray(data);
}

Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
  int64_t length = reader->values_written();
@@ -697,8 +709,9 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING:
    case ::arrow::Type::LARGE_BINARY:
    case ::arrow::Type::LARGE_STRING: {
-      RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
-      result = chunked_result;
+      result = TransferBinaryZeroCopy(reader, value_type);
+      // RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
+      // result = chunked_result;
    } break;
    case ::arrow::Type::DECIMAL128: {
      switch (descr->physical_type()) {
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index ec205f3d3f935..e62008d837d34 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1554,6 +1554,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
    accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+    values_ = AllocateBuffer(pool);
+    offset_ = AllocateBuffer(pool);
  }

  ::arrow::ArrayVector GetBuilderChunks() override {
@@ -1568,23 +1570,98 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
  }

  void ReadValuesDense(int64_t values_to_read) override {
-    int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
-        static_cast<int>(values_to_read), &accumulator_);
+    // int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+    //     static_cast<int>(values_to_read), &accumulator_);
+    int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
+        static_cast<int>(values_to_read), 0,
+        NULLPTR, (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
+        values_, 0, &accumulator_, &bianry_length_);
    DCHECK_EQ(num_decoded, values_to_read);
    ResetValues();
  }

  // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
  //   int64_t num_decoded = this->current_decoder_->DecodeArrow(
  //       static_cast<int>(values_to_read), static_cast<int>(null_count),
  //       valid_bits_->mutable_data(), values_written_, &accumulator_);
  //   DCHECK_EQ(num_decoded, values_to_read - null_count);
  //   ResetValues();
  // }

  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
-    int64_t num_decoded = this->current_decoder_->DecodeArrow(
-        static_cast<int>(values_to_read), static_cast<int>(null_count),
-        valid_bits_->mutable_data(), values_written_, &accumulator_);
+    int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
+        static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits_->mutable_data(), (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
+        values_, values_written_, &accumulator_, &bianry_length_);
    DCHECK_EQ(num_decoded, values_to_read - null_count);
    ResetValues();
  }

  void ReserveValues(int64_t extra_values) {
    const int64_t new_values_capacity =
        UpdateCapacity(values_capacity_, values_written_, extra_values);
    if (new_values_capacity > values_capacity_) {
      PARQUET_THROW_NOT_OK(
          values_->Resize(bytes_for_values(new_values_capacity * 20), false));
      PARQUET_THROW_NOT_OK(
          offset_->Resize(bytes_for_values(new_values_capacity * 4), false));

      auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
      offset[0] = 0;

      values_capacity_ = new_values_capacity;
    }
    if (leaf_info_.HasNullableValues()) {
      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));

        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               valid_bytes_new - valid_bytes_old);
      }
    }
  }

  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
    auto result = values_;
    // PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
    values_ = AllocateBuffer(this->pool_);
    values_capacity_ = 0;
    return result;
  }

  std::shared_ptr<ResizableBuffer> ReleaseOffsets() {
    auto result = offset_;
    offset_ = AllocateBuffer(this->pool_);
    bianry_length_ = 0;
    return result;
  }

  void ResetValues() {
    if (values_written_ > 0) {
      // Resize to 0, but do not shrink to fit
      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
      PARQUET_THROW_NOT_OK(offset_->Resize(0, false));
      PARQUET_THROW_NOT_OK(values_->Resize(0, false));

      values_written_ = 0;
      values_capacity_ = 0;
      null_count_ = 0;
      bianry_length_ = 0;
    }
  }

 private:
  // Helper data structure for accumulating builder chunks
  typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;

  int32_t bianry_length_ = 0;

  // std::shared_ptr<::arrow::ResizableBuffer> values_;
  std::shared_ptr<::arrow::ResizableBuffer> offset_;
  // std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
};

class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index a73bba6cb4e9c..535c885f53852 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -226,6 +226,8 @@ class RecordReader {
  /// \brief Pre-allocate space for data. Results in better flat read performance
  virtual void Reserve(int64_t num_values) = 0;

+  virtual void ReserveValues(int64_t capacity) {}
+
  /// \brief Clear consumed values and repetition/definition levels as the
  /// result of calling ReadRecords
  virtual void Reset() = 0;
@@ -234,6 +236,10 @@ class RecordReader {
  /// allocated in subsequent ReadRecords calls
  virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;

+  virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() {
+    return nullptr;
+  }
+
  /// \brief Transfer filled validity bitmap buffer to caller. A new one will
  /// be allocated in subsequent ReadRecords calls
  virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0;
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index eeeff1c8f9b7a..8c436ddd5bfb7 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -25,6 +25,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <iostream>

 #include "arrow/array.h"
 #include "arrow/array/builder_dict.h"
@@ -1351,7 +1352,27 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
    return result;
  }

  int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits,
                      int32_t* offset,
                      std::shared_ptr<::arrow::ResizableBuffer>& values,
                      int64_t valid_bits_offset,
                      typename EncodingTraits<ByteArrayType>::Accumulator* out,
                      int32_t* bianry_length) {
    int result = 0;
    PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
                                              offset, values,
                                              valid_bits_offset, out, &result, bianry_length));

    // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
    //                                       valid_bits_offset, out, &result));

    return result;
  }

 private:

  // const int32_t* offset_arr;

  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
@@ -1403,6 +1424,87 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
    return Status::OK();
  }

  Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits,
                              int32_t* offset,
                              std::shared_ptr<::arrow::ResizableBuffer>& values,
                              int64_t valid_bits_offset,
                              typename EncodingTraits<ByteArrayType>::Accumulator* out,
                              int* out_values_decoded,
                              int32_t* bianry_length) {
    // ArrowBinaryHelper helper(out);
    int values_decoded = 0;

    // RETURN_NOT_OK(helper.builder->Reserve(num_values));
    // RETURN_NOT_OK(helper.builder->ReserveData(
    //     std::min(len_, helper.chunk_space_remaining)));

    auto dst_value = values->mutable_data() + (*bianry_length);
    int capacity = values->capacity();
    if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) {
      values->Reserve(len_);
      dst_value = values->mutable_data() + (*bianry_length);
    }

    int i = 0;
    RETURN_NOT_OK(VisitNullBitmapInline(
        valid_bits, valid_bits_offset, num_values, null_count,
        [&]() {
          if (ARROW_PREDICT_FALSE(len_ < 4)) {
            ParquetException::EofException();
          }
          auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
          }
          auto increment = value_len + 4;
          if (ARROW_PREDICT_FALSE(len_ < increment)) {
            ParquetException::EofException();
          }
          // if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
          //   // This element would exceed the capacity of a chunk
          //   RETURN_NOT_OK(helper.PushChunk());
          //   RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
          //   RETURN_NOT_OK(helper.builder->ReserveData(
          //       std::min(len_, helper.chunk_space_remaining)));
          // }
          // helper.UnsafeAppend(data_ + 4, value_len);

          (*bianry_length) += value_len;
          offset[i+1] = offset[i] + value_len;
          memcpy(dst_value, data_ + 4, value_len);
          dst_value = dst_value + value_len;

          // std::cout << "*(data_ + 4) :" << *(data_ + 4) << std::endl;
          // std::cout << "*(data_ + 5) " << *(data_ + 5) << std::endl;

          data_ += increment;
          len_ -= increment;

          // uint8_t* address = values->mutable_data();
          // for (int i = 0; i < 10; i++) {
          //   std::cout << "*(address+" << i << ")" << *(address+i) << std::endl;
          // }

          ++values_decoded;
          ++i;
          return Status::OK();
        },
        [&]() {
          // helper.UnsafeAppendNull();
          offset[i+1] = offset[i];
          ++i;
          return Status::OK();
        }));

    num_values_ -= values_decoded;
    *out_values_decoded = values_decoded;
    return Status::OK();
  }

  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index a3d8e012b6a52..4902da1a9a395 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -316,6 +316,15 @@ class TypedDecoder : virtual public Decoder {
  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          typename EncodingTraits<DType>::Accumulator* out) = 0;

+  virtual int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits,
+                              int32_t* offset,
+                              std::shared_ptr<::arrow::ResizableBuffer>& values,
+                              int64_t valid_bits_offset,
+                              typename EncodingTraits<DType>::Accumulator* out,
+                              int32_t* bianry_length) {
+    return 0;
+  }

  /// \brief Decode into an ArrayBuilder or other accumulator ignoring nulls
  ///
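Editorial aside, not part of the patch series: the core idea of this patch is to stop appending through `arrow::BinaryBuilder` and instead have the record reader fill the three buffers of a binary array (validity, offsets, values) directly, which `TransferBinaryZeroCopy` then wraps into an array without copying. A minimal self-contained sketch of that buffer layout, with hand-built buffers and the Arrow C++ 4.0-era API assumed:

```cpp
#include <cstring>
#include <iostream>

#include "arrow/api.h"

arrow::Result<std::shared_ptr<arrow::Array>> BuildBinaryArray() {
  // Two non-null values, "ab" and "cdef": offsets are {0, 2, 6} and the
  // values buffer is the concatenation "abcdef".
  ARROW_ASSIGN_OR_RAISE(auto offsets, arrow::AllocateBuffer(3 * sizeof(int32_t)));
  ARROW_ASSIGN_OR_RAISE(auto values, arrow::AllocateBuffer(6));
  const int32_t raw_offsets[] = {0, 2, 6};
  std::memcpy(offsets->mutable_data(), raw_offsets, sizeof(raw_offsets));
  std::memcpy(values->mutable_data(), "abcdef", 6);

  // Wrap the buffers zero-copy, exactly as TransferBinaryZeroCopy does with
  // the buffers released by the record reader (null validity => no nulls).
  auto data = arrow::ArrayData::Make(
      arrow::binary(), /*length=*/2,
      {/*validity=*/nullptr, std::move(offsets), std::move(values)},
      /*null_count=*/0);
  return arrow::MakeArray(data);
}

int main() {
  auto array = BuildBinaryArray().ValueOrDie();
  std::cout << array->ToString() << std::endl;
  return 0;
}
```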
From 642bfa4ed8080ccb3196a13a4b14cc1e82399f48 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Tue, 16 Aug 2022 10:21:25 +0800
Subject: [PATCH 05/25] Complete plaindecoder code and passed test

---
 cpp/src/parquet/arrow/reader_internal.cc | 2 +-
 cpp/src/parquet/column_reader.cc         | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 386b3e42ef04c..22d19a4c899b0 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -338,7 +338,7 @@ std::shared_ptr<Array> TransferBinaryZeroCopy(RecordReader* reader,
                                                  reader->ReleaseValues()};
  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
                                                   buffers, reader->null_count());
-  std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl;
+  // std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl;
  return ::arrow::MakeArray(data);
}

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index e62008d837d34..a45e4da577df1 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1577,7 +1577,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
        NULLPTR, (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
        values_, 0, &accumulator_, &bianry_length_);
    DCHECK_EQ(num_decoded, values_to_read);
-    ResetValues();
+    // ResetValues();
  }

  // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
@@ -1594,7 +1594,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
        valid_bits_->mutable_data(), (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
        values_, values_written_, &accumulator_, &bianry_length_);
    DCHECK_EQ(num_decoded, values_to_read - null_count);
-    ResetValues();
+    // ResetValues();
  }

From 25ad7998e9f0cdbe6625ece027a459a464e31777 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Thu, 18 Aug 2022 14:51:47 +0800
Subject: [PATCH 06/25] Add code for DictDecoder

---
 cpp/src/parquet/encoding.cc | 163 +++++++++++++++++++++++++++++++++++-
 1 file changed, 162 insertions(+), 1 deletion(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 8c436ddd5bfb7..63b3d0637e323 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -1443,7 +1443,7 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
    auto dst_value = values->mutable_data() + (*bianry_length);
    int capacity = values->capacity();
    if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) {
-      values->Reserve(len_);
+      values->Reserve(len_ + *bianry_length);
      dst_value = values->mutable_data() + (*bianry_length);
    }

@@ -1958,6 +1958,31 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
    return result;
  }

  int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits,
                      int32_t* offset,
                      std::shared_ptr<::arrow::ResizableBuffer>& values,
                      int64_t valid_bits_offset,
                      typename EncodingTraits<ByteArrayType>::Accumulator* out,
                      int32_t* bianry_length) {
    int result = 0;
    if (null_count == 0) {
      PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values,
                                                       offset, values,
                                                       out, &result, bianry_length));
    } else {
      PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
                                                offset, values,
                                                valid_bits_offset, out, &result, bianry_length));
    }

    // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
    //                                       valid_bits_offset, out, &result));

    return result;
  }

 private:
  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
@@ -2022,6 +2047,92 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
    return Status::OK();
  }

  Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits,
                              int32_t* offset,
                              std::shared_ptr<::arrow::ResizableBuffer>& values,
                              int64_t valid_bits_offset,
                              typename EncodingTraits<ByteArrayType>::Accumulator* out,
                              int* out_num_values,
                              int32_t* bianry_length) {
    constexpr int32_t kBufferSize = 1024;
    int32_t indices[kBufferSize];

    // ArrowBinaryHelper helper(out);

    auto dst_value = values->mutable_data() + (*bianry_length);

    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
    int values_decoded = 0;
    int num_appended = 0;
    while (num_appended < num_values) {
      bool is_valid = bit_reader.IsSet();
      bit_reader.Next();

      if (is_valid) {
        int32_t batch_size =
            std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
        int num_indices = idx_decoder_.GetBatch(indices, batch_size);

        if (ARROW_PREDICT_FALSE(num_indices < 1)) {
          return Status::Invalid("Invalid number of indices '", num_indices, "'");
        }

        int i = 0;
        while (true) {
          // Consume all indices
          if (is_valid) {
            auto idx = indices[i];
            RETURN_NOT_OK(IndexInBounds(idx));
            const auto& val = dict_values[idx];
            // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
            //   RETURN_NOT_OK(helper.PushChunk());
            // }
            // RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));

            auto value_len = val.len;
            (*bianry_length) += value_len;
            auto value_offset = offset[num_appended+1] = offset[num_appended] + value_len;
            uint64_t capacity = values->capacity();
            if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
              capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
              values->Reserve(capacity);
              dst_value = values->mutable_data() + (*bianry_length);
            }
            memcpy(dst_value, val.ptr, static_cast<int32_t>(value_len));
            dst_value = dst_value + value_len;

            ++i;
            ++values_decoded;
          } else {
            // RETURN_NOT_OK(helper.AppendNull());
            offset[num_appended+1] = offset[num_appended];
            --null_count;
          }
          ++num_appended;
          if (i == num_indices) {
            // Do not advance the bit_reader if we have fulfilled the decode
            // request
            break;
          }
          is_valid = bit_reader.IsSet();
          bit_reader.Next();
        }
      } else {
        // RETURN_NOT_OK(helper.AppendNull());
        offset[num_appended+1] = offset[num_appended];
        --null_count;
        ++num_appended;
      }
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  Status DecodeArrowDenseNonNull(int num_values,
                                 typename EncodingTraits<ByteArrayType>::Accumulator* out,
                                 int* out_num_values) {
    constexpr int32_t kBufferSize = 2048;
    int32_t indices[kBufferSize];
    int values_decoded = 0;

    ArrowBinaryHelper helper(out);
    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        auto idx = indices[i];
        RETURN_NOT_OK(IndexInBounds(idx));
        const auto& val = dict_values[idx];
        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
          RETURN_NOT_OK(helper.PushChunk());
        }
        RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  Status DecodeArrowDenseNonNull_opt(int num_values,
                                     int32_t* offset,
                                     std::shared_ptr<::arrow::ResizableBuffer>& values,
                                     typename EncodingTraits<ByteArrayType>::Accumulator* out,
                                     int* out_num_values,
                                     int32_t* bianry_length) {
    constexpr int32_t kBufferSize = 2048;
    int32_t indices[kBufferSize];
    int values_decoded = 0;

    // ArrowBinaryHelper helper(out);
    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    auto dst_value = values->mutable_data() + (*bianry_length);
    int num_appended = 0;

    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        auto idx = indices[i];
        RETURN_NOT_OK(IndexInBounds(idx));
        const auto& val = dict_values[idx];
        // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
        //   RETURN_NOT_OK(helper.PushChunk());
        // }
        // RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));

        auto value_len = val.len;
        (*bianry_length) += value_len;
        auto value_offset = offset[num_appended+1] = offset[num_appended] + value_len;
        uint64_t capacity = values->capacity();
        if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
          capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
          values->Reserve(capacity);
          dst_value = values->mutable_data() + (*bianry_length);
        }
        memcpy(dst_value, val.ptr, static_cast<int32_t>(value_len));
        dst_value = dst_value + value_len;

        num_appended++;
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
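Editorial aside, not part of the patch series: both `*_opt` decode paths above maintain the standard Arrow variable-width invariant `offset[i + 1] = offset[i] + len(value_i)`, with null slots contributing zero bytes, so value `i` occupies the byte range `[offset[i], offset[i + 1])` of the values buffer. A tiny standalone illustration of that bookkeeping:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

int main() {
  // Three slots; the empty middle entry stands in for a null slot.
  std::vector<std::string> slots = {"ab", "", "cdef"};
  std::vector<int32_t> offsets(slots.size() + 1, 0);  // num_values + 1 entries
  std::string values;                                 // concatenated value bytes

  for (size_t i = 0; i < slots.size(); ++i) {
    // Running byte total; a null slot simply repeats the previous offset.
    offsets[i + 1] = offsets[i] + static_cast<int32_t>(slots[i].size());
    values += slots[i];
  }

  // offsets == {0, 2, 2, 6} and values == "abcdef".
  for (size_t i = 0; i < slots.size(); ++i) {
    std::cout << "slot " << i << ": bytes [" << offsets[i] << ", "
              << offsets[i + 1] << ")\n";
  }
  return 0;
}
```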
if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { + capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); + values->Reserve(capacity); + dst_value = values->mutable_data() + (*bianry_length); + } + memcpy(dst_value, val.ptr, static_cast(value_len)); + dst_value = dst_value + value_len; + + num_appended++; + } + values_decoded += num_indices; + } + *out_num_values = values_decoded; + return Status::OK(); + } + template Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, From f3dce6b7df0fdab9c9f5b8767896b8fb70679131 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 19 Aug 2022 09:34:07 +0800 Subject: [PATCH 07/25] Resume CMakeLists.txt --- cpp/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 822e4de7ed826..1d58528cf7059 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -48,7 +48,7 @@ if(POLICY CMP0074) endif() set(ARROW_VERSION "4.0.0") -add_compile_options(-g -O0) +#add_compile_options(-g -O0) string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") From 5c4d2539a50c35db330a739803fe651adfe8d67a Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 22 Aug 2022 17:48:42 +0800 Subject: [PATCH 08/25] Fix offset validate --- cpp/src/parquet/arrow/reader.cc | 6 +++--- cpp/src/parquet/encoding.cc | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 016ceacb0ef62..c8329894498c4 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -105,9 +105,9 @@ class ColumnReaderImpl : public ColumnReader { std::shared_ptr<::arrow::ChunkedArray>* out) final { RETURN_NOT_OK(LoadBatch(batch_size)); RETURN_NOT_OK(BuildArray(batch_size, out)); - for (int x = 0; x < (*out)->num_chunks(); x++) { - RETURN_NOT_OK((*out)->chunk(x)->Validate()); - } + // for (int x = 0; x < (*out)->num_chunks(); x++) { + // RETURN_NOT_OK((*out)->chunk(x)->Validate()); + // } return Status::OK(); } diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 63b3d0637e323..824fb655620bf 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2094,7 +2094,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); auto value_len = val.len; - (*bianry_length) += value_len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { @@ -2102,6 +2101,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, values->Reserve(capacity); dst_value = values->mutable_data() + (*bianry_length); } + (*bianry_length) += value_len; memcpy(dst_value, val.ptr, static_cast(value_len)); dst_value = dst_value + value_len; @@ -2193,7 +2193,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); auto value_len = val.len; - (*bianry_length) += value_len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { @@ -2201,6 +2200,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, values->Reserve(capacity); dst_value = values->mutable_data() + (*bianry_length); } + (*bianry_length) += value_len; memcpy(dst_value, 
val.ptr, static_cast(value_len)); dst_value = dst_value + value_len; From 64123b0893d9386cd36e8e5da54ce7489df668d1 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 23 Aug 2022 17:02:03 +0800 Subject: [PATCH 09/25] reduce buffer capacity --- cpp/src/parquet/column_reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index a45e4da577df1..5b432967231cd 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1602,9 +1602,9 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { PARQUET_THROW_NOT_OK( - values_->Resize(bytes_for_values(new_values_capacity * 20), false)); + values_->Resize(new_values_capacity * 20, false)); PARQUET_THROW_NOT_OK( - offset_->Resize(bytes_for_values(new_values_capacity * 4), false)); + offset_->Resize(new_values_capacity * 4, false)); auto offset = reinterpret_cast(offset_->mutable_data()); offset[0] = 0; From 3bf58f8fc6725fd8bef896b84536681ecdd6612b Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 29 Aug 2022 10:08:46 +0800 Subject: [PATCH 10/25] Add Patch version --- cpp/src/parquet/arrow/reader.cc | 5 +++++ cpp/src/parquet/column_reader.cc | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index c8329894498c4..971a76d15fc7f 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include "arrow/array.h" #include "arrow/buffer.h" @@ -141,6 +142,10 @@ class FileReaderImpl : public FileReader { : pool_(pool), reader_(std::move(reader)), reader_properties_(std::move(properties)) {} + + ~FileReaderImpl() { + std::cout << "Patch version for fix OOM" << std::endl; + } Status Init() { return SchemaManifest::Make(reader_->metadata()->schema(), diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 5b432967231cd..dc92c750f76ed 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1604,7 +1604,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, PARQUET_THROW_NOT_OK( values_->Resize(new_values_capacity * 20, false)); PARQUET_THROW_NOT_OK( - offset_->Resize(new_values_capacity * 4, false)); + offset_->Resize((new_values_capacity+1) * 4, false)); auto offset = reinterpret_cast(offset_->mutable_data()); offset[0] = 0; From 86cc3951e90ddd5d16d627a6ebf8646668125673 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 30 Aug 2022 11:01:16 +0800 Subject: [PATCH 11/25] Add Fix for write validate --- cpp/src/parquet/arrow/reader.cc | 6 +++--- cpp/src/parquet/encoding.cc | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 971a76d15fc7f..e4fa62a79d67f 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -106,9 +106,9 @@ class ColumnReaderImpl : public ColumnReader { std::shared_ptr<::arrow::ChunkedArray>* out) final { RETURN_NOT_OK(LoadBatch(batch_size)); RETURN_NOT_OK(BuildArray(batch_size, out)); - // for (int x = 0; x < (*out)->num_chunks(); x++) { - // RETURN_NOT_OK((*out)->chunk(x)->Validate()); - // } + for (int x = 0; x < (*out)->num_chunks(); x++) { + RETURN_NOT_OK((*out)->chunk(x)->Validate()); + } return Status::OK(); } diff --git 
a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 824fb655620bf..ed5a5922a7a66 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1443,7 +1443,7 @@ class PlainByteArrayDecoder : public PlainDecoder, auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->capacity(); if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { - values->Reserve(len_ + *bianry_length); + values->Resize(len_ + *bianry_length); dst_value = values->mutable_data() + (*bianry_length); } @@ -2098,7 +2098,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Reserve(capacity); + values->Resize(capacity); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; @@ -2197,7 +2197,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Reserve(capacity); + values->Resize(capacity); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; From 145a562efee18b05dc8ff9d20542060fa877988e Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 30 Aug 2022 12:56:04 +0800 Subject: [PATCH 12/25] Set false for resize --- cpp/src/parquet/encoding.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index ed5a5922a7a66..7fde92d7fd39e 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1443,7 +1443,7 @@ class PlainByteArrayDecoder : public PlainDecoder, auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->capacity(); if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { - values->Resize(len_ + *bianry_length); + values->Resize(len_ + *bianry_length, false); dst_value = values->mutable_data() + (*bianry_length); } @@ -2098,7 +2098,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Resize(capacity); + values->Resize(capacity, false); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; @@ -2197,7 +2197,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Resize(capacity); + values->Resize(capacity, false); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; From 2b8aabffe3f979588f8ddbd5943dcfabb73363d9 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 30 Aug 2022 17:34:15 +0800 Subject: [PATCH 13/25] Fix customer case issue --- cpp/src/arrow/buffer.h | 3 +++ cpp/src/parquet/column_reader.cc | 8 ++++++++ cpp/src/parquet/encoding.cc | 6 +++--- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 1a3bb29e43971..b79187cf2dda7 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -225,6 +225,9 @@ class ARROW_EXPORT Buffer { /// \brief Return the buffer's size in bytes int64_t size() const { 
return size_; } + /// \brief Set the buffer's size in bytes + void SetSize(int64_t size) { size_ = size; } + /// \brief Return the buffer's capacity (number of allocated bytes) int64_t capacity() const { return capacity_; } diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index dc92c750f76ed..265dd8d1e8ec4 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1634,6 +1634,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, std::shared_ptr ReleaseOffsets() { auto result = offset_; + + auto offsetArr = reinterpret_cast(offset_->mutable_data()); + const auto first_offset = offsetArr[0]; + const auto last_offset = offsetArr[values_written_]; + int64_t binary_length = last_offset - first_offset; + std::cout << "binary_length:" << binary_length << std::endl; + values_->SetSize(binary_length); + offset_ = AllocateBuffer(this->pool_); bianry_length_ = 0; return result; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 7fde92d7fd39e..824fb655620bf 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1443,7 +1443,7 @@ class PlainByteArrayDecoder : public PlainDecoder, auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->capacity(); if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { - values->Resize(len_ + *bianry_length, false); + values->Reserve(len_ + *bianry_length); dst_value = values->mutable_data() + (*bianry_length); } @@ -2098,7 +2098,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Resize(capacity, false); + values->Reserve(capacity); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; @@ -2197,7 +2197,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Resize(capacity, false); + values->Reserve(capacity); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; From 510acb0faf2093900176466e93b5b6ae2f349280 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 30 Aug 2022 17:36:46 +0800 Subject: [PATCH 14/25] Add patch version --- cpp/src/parquet/arrow/reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e4fa62a79d67f..55208e503e723 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -144,7 +144,7 @@ class FileReaderImpl : public FileReader { reader_properties_(std::move(properties)) {} ~FileReaderImpl() { - std::cout << "Patch version for fix OOM" << std::endl; + std::cout << "Patch version-0830" << std::endl; } Status Init() { From 972cc4e2788f1b439ede4de2234ea79e01f2154a Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 30 Aug 2022 17:41:03 +0800 Subject: [PATCH 15/25] Remove cout --- cpp/src/parquet/column_reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 265dd8d1e8ec4..f4979d2d97f57 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1639,7 +1639,7 @@ class ByteArrayChunkedRecordReader : public 
TypedRecordReader, const auto first_offset = offsetArr[0]; const auto last_offset = offsetArr[values_written_]; int64_t binary_length = last_offset - first_offset; - std::cout << "binary_length:" << binary_length << std::endl; + // std::cout << "binary_length:" << binary_length << std::endl; values_->SetSize(binary_length); offset_ = AllocateBuffer(this->pool_); From 1433cde21ffd598ac91ef67fc8eaee249420bcf9 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 2 Sep 2022 16:52:40 +0800 Subject: [PATCH 16/25] Add Function opt --- cpp/src/parquet/arrow/reader_internal.cc | 5 ++++- cpp/src/parquet/column_reader.cc | 8 +++++++- cpp/src/parquet/column_reader.h | 5 +++++ cpp/src/parquet/encoding.cc | 4 ++-- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 22d19a4c899b0..f1c31357b578c 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -704,7 +704,10 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ case ::arrow::Type::DATE64: RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result)); break; - case ::arrow::Type::FIXED_SIZE_BINARY: + case ::arrow::Type::FIXED_SIZE_BINARY: { + RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result)); + result = chunked_result; + } break; case ::arrow::Type::BINARY: case ::arrow::Type::STRING: case ::arrow::Type::LARGE_BINARY: diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index f4979d2d97f57..2367da0bb83fd 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1602,7 +1602,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { PARQUET_THROW_NOT_OK( - values_->Resize(new_values_capacity * 20, false)); + values_->Resize(new_values_capacity * binary_per_row_length_, false)); PARQUET_THROW_NOT_OK( offset_->Resize((new_values_capacity+1) * 4, false)); @@ -1641,6 +1641,12 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, int64_t binary_length = last_offset - first_offset; // std::cout << "binary_length:" << binary_length << std::endl; values_->SetSize(binary_length); + + if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { + binary_per_row_length_ = binary_length / values_written_ + 1; + std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl; + hasCal_average_len_ = true; + } offset_ = AllocateBuffer(this->pool_); bianry_length_ = 0; diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 535c885f53852..e5d635753999d 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -54,6 +54,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; // 16 KB is the default expected page header size static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; +static constexpr int32_t kDefaultBinaryPerRowSzie = 20; + class PARQUET_EXPORT LevelDecoder { public: LevelDecoder(); @@ -301,6 +303,9 @@ class RecordReader { int64_t levels_position_; int64_t levels_capacity_; + bool hasCal_average_len_ = false; + int64_t binary_per_row_length_ = kDefaultBinaryPerRowSzie; + std::shared_ptr<::arrow::ResizableBuffer> values_; // In the case of false, don't allocate the values buffer (when we directly read into // builder classes). 
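(For reference: the sizing heuristic introduced by the column_reader hunks above starts from a fixed guess of kDefaultBinaryPerRowSzie = 20 bytes per row and, once the first batch has been decoded, switches to the observed average value length. A minimal standalone sketch of that logic follows; the struct and names are illustrative only, not code from this patch:

#include <cstdint>

// Illustrative sketch of the adaptive per-row size estimate.
struct PerRowSizeEstimate {
  int64_t bytes_per_row = 20;  // initial guess, cf. kDefaultBinaryPerRowSzie
  bool calibrated = false;

  // After the first batch: average observed value length, rounded up,
  // mirroring binary_per_row_length_ = binary_length / values_written_ + 1.
  void Calibrate(int64_t binary_length, int64_t values_written) {
    if (!calibrated && values_written > 0) {
      bytes_per_row = binary_length / values_written + 1;
      calibrated = true;
    }
  }

  // Bytes to reserve in the values buffer for a given row capacity,
  // as used by the Resize call in ReserveValues above.
  int64_t ValuesBytes(int64_t row_capacity) const {
    return row_capacity * bytes_per_row;
  }
};

Calibrating from the first batch keeps later reservations close to the real data instead of a fixed multiplier, at the cost of trusting that the first batch is representative.)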
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 824fb655620bf..fe130547e7e19 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2172,6 +2172,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, constexpr int32_t kBufferSize = 2048; int32_t indices[kBufferSize]; int values_decoded = 0; + uint64_t capacity = values->capacity(); // ArrowBinaryHelper helper(out); auto dict_values = reinterpret_cast(dictionary_->data()); @@ -2185,7 +2186,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { auto idx = indices[i]; - RETURN_NOT_OK(IndexInBounds(idx)); + // RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { // RETURN_NOT_OK(helper.PushChunk()); @@ -2194,7 +2195,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, auto value_len = val.len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; - uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); values->Reserve(capacity); From f37f44a1a9a934ebee131891eb92c2941affa3a4 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 2 Sep 2022 17:41:23 +0800 Subject: [PATCH 17/25] Add to DecodeArrowDense_opt --- cpp/src/parquet/encoding.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index fe130547e7e19..1fbae5ca9b96c 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2068,6 +2068,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, auto dict_values = reinterpret_cast(dictionary_->data()); int values_decoded = 0; int num_appended = 0; + uint64_t capacity = values->capacity(); while (num_appended < num_values) { bool is_valid = bit_reader.IsSet(); bit_reader.Next(); @@ -2095,7 +2096,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, auto value_len = val.len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; - uint64_t capacity = values->capacity(); + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); values->Reserve(capacity); From b9c2644d82ccabf7b55978fcae760d4a9c1e89ae Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 6 Sep 2022 11:25:32 +0800 Subject: [PATCH 18/25] clean comment --- cpp/src/parquet/column_reader.cc | 1 - cpp/src/parquet/encoding.cc | 36 ++------------------------------ 2 files changed, 2 insertions(+), 35 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 2367da0bb83fd..e32106c1e2148 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1639,7 +1639,6 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, const auto first_offset = offsetArr[0]; const auto last_offset = offsetArr[values_written_]; int64_t binary_length = last_offset - first_offset; - // std::cout << "binary_length:" << binary_length << std::endl; values_->SetSize(binary_length); if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 1fbae5ca9b96c..fba3aadfd00f4 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1464,37 +1464,19 @@ class PlainByteArrayDecoder : 
public PlainDecoder, if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } - // if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { - // // This element would exceed the capacity of a chunk - // RETURN_NOT_OK(helper.PushChunk()); - // RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); - // RETURN_NOT_OK(helper.builder->ReserveData( - // std::min(len_, helper.chunk_space_remaining))); - // } - // helper.UnsafeAppend(data_ + 4, value_len); (*bianry_length) += value_len; offset[i+1] = offset[i] + value_len; memcpy(dst_value, data_ + 4, value_len); dst_value = dst_value + value_len; - // std::cout << "*(data_ + 4) :" << *(data_ + 4) << std::endl; - // std::cout << "*(data_ + 5) " << *(data_ + 5) << std::endl; - data_ += increment; len_ -= increment; - - // uint8_t* address = values->mutable_data(); - // for(int i=0; i< 10; i++) { - // std::cout << "*(address+" << i << ")" << *(address+i) << std::endl; - // } - ++values_decoded; ++i; return Status::OK(); }, [&]() { - // helper.UnsafeAppendNull(); offset[i+1] = offset[i]; ++i; return Status::OK(); @@ -2057,8 +2039,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; - // ArrowBinaryHelper helper(out); - auto dst_value = values->mutable_data() + (*bianry_length); @@ -2087,13 +2067,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // Consume all indices if (is_valid) { auto idx = indices[i]; - RETURN_NOT_OK(IndexInBounds(idx)); - const auto& val = dict_values[idx]; - // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - // RETURN_NOT_OK(helper.PushChunk()); - // } - // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); - + // RETURN_NOT_OK(IndexInBounds(idx)); + const auto& val = dict_values[idx]; auto value_len = val.len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; @@ -2110,7 +2085,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, ++i; ++values_decoded; } else { - // RETURN_NOT_OK(helper.AppendNull()); offset[num_appended+1] = offset[num_appended]; --null_count; } @@ -2124,7 +2098,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, bit_reader.Next(); } } else { - // RETURN_NOT_OK(helper.AppendNull()); offset[num_appended+1] = offset[num_appended]; --null_count; ++num_appended; @@ -2189,11 +2162,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, auto idx = indices[i]; // RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; - // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - // RETURN_NOT_OK(helper.PushChunk()); - // } - // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); - auto value_len = val.len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { From 710edb4d3337007633cc6421655fec972a1f7a42 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 6 Sep 2022 14:46:39 +0800 Subject: [PATCH 19/25] Clean unnecessary paras --- cpp/src/parquet/column_reader.cc | 19 ++----------------- cpp/src/parquet/encoding.cc | 31 +++---------------------------- cpp/src/parquet/encoding.h | 1 - 3 files changed, 5 insertions(+), 46 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index e32106c1e2148..16c8bd41dbef2 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1570,31 +1570,19 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, } void 
ReadValuesDense(int64_t values_to_read) override { - // int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( - // static_cast(values_to_read), &accumulator_); int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( static_cast(values_to_read), 0, NULLPTR, (reinterpret_cast(offset_->mutable_data()) + values_written_), - values_, 0, &accumulator_, &bianry_length_); + values_, 0, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read); - // ResetValues(); } - // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - // int64_t num_decoded = this->current_decoder_->DecodeArrow( - // static_cast(values_to_read), static_cast(null_count), - // valid_bits_->mutable_data(), values_written_, &accumulator_); - // DCHECK_EQ(num_decoded, values_to_read - null_count); - // ResetValues(); - // } - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( static_cast(values_to_read), static_cast(null_count), valid_bits_->mutable_data(), (reinterpret_cast(offset_->mutable_data()) + values_written_), - values_, values_written_, &accumulator_, &bianry_length_); + values_, values_written_, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read - null_count); - // ResetValues(); } void ReserveValues(int64_t extra_values) { @@ -1626,7 +1614,6 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, std::shared_ptr ReleaseValues() override { auto result = values_; - // PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true)); values_ = AllocateBuffer(this->pool_); values_capacity_ = 0; return result; @@ -1672,9 +1659,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, int32_t bianry_length_ = 0; - // std::shared_ptr<::arrow::ResizableBuffer> values_; std::shared_ptr<::arrow::ResizableBuffer> offset_; - // std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; }; class ByteArrayDictionaryRecordReader : public TypedRecordReader, diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index fba3aadfd00f4..dd7433302c2fb 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1356,15 +1356,11 @@ class PlainByteArrayDecoder : public PlainDecoder, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out, int32_t* bianry_length) { int result = 0; PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, offset, values, - valid_bits_offset, out, &result, bianry_length)); - - // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, - // valid_bits_offset, out, &result)); + valid_bits_offset, &result, bianry_length)); return result; } @@ -1428,18 +1424,9 @@ class PlainByteArrayDecoder : public PlainDecoder, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out, int* out_values_decoded, int32_t* bianry_length) { - // ArrowBinaryHelper helper(out); int values_decoded = 0; - - - - // RETURN_NOT_OK(helper.builder->Reserve(num_values)); - // RETURN_NOT_OK(helper.builder->ReserveData( - // std::min(len_, helper.chunk_space_remaining))); - auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->capacity(); if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { @@ -1447,8 +1434,6 @@ class PlainByteArrayDecoder : public PlainDecoder, dst_value = values->mutable_data() + 
(*bianry_length); } - - int i = 0; RETURN_NOT_OK(VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, @@ -1944,23 +1929,18 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out, int32_t* bianry_length) { int result = 0; if (null_count == 0) { PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, - offset, values, - out, &result, bianry_length)); + offset, values, &result, bianry_length)); } else { PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, offset, values, - valid_bits_offset, out, &result, bianry_length)); + valid_bits_offset, &result, bianry_length)); } - // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, - // valid_bits_offset, out, &result)); - return result; } @@ -2033,16 +2013,12 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out, int* out_num_values, int32_t* bianry_length) { constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; - auto dst_value = values->mutable_data() + (*bianry_length); - - ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); auto dict_values = reinterpret_cast(dictionary_->data()); @@ -2139,7 +2115,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, Status DecodeArrowDenseNonNull_opt(int num_values, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, - typename EncodingTraits::Accumulator* out, int* out_num_values, int32_t* bianry_length) { diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 4902da1a9a395..b424ef6826122 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -321,7 +321,6 @@ class TypedDecoder : virtual public Decoder { int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* out, int32_t* bianry_length) { return 0; } From a5e4080622d88facd641b5be5b5adc3fef4bcdad Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 6 Sep 2022 14:52:38 +0800 Subject: [PATCH 20/25] Delete Patch version --- cpp/src/parquet/arrow/reader.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 55208e503e723..d886c956bc877 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -142,10 +142,6 @@ class FileReaderImpl : public FileReader { : pool_(pool), reader_(std::move(reader)), reader_properties_(std::move(properties)) {} - - ~FileReaderImpl() { - std::cout << "Patch version-0830" << std::endl; - } Status Init() { return SchemaManifest::Make(reader_->metadata()->schema(), From b9e52c33a146d4dbf7d3955652520a235c7ebfe7 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 7 Sep 2022 13:10:01 +0800 Subject: [PATCH 21/25] Modify Gbenchmark version to avoid conflict --- cpp/thirdparty/versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 1d86e1c8d15dc..67bdfaba07e2f 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -33,7 +33,7 @@ ARROW_BOOST_BUILD_VERSION=1.75.0 ARROW_BROTLI_BUILD_VERSION=v1.0.9 ARROW_BZIP2_BUILD_VERSION=1.0.8 ARROW_CARES_BUILD_VERSION=1.17.1 
-ARROW_GBENCHMARK_BUILD_VERSION=v1.5.2 +ARROW_GBENCHMARK_BUILD_VERSION=v1.6.0 ARROW_GFLAGS_BUILD_VERSION=v2.2.2 ARROW_GLOG_BUILD_VERSION=v0.4.0 ARROW_GRPC_BUILD_VERSION=v1.35.0 From 541dd29b972392d8a63761fe3b106be9b9b2023f Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 7 Sep 2022 16:31:47 +0800 Subject: [PATCH 22/25] custom memcpy --- cpp/src/parquet/encoding.cc | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index dd7433302c2fb..1bc9cdbbf986f 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -26,6 +26,7 @@ #include #include #include +#include #include "arrow/array.h" #include "arrow/array/builder_dict.h" @@ -61,6 +62,31 @@ using ArrowPoolVector = std::vector>; namespace parquet { namespace { +void memcpy_avx512(void* dest, const void* src, size_t length) { + uint32_t k; + for (k = 0; k + 32 < length; k += 32) { + __m256i v = _mm256_loadu_si256((const __m256i*)(src + k)); + _mm256_storeu_si256((__m256i*)(dest + k), v); + } + auto mask = (1L << (length - k)) - 1; + __m256i v = _mm256_maskz_loadu_epi8(mask, src + k); + _mm256_mask_storeu_epi8(dest + k, mask, v); +} + +// inline __attribute__((always_inline)) void memcpy_avx512(void* dest, const void* src, size_t length) { +// int nchunks = length / sizeof(uint64_t); +// int slice = length % sizeof(uint64_t); + +// uint64_t * s = (uint64_t *)src; +// uint64_t * d = (uint64_t *)dest; + +// while(nchunks--) +// *d++ = *s++; + +// while (slice--) +// *((uint8_t *)d++) =*((uint8_t *)s++); +// } + constexpr int64_t kInMemoryDefaultCapacity = 1024; // The Parquet spec isn't very clear whether ByteArray lengths are signed or // unsigned, but the Java implementation uses signed ints. 
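(A note on the helper just added: the main loop moves 32-byte chunks with unaligned 256-bit loads/stores, and the 1-32 byte tail is finished with a byte-granular masked load/store, so the copy never reads or writes past the end of either buffer. _mm256_maskz_loadu_epi8 and _mm256_mask_storeu_epi8 require AVX512BW+AVX512VL, which is why a later patch in this series guards the call sites with #ifdef __AVX512BW__. One caveat: arithmetic on void* is a GNU extension, so a strictly standard-conforming equivalent would cast first. A sketch under that assumption — not the patch's code:

#include <immintrin.h>
#include <cstddef>
#include <cstdint>

inline void memcpy_avx512_sketch(void* dest, const void* src, size_t length) {
  auto* d = static_cast<unsigned char*>(dest);
  const auto* s = static_cast<const unsigned char*>(src);
  size_t k = 0;
  for (; k + 32 < length; k += 32) {  // whole 32-byte chunks
    __m256i v = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + k));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(d + k), v);
  }
  // Remaining 1..32 bytes: the mask has one bit per byte still to copy.
  const __mmask32 mask =
      static_cast<__mmask32>((uint64_t{1} << (length - k)) - 1);
  __m256i v = _mm256_maskz_loadu_epi8(mask, s + k);
  _mm256_mask_storeu_epi8(d + k, mask, v);
}
)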
@@ -1452,7 +1478,8 @@ class PlainByteArrayDecoder : public PlainDecoder,
 (*bianry_length) += value_len;
 offset[i+1] = offset[i] + value_len;
 
- memcpy(dst_value, data_ + 4, value_len);
+ // memcpy(dst_value, data_ + 4, value_len);
+ memcpy_avx512(dst_value, data_ + 4, value_len);
 dst_value = dst_value + value_len;
 
 data_ += increment;
@@ -2054,7 +2081,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl,
 dst_value = values->mutable_data() + (*bianry_length);
 }
 (*bianry_length) += value_len;
- memcpy(dst_value, val.ptr, static_cast(value_len));
+ // memcpy(dst_value, val.ptr, static_cast(value_len));
+ memcpy_avx512(dst_value, val.ptr, static_cast(value_len));
 
 dst_value = dst_value + value_len;
@@ -2145,7 +2173,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl,
 dst_value = values->mutable_data() + (*bianry_length);
 }
 (*bianry_length) += value_len;
- memcpy(dst_value, val.ptr, static_cast(value_len));
+ // memcpy(dst_value, val.ptr, static_cast(value_len));
+ memcpy_avx512(dst_value, val.ptr, static_cast(value_len));
 
 dst_value = dst_value + value_len;
 num_appended++;

From 3a08a638ee415d886bca28ab2b5538fa3ad06999 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Wed, 7 Sep 2022 16:44:54 +0800
Subject: [PATCH 23/25] add -march=native

---
 cpp/CMakeLists.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 1d58528cf7059..c0ec785b45046 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -49,6 +49,7 @@ endif()
 
 set(ARROW_VERSION "4.0.0")
 #add_compile_options(-g -O0)
+add_compile_options(-g -march=native)
 
 string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}")

From 2fbcdd3d2b11557b68b5b0e9a86e9d4979dcde45 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Wed, 14 Sep 2022 14:31:06 +0800
Subject: [PATCH 24/25] Refine AVX512-related code

---
 .../parquet/arrow/parquet_scan_benchmark.cc | 6 +++++-
 cpp/src/parquet/encoding.cc | 20 ++++++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
index 2ab95e1c380d0..d1fab5b2e13a1 100644
--- a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
+++ b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc
@@ -130,12 +130,16 @@ class GoogleBenchmarkColumnarToRow_CacheScan_Benchmark
 properties, &parquet_reader));
 
 std::vector> batches;
+ // ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
+ // row_group_indices, local_column_indices, &record_batch_reader));
+ // need to verify complex types, so remove local_column_indices
 ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
- row_group_indices, local_column_indices, &record_batch_reader));
+ row_group_indices, &record_batch_reader));
 do {
 TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
 
 if (record_batch) {
+ // std::cout << " record_batch->ToString(): " << record_batch->ToString() << std::endl;
 // batches.push_back(record_batch);
 num_batches += 1;
 num_rows += record_batch->num_rows();
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 1bc9cdbbf986f..76080f87ce440 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -62,7 +62,7 @@ using ArrowPoolVector = std::vector>;
 namespace parquet {
 namespace {
 
-void memcpy_avx512(void* dest, const void* src, size_t length) {
+inline __attribute__((always_inline)) void memcpy_avx512(void* dest, const void* src, size_t length) {
 uint32_t k;
 for (k = 0; k + 32 < length; k += 32) {
 __m256i v = _mm256_loadu_si256((const __m256i*)(src + k));
@@ -73,7 +73,7 @@ void memcpy_avx512(void* dest, const void* src, size_t length) {
 _mm256_mask_storeu_epi8(dest + k, mask, v);
 }
 
-// inline __attribute__((always_inline)) void memcpy_avx512(void* dest, const void* src, size_t length) {
+// inline __attribute__((always_inline)) void memcpy_opt(void* dest, const void* src, size_t length) {
 // int nchunks = length / sizeof(uint64_t);
 // int slice = length % sizeof(uint64_t);
@@ -1478,8 +1478,12 @@ class PlainByteArrayDecoder : public PlainDecoder,
 (*bianry_length) += value_len;
 offset[i+1] = offset[i] + value_len;
 
- // memcpy(dst_value, data_ + 4, value_len);
+#ifdef __AVX512BW__
 memcpy_avx512(dst_value, data_ + 4, value_len);
+#else
+ memcpy(dst_value, data_ + 4, value_len);
+#endif
+
 dst_value = dst_value + value_len;
 
 data_ += increment;
@@ -2085,7 +2085,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl,
 dst_value = values->mutable_data() + (*bianry_length);
 }
 (*bianry_length) += value_len;
- // memcpy(dst_value, val.ptr, static_cast(value_len));
+#ifdef __AVX512BW__
 memcpy_avx512(dst_value, val.ptr, static_cast(value_len));
+#else
+ memcpy(dst_value, val.ptr, static_cast(value_len));
+#endif
 
 dst_value = dst_value + value_len;
@@ -2180,7 +2180,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl,
 dst_value = values->mutable_data() + (*bianry_length);
 }
 (*bianry_length) += value_len;
- // memcpy(dst_value, val.ptr, static_cast(value_len));
+#ifdef __AVX512BW__
 memcpy_avx512(dst_value, val.ptr, static_cast(value_len));
+#else
+ memcpy(dst_value, val.ptr, static_cast(value_len));
+#endif
 
 dst_value = dst_value + value_len;
 num_appended++;

From 46a96668fc6f8b12e76478144d950e38973c54ee Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Thu, 15 Sep 2022 17:14:00 +0800
Subject: [PATCH 25/25] Comment out cout

---
 cpp/src/parquet/column_reader.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 16c8bd41dbef2..401eeca5bb5db 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1630,7 +1630,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader,
 
 if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
 binary_per_row_length_ = binary_length / values_written_ + 1;
- std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
+ // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
 hasCal_average_len_ = true;
 }
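(Taken together, the decode path in this series accumulates each column into two resizable buffers — an int32 offsets buffer with values_written_ + 1 entries, and a flat values buffer trimmed to the exact byte length via SetSize — which is the same shape as Arrow's variable-length binary layout. A rough sketch of how such a pair maps onto an arrow::BinaryArray; illustrative only, since the real transfer path in reader_internal.cc also carries the validity bitmap:

#include "arrow/array.h"
#include "arrow/buffer.h"
#include <memory>

// offsets: (num_rows + 1) int32 entries; values: offsets[num_rows] bytes.
std::shared_ptr<arrow::BinaryArray> WrapDecoded(
    int64_t num_rows, std::shared_ptr<arrow::Buffer> offsets,
    std::shared_ptr<arrow::Buffer> values) {
  return std::make_shared<arrow::BinaryArray>(
      num_rows, offsets, values,
      /*null_bitmap=*/nullptr, /*null_count=*/0);
}

Because the buffers already match the target layout, handing them over avoids the per-value builder appends that the commented-out ArrowBinaryHelper code paths used to perform.)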