From bea687733eed9e8d6f71e3aaed892e21d2dc110d Mon Sep 17 00:00:00 2001 From: yspMing <1844341967@qq.com> Date: Fri, 7 Jan 2022 01:33:39 -0500 Subject: [PATCH 1/2] Adding predicate fitler funtion to ape-reader --- oap-ape/ape-native/src/reader.cc | 39 ++- oap-ape/ape-native/src/reader.h | 4 + oap-ape/ape-native/src/test/CMakeLists.txt | 6 +- .../ape-native/src/test/parquetHdfsTest.cc | 18 +- .../src/test/predicateFilterTest.cc | 170 ++++++++++ oap-ape/ape-native/src/utils/JsonConvertor.cc | 78 +++++ oap-ape/ape-native/src/utils/JsonConvertor.h | 4 + .../src/utils/PredicateExpression.cc | 292 ++++++++++++++++++ .../src/utils/PredicateExpression.h | 186 +++++++++++ 9 files changed, 789 insertions(+), 8 deletions(-) create mode 100644 oap-ape/ape-native/src/test/predicateFilterTest.cc create mode 100644 oap-ape/ape-native/src/utils/PredicateExpression.cc create mode 100644 oap-ape/ape-native/src/utils/PredicateExpression.h diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index a1073e2f8..b793c42dd 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -35,7 +35,7 @@ void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort, ARROW_LOG(DEBUG) << "hdfsHost " << hdfsHost << " port " << hdfsPort; options->ConfigureEndPoint(hdfsHost, hdfsPort); - // todo: if we delete `options`, it will core dump, seems like free twice. + // TODO: if we delete `options`, it will core dump, seems like free twice. auto result = arrow::fs::HadoopFileSystem::Make(*options); if (!result.ok()) { ARROW_LOG(WARNING) << "HadoopFileSystem Make failed! err msg:" @@ -457,7 +457,7 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr allocateFilterBuffers(batchSize); } - if (aggExprs.size()) { // todo: group by agg size + if (aggExprs.size()) { // TODO: group by agg size allocateAggBuffers(batchSize); } @@ -564,6 +564,11 @@ bool Reader::checkEndOfRowGroup() { // if a splitFile contains rowGroup [2,5], currentRowGroup is 2 // rowGroupReaders index starts from 0 ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar; + //find next row group passing the predicate filter + while(!doPredicateFilter(currentRowGroup)){ + currentRowGroup++; + totalRowGroupsRead++; + } rowGroupReader = rowGroupReaders[currentRowGroup - firstRowGroupIndex]; currentRowGroup++; totalRowGroupsRead++; @@ -593,6 +598,12 @@ void Reader::setFilter(std::string filterJsonStr) { filterExpression = std::make_shared( "root", std::dynamic_pointer_cast(tmpExpression)); + //set predicate filter + std::shared_ptr tmpExpressionP = + JsonConvertor::parseToPredicateExpression(filterJsonStr); + + predicateExpression = std::make_shared("root", tmpExpressionP); + // get column names from expression filterColumnNames.clear(); setFilterColumnNames(tmpExpression); @@ -879,4 +890,28 @@ bool Reader::isNativeEnabled() { arrow::internal::CpuInfo::Vendor::Intel; } +bool Reader::doPredicateFilter(int rowGroupIndex){ + int8_t res; + + if(!predicateExpression or rowGroupIndex >= firstRowGroupIndex + totalRowGroups - 1){ + return true; + } + + std::unique_ptr urgMataData = fileMetaData->RowGroup(rowGroupIndex); + std::shared_ptr rgMataData = std::move(urgMataData); + + predicateExpression->setSchema(schema); + predicateExpression->setStatistic(rgMataData); + predicateExpression->PredicateWithParam(res); + + if(res > 0){ + ARROW_LOG(DEBUG) <<"predicate pass."; + return true; + } + else{ + ARROW_LOG(DEBUG) <<"predicate not pass."; + return false; + } +} + } // namespace ape diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 17930ac86..9b96bec30 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -30,6 +30,7 @@ #include "utils/AggExpression.h" #include "utils/FilterExpression.h" +#include "utils/PredicateExpression.h" #include "utils/PlasmaCacheManager.h" #include "utils/JsonConvertor.h" #include "utils/Type.h" @@ -66,6 +67,8 @@ class Reader { void setPreBufferEnabled(bool isEnabled); + bool doPredicateFilter(int rowGroupIndex); + static bool isNativeEnabled(); private: @@ -127,6 +130,7 @@ class Reader { int64_t totalRowsLoadedSoFar = 0; std::shared_ptr filterExpression; + std::shared_ptr predicateExpression; std::chrono::duration filterTime = std::chrono::nanoseconds::zero(); std::chrono::duration aggTime = std::chrono::nanoseconds::zero(); diff --git a/oap-ape/ape-native/src/test/CMakeLists.txt b/oap-ape/ape-native/src/test/CMakeLists.txt index e1e23b7b2..8aa4707a5 100644 --- a/oap-ape/ape-native/src/test/CMakeLists.txt +++ b/oap-ape/ape-native/src/test/CMakeLists.txt @@ -51,7 +51,7 @@ set(ARROW_STATIC -Wl,-Bstatic; -Wl,--whole-archive; arrow_static; -Wl,--no-whole set(THREAD -Wl,-Bdynamic;-Wl,--as-needed;Threads::Threads;-Wl,--no-as-needed) add_executable(convertorTest convertorTest.cc) -target_link_libraries(convertorTest gtest_main parse ${ARROW_STATIC} ${THREAD}) +target_link_libraries(convertorTest gtest_main parquet_jni) add_test(NAME convertorTest COMMAND ${EXECUTABLE_OUTPUT_PATH}/convertorTest) add_executable(decimalTest decimalTest.cc) @@ -76,6 +76,10 @@ target_link_libraries(parquetHdfsTest gtest_main parquet_jni ${ARROW_STATIC} ${T add_executable(plasmaTest plasmaTest.cc) target_link_libraries(plasmaTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD}) +add_executable(predicateFilterTest predicateFilterTest.cc) +target_link_libraries(predicateFilterTest gtest_main parquet_jni) +add_test(NAME predicateFilterTest COMMAND ${EXECUTABLE_OUTPUT_PATH}/predicateFilterTest) + if(APE_CI) # Will not run these test else() diff --git a/oap-ape/ape-native/src/test/parquetHdfsTest.cc b/oap-ape/ape-native/src/test/parquetHdfsTest.cc index c9dbea4b5..78c38871f 100644 --- a/oap-ape/ape-native/src/test/parquetHdfsTest.cc +++ b/oap-ape/ape-native/src/test/parquetHdfsTest.cc @@ -25,11 +25,13 @@ #include #include +#include + TEST(ParquetHdfsTest, ReadTest) { arrow::fs::HdfsOptions options_; - std::string hdfs_host = "sr585"; - int hdfs_port = 9000; + std::string hdfs_host = "clx06-AEP"; + int hdfs_port = 8020; // std::string hdfs_user = "kunshang"; options_.ConfigureEndPoint(hdfs_host, hdfs_port); @@ -42,8 +44,8 @@ TEST(ParquetHdfsTest, ReadTest) { std::make_shared("", *result); std::string file_name = - "/tpcds_10g/store_sales/" - "part-00000-74feb3b4-1954-4be7-802d-a50912793bea-c000.snappy.parquet"; + "/user/hive/warehouse/tpcds_hdfs_parquet_10.db/store_sales/ss_sold_date_sk=2450816/" + "part-00009-0828d1ab-ef1f-4b55-bf94-c071fb76c353.c000.snappy.parquet"; auto file_result = fs_->OpenInputFile(file_name); EXPECT_TRUE(file_result.ok()) << "Open hdfs file failed"; @@ -84,7 +86,7 @@ TEST(ParquetHdfsTest, ReadTest) { column_reader = row_group_reader->Column(1); parquet::Int32Reader* int32_reader = static_cast(column_reader.get()); - int batch_size = 10000; + int batch_size = 1000; int32_t* values_rb = (int32_t*)std::malloc(batch_size * sizeof(int32_t)); int64_t values_read = 0; int64_t rows_read = 0; @@ -105,10 +107,15 @@ TEST(ParquetHdfsTest, ReadTest) { // ReadBatchSpaced will record a null bitmap. std::cout << std::endl << "test ReadBatchSpaced API" << std::endl; + std::cout<<"malloc values_rbs"< + +#include +#include + +#include +#include +#include +#include +#include + +#include "src/reader.h" +#include "src/utils/PredicateExpression.h" + +TEST(predicateFilterTest, minMaxTest) +{ + arrow::fs::HdfsOptions options_; + + std::string hdfs_host = "clx06-AEP"; + int hdfs_port = 8020; + + options_.ConfigureEndPoint(hdfs_host, hdfs_port); + + auto result = arrow::fs::HadoopFileSystem::Make(options_); + EXPECT_TRUE(result.ok()) << "HadoopFileSystem Make failed"; + + std::shared_ptr fs_ = + std::make_shared("", *result); + + std::string file_name = + "/user/hive/warehouse/tpcds_hdfs_parquet_10.db/store_sales/ss_sold_date_sk=2450817/" + "part-00013-0828d1ab-ef1f-4b55-bf94-c071fb76c353.c000.snappy.parquet"; + + auto file_result = fs_->OpenInputFile(file_name); + EXPECT_TRUE(file_result.ok()) << "Open hdfs file failed"; + + std::shared_ptr file = file_result.ValueOrDie(); + std::cout << "file size is " << file->GetSize().ValueOrDie() << std::endl; + + parquet::ReaderProperties properties; + // std::shared_ptr metadata; + std::unique_ptr parquetReader = + parquet::ParquetFileReader::Open(file, properties, NULLPTR); + + std::shared_ptr fileMetaData = parquetReader->metadata(); + + int numRows = fileMetaData->num_rows(); + int numCols = fileMetaData->num_columns(); + int numRowGroups = fileMetaData->num_row_groups(); + std::string schema = fileMetaData->schema()->ToString(); + + std::cout<<"parquet file has "< urgMataData = fileMetaData->RowGroup(rowGroupIndex); + std::shared_ptr rgMataData = std::move(urgMataData); + + //std::vector> columnChunkMeta; + //columnChunkMeta.resize(numCols); + std::unique_ptr columnChunkMeta; + + for(int i=0; iColumnChunk(i); + std::string column_name = fileMetaData->schema()->Column(i)->name(); + parquet::Type::type column_type = fileMetaData->schema()->Column(i)->physical_type(); + std::cout<<"current column["< statistic = columnChunkMeta->statistics(); + if(!statistic->HasMinMax()) + { + std::cout<<"This column does not have valid min max value."<(statistic); + std::cout<<"min: "<min()<<" max: "<max()<(statistic); + std::cout<<"min: "<min()<<" max: "<max()<(statistic); + std::cout<<"min: "<min()<<" max: "<max()<(statistic); + std::cout<<"min: "<min()<<" max: "<max()< JsonConvertor::parseToFilterExpression(nlohmann::jso return ex; } + +std::shared_ptr JsonConvertor::parseToPredicateExpression( + std::string jsonString) { + ARROW_LOG(DEBUG) << "json string " << jsonString; + auto json = nlohmann::json::parse(jsonString); + return parseToPredicateExpression(json); +} + +std::shared_ptr JsonConvertor::parseToPredicateExpression(nlohmann::json root) { + std::shared_ptr ex; + std::string type = root["FilterTypeName"]; + if (type.compare("not") == 0) { + ex = std::make_shared(type, + parseToPredicateExpression(root["child"])); + } else if (type.compare("and") == 0 || type.compare("or") == 0) { + nlohmann::json leftNode = root["LeftNode"]; + nlohmann::json RightNode = root["RightNode"]; + ex = std::make_shared(type, parseToPredicateExpression(leftNode), + parseToPredicateExpression(RightNode)); + } else if (type.compare("lt") == 0 || type.compare("lteq") == 0 || + type.compare("gt") == 0 || type.compare("gteq") == 0 || + type.compare("eq") == 0 || type.compare("noteq") == 0) { + std::string colType = root["ColumnType"]; + std::string columnName = root["ColumnName"]; + std::string valueString = root["Value"]; + if (valueString.compare("null") == 0) { // this will only match 'eq', 'noteq' type. + NullStruct nullStruct; + ex = std::make_shared(type, columnName, nullStruct); + } else if (colType.compare("Boolean") == 0) { + bool value = valueString == "false" ? false : true; // FIXME + ex = std::make_shared(type, columnName, value); + } else if (colType.compare("Integer") == 0) { + int32_t value = std::stoi(valueString); + ex = std::make_shared(type, columnName, value); + } else if (colType.compare("Long") == 0) { + int64_t value = std::stol(valueString); + ex = std::make_shared(type, columnName, value); + } else if (colType.compare("Float") == 0) { + float value = std::stof(valueString); + ex = std::make_shared(type, columnName, value); + } else if (colType.compare("Double") == 0) { + double value = std::stod(valueString); + ex = std::make_shared(type, columnName, value); + } else if (colType.compare("FromStringBinary") == 0) { + // the binary is like Binary{\"xxxxx\"} + int binLen = valueString.length() - 10; + std::string value = valueString.substr(8, binLen); + uint8_t* buf = new uint8_t[binLen]; + // FIXME: memory leak! + std::memcpy(buf, value.data(), binLen); + parquet::ByteArray byteArray(binLen, buf); + ex = std::make_shared(type, columnName, byteArray); + } else { + // WARNING: NOT support yet; + ARROW_LOG(WARNING) << "unsupported data type"; + } + } else if (type.compare("apestartwithfilter") == 0) { + std::string colType = root["ColumnType"]; // should be BinaryColumn + std::string columnName = root["ColumnName"]; + std::string valueString = root["Value"]; // start with string + ex = std::make_shared(type, columnName, valueString); + } else if (type.compare("apeendwithfilter") == 0) { + std::string colType = root["ColumnType"]; // should be BinaryColumn + std::string columnName = root["ColumnName"]; + std::string valueString = root["Value"]; // end with string + ex = std::make_shared(type, columnName, valueString); + } else if (type.compare("apecontainsfilter") == 0) { + std::string colType = root["ColumnType"]; // should be BinaryColumn + std::string columnName = root["ColumnName"]; + std::string valueString = root["Value"]; // end with string + ex = std::make_shared(type, columnName, valueString); + } else { + ARROW_LOG(WARNING) << "unsupported Expression type" << type; + } + + return ex; +} + std::vector> JsonConvertor::parseToGroupByExpressions( std::string jsonString) { ARROW_LOG(DEBUG) << "json string " << jsonString; diff --git a/oap-ape/ape-native/src/utils/JsonConvertor.h b/oap-ape/ape-native/src/utils/JsonConvertor.h index 9b001814f..d450db495 100644 --- a/oap-ape/ape-native/src/utils/JsonConvertor.h +++ b/oap-ape/ape-native/src/utils/JsonConvertor.h @@ -27,6 +27,7 @@ #include "src/utils/AggExpression.h" #include "src/utils/FilterExpression.h" +#include "src/utils/PredicateExpression.h" #include "src/utils/Expression.h" namespace ape { @@ -36,6 +37,9 @@ class JsonConvertor { static std::shared_ptr parseToFilterExpression(std::string jsonString); static std::shared_ptr parseToFilterExpression(nlohmann::json root); + static std::shared_ptr parseToPredicateExpression(std::string jsonString); + static std::shared_ptr parseToPredicateExpression(nlohmann::json root); + static std::vector> parseToGroupByExpressions( std::string jsonString); static std::vector> parseToGroupByExpressions( diff --git a/oap-ape/ape-native/src/utils/PredicateExpression.cc b/oap-ape/ape-native/src/utils/PredicateExpression.cc new file mode 100644 index 000000000..2e017a279 --- /dev/null +++ b/oap-ape/ape-native/src/utils/PredicateExpression.cc @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include + +#include "src/utils/PredicateExpression.h" +#include "src/utils/Expression.h" +#include "src/utils/Type.h" + +namespace ape { + +class prFinder { + public: + explicit prFinder(const std::string& cmp_str) : str(cmp_str) {} + + bool operator()(Schema& v) { return v.getColName().compare(str) == 0; } + + private: + const std::string str; +}; + +// Base class +PredicateExpression::PredicateExpression(std::string type_) : Expression() { type = type_; } + +PredicateExpression::~PredicateExpression() {} + +// RootPredicateExpression +RootPredicateExpression::RootPredicateExpression(std::string type_, + std::shared_ptr child_) + : PredicateExpression(type_) { + child = child_; +} + +int RootPredicateExpression::PredicateWithParam(int8_t& out) { + // root node doesn't need outbuffer + int8_t childOut; + auto start1 = std::chrono::steady_clock::now(); + child->PredicateWithParam(childOut); + auto end1 = std::chrono::steady_clock::now(); + ARROW_LOG(DEBUG) << "exec takes " + << static_cast>(end1 - start1).count() * + 1000 + << " ms"; + + auto end2 = std::chrono::steady_clock::now(); + ARROW_LOG(DEBUG) << "copy takes " + << static_cast>(end2 - end1).count() * + 1000 + << " ms"; + out = childOut; + return 0; +} + +RootPredicateExpression::~RootPredicateExpression() {} + +// NotPredicateExpression +NotPredicateExpression::NotPredicateExpression(std::string type_, + std::shared_ptr child_) + : PredicateExpression(type_) { + child = child_; +} + +NotPredicateExpression::~NotPredicateExpression() {} + +int NotPredicateExpression::PredicateWithParam(int8_t& out) { + int8_t childOut; + child->PredicateWithParam(childOut); + + out = 1 - childOut; + + return 0; +} + +// BinaryPredicateExpression +BinaryPredicateExpression::BinaryPredicateExpression(std::string type_, + std::shared_ptr left_, + std::shared_ptr right_) + : PredicateExpression(type_) { + left = left_; + right = right_; + opType = type_; +} + +BinaryPredicateExpression::~BinaryPredicateExpression() {} + +int BinaryPredicateExpression::PredicateWithParam(int8_t& out) { + int8_t leftChild; + int8_t rightChild; + + left->PredicateWithParam(leftChild); + + if (opType.compare("and") == 0 && leftChild < 1) { + out = 0; + return 0; + } else if (opType.compare("or") == 0 && leftChild > 0) { + out = 1; + return 0; + } + + right->PredicateWithParam(rightChild); + + if (opType.compare("and") == 0) { + out = leftChild & rightChild; + } else if (opType.compare("or") == 0) { + out = leftChild | rightChild; + } + + return 0; +} + +// StringPredicateExpression +StringPredicateExpression::StringPredicateExpression(std::string type_, std::string columnName_, + std::string value_) + : UnaryPredicateExpression(type_, columnName_) { + value = value_; +} + +void StringPredicateExpression::setSchema(std::shared_ptr> schema_) { + schema = schema_; + ptrdiff_t pos = std::distance( + schema->begin(), std::find_if(schema->begin(), schema->end(), prFinder(columnName))); + columnIndex = pos; +} + +std::string StringPredicateExpression::getColumnName() { return columnName; } + +// StartWithPredicateExpression +int StartWithPredicateExpression::PredicateWithParam(int8_t& out) { + //currently string type doesn't support predicate operation + out = 1; + return 0; +} + +// EndWithPredicateExpression +int EndWithPredicateExpression::PredicateWithParam(int8_t& out) { + //currently string type doesn't support predicate operation + out = 1; + return 0; +} + +// ContainsPredicateExpression +int ContainsPredicateExpression::PredicateWithParam(int8_t& out) { + //currently string type doesn't support predicate operation + out = 1; + return 0; +} + +// UnaryPredicateExpression +template +TypedUnaryPredicateExpression::TypedUnaryPredicateExpression(std::string type_, + std::string columnName_, + T value_) + : UnaryPredicateExpression(type_, columnName_) { + value = value_; + compareType = type_; +} + +template <> +int TypedUnaryPredicateExpression::PredicateWithParam(int8_t& out) { + out = 1; + + return 0; +} + +template <> +int TypedUnaryPredicateExpression::PredicateWithParam(int8_t& out) { + out = 1; + + return 0; +} + +template +int TypedUnaryPredicateExpression::PredicateWithParam(int8_t& out) { + + if(!hasMinMax) + { + out = 1; + return 0; + } + ARROW_LOG(DEBUG)<<"columnName: "< value ? 1 : 0; + } else if (compareType.compare("gteq") == 0) { + out = maxVal >= value ? 1 : 0; + } else if (compareType.compare("eq") == 0) { + out = (maxVal >= value && minVal <= value) ? 1 : 0; + } else if (compareType.compare("noteq") == 0) { + out = 1; + } else if (compareType.compare("lt") == 0) { + out = minVal < value ? 1 : 0; + } else if (compareType.compare("lteq") == 0) { + out = minVal <= value ? 1 : 0; + } else { + ARROW_LOG(WARNING) << "NOT support Predicate type!"; + } + + return 0; +} + +template <> +void TypedUnaryPredicateExpression::setStatistic(std::shared_ptr rgMataData_) +{ + std::unique_ptr columnChunkMeta = rgMataData_->ColumnChunk(columnIndex); + std::shared_ptr statistic = columnChunkMeta->statistics(); + hasMinMax = statistic->HasMinMax(); + auto int32Statistic = std::static_pointer_cast(statistic); + maxVal = int32Statistic->max(); + minVal = int32Statistic->min(); +} + +template <> +void TypedUnaryPredicateExpression::setStatistic(std::shared_ptr rgMataData_) +{ + std::unique_ptr columnChunkMeta = rgMataData_->ColumnChunk(columnIndex); + std::shared_ptr statistic = columnChunkMeta->statistics(); + hasMinMax = statistic->HasMinMax(); + auto int64Statistic = std::static_pointer_cast(statistic); + maxVal = int64Statistic->max(); + minVal = int64Statistic->min(); +} + +template <> +void TypedUnaryPredicateExpression::setStatistic(std::shared_ptr rgMataData_) +{ + std::unique_ptr columnChunkMeta = rgMataData_->ColumnChunk(columnIndex); + std::shared_ptr statistic = columnChunkMeta->statistics(); + hasMinMax = statistic->HasMinMax(); + auto floatStatistic = std::static_pointer_cast(statistic); + maxVal = floatStatistic->max(); + minVal = floatStatistic->min(); +} + +template <> +void TypedUnaryPredicateExpression::setStatistic(std::shared_ptr rgMataData_) +{ + std::unique_ptr columnChunkMeta = rgMataData_->ColumnChunk(columnIndex); + std::shared_ptr statistic = columnChunkMeta->statistics(); + hasMinMax = statistic->HasMinMax(); + auto doubleStatistic = std::static_pointer_cast(statistic); + maxVal = doubleStatistic->max(); + minVal = doubleStatistic->min(); +} + +template +void TypedUnaryPredicateExpression::setStatistic(std::shared_ptr rgMataData_) +{ + hasMinMax = false; +} + +template +void TypedUnaryPredicateExpression::setSchema( + std::shared_ptr> schema_) { + schema = schema_; + ptrdiff_t pos = std::distance( + schema->begin(), std::find_if(schema->begin(), schema->end(), prFinder(columnName))); + columnIndex = pos; +} + +std::string UnaryPredicateExpression::getColumnName() { return columnName; } + +template +TypedUnaryPredicateExpression::~TypedUnaryPredicateExpression() {} + +// Force compile these classes. +template class TypedUnaryPredicateExpression; +template class TypedUnaryPredicateExpression; +template class TypedUnaryPredicateExpression; +template class TypedUnaryPredicateExpression; +template class TypedUnaryPredicateExpression; +template class TypedUnaryPredicateExpression; +template class TypedUnaryPredicateExpression; + +} // namespace ape diff --git a/oap-ape/ape-native/src/utils/PredicateExpression.h b/oap-ape/ape-native/src/utils/PredicateExpression.h new file mode 100644 index 000000000..16ad7bdbc --- /dev/null +++ b/oap-ape/ape-native/src/utils/PredicateExpression.h @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "src/utils/Expression.h" +#include "src/utils/Type.h" + +namespace ape { + +class PredicateExpression : public Expression { + public: + explicit PredicateExpression(std::string type_); + virtual void Execute() {} + int ExecuteWithParam(int batchSize, const std::vector& dataBuffers, + const std::vector& nullBuffers, + std::vector& outBuffers) { + return 0; + } + void setSchema(std::shared_ptr> schema_) {} + virtual int PredicateWithParam(int8_t& out) = 0; + virtual void setStatistic(std::shared_ptr rgMataData_) = 0; + ~PredicateExpression(); +}; + +class RootPredicateExpression : public PredicateExpression { + public: + RootPredicateExpression(std::string type_, std::shared_ptr child_); + void Execute() {} + int PredicateWithParam(int8_t& out); + ~RootPredicateExpression(); + void setStatistic(std::shared_ptr rgMataData_){ + child->setStatistic(rgMataData_); + } + void setSchema(std::shared_ptr> schema_) { + schema = schema_; + child->setSchema(schema); + } + + private: + std::shared_ptr child; +}; + +class NotPredicateExpression : public PredicateExpression { + public: + NotPredicateExpression(std::string type_, std::shared_ptr child_); + void Execute() {} + int PredicateWithParam(int8_t& out); + void setStatistic(std::shared_ptr rgMataData_){ + child->setStatistic(rgMataData_); + } + ~NotPredicateExpression(); + void setSchema(std::shared_ptr> schema_) { + schema = schema_; + child->setSchema(schema); + } + std::shared_ptr getChild() { return child; } + + private: + std::shared_ptr child; +}; + +class BinaryPredicateExpression : public PredicateExpression { + public: + BinaryPredicateExpression(std::string type_, std::shared_ptr left_, + std::shared_ptr right_); + void Execute() {} + int PredicateWithParam(int8_t& out); + ~BinaryPredicateExpression(); + void setSchema(std::shared_ptr> schema_) { + schema = schema_; + left->setSchema(schema); + right->setSchema(schema); + } + void setStatistic(std::shared_ptr rgMataData_){ + left->setStatistic(rgMataData_); + right->setStatistic(rgMataData_); + } + std::shared_ptr getLeftChild() { return left; } + std::shared_ptr getRightChild() { return right; } + + private: + std::shared_ptr left; + std::shared_ptr right; + std::string opType; +}; + +class UnaryPredicateExpression : public PredicateExpression { + public: + UnaryPredicateExpression(std::string type_, std::string columnName_) + : PredicateExpression(type_) { + columnName = columnName_; + } + ~UnaryPredicateExpression() {} + std::string getColumnName(); + + protected: + std::string columnName; +}; + +template +class TypedUnaryPredicateExpression : public UnaryPredicateExpression { + public: + TypedUnaryPredicateExpression(std::string type_, std::string columnName_, T value_); + void Execute() {} + int PredicateWithParam(int8_t& out); + ~TypedUnaryPredicateExpression(); + void setSchema(std::shared_ptr> schema_); + void setStatistic(std::shared_ptr rgMataData_); + + private: + std::string compareType; + T value; + T minVal; + T maxVal; + bool hasMinMax; + int columnIndex; +}; + +class StringPredicateExpression : public UnaryPredicateExpression { + public: + StringPredicateExpression(std::string type_, std::string columnName_, std::string value_); + ~StringPredicateExpression() {} + void setSchema(std::shared_ptr> schema_); + void setStatistic(std::shared_ptr rgMataData_) {} + int PredicateWithParam(int8_t& out) = 0; + std::string getColumnName(); + + protected: + std::string type; + std::string value; + int columnIndex; +}; + +class StartWithPredicateExpression : public StringPredicateExpression { + public: + StartWithPredicateExpression(std::string type_, std::string columnName_, + std::string value_) + : StringPredicateExpression(type_, columnName_, value_) {} + int PredicateWithParam(int8_t& out); + ~StartWithPredicateExpression() {} +}; + +class EndWithPredicateExpression : public StringPredicateExpression { + public: + EndWithPredicateExpression(std::string type_, std::string columnName_, std::string value_) + : StringPredicateExpression(type_, columnName_, value_) {} + int PredicateWithParam(int8_t& out); + ~EndWithPredicateExpression() {} +}; + +class ContainsPredicateExpression : public StringPredicateExpression { + public: + ContainsPredicateExpression(std::string type_, std::string columnName_, std::string value_) + : StringPredicateExpression(type_, columnName_, value_) {} + int PredicateWithParam(int8_t& out); + ~ContainsPredicateExpression() {} +}; + +using BoolUnaryPredicateExpression = TypedUnaryPredicateExpression; +using Int32UnaryPredicateExpression = TypedUnaryPredicateExpression; +using Int64UnaryPredicateExpression = TypedUnaryPredicateExpression; +// using Int96UnaryPredicateExpression = TypedUnaryExpression; +using FloatUnaryPredicateExpression = TypedUnaryPredicateExpression; +using DoubleUnaryPredicateExpression = TypedUnaryPredicateExpression; +using NullUnaryPredicateExpression = TypedUnaryPredicateExpression; +using ByteArrayUnaryPredicateExpression = TypedUnaryPredicateExpression; + +} // namespace ape From abdd5d76ab61fea742fb9a4fd7447c76ddeaf965 Mon Sep 17 00:00:00 2001 From: yspMing <1844341967@qq.com> Date: Tue, 18 Jan 2022 00:17:28 -0500 Subject: [PATCH 2/2] Fixing some bug for parquet row group filter push down method --- oap-ape/ape-native/src/CMakeLists.txt | 2 +- oap-ape/ape-native/src/reader.cc | 26 ++++++++++++++++------ oap-ape/ape-native/src/reader.h | 1 + oap-ape/ape-native/src/test/CMakeLists.txt | 4 ++-- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/oap-ape/ape-native/src/CMakeLists.txt b/oap-ape/ape-native/src/CMakeLists.txt index 1987d3077..3413848f0 100644 --- a/oap-ape/ape-native/src/CMakeLists.txt +++ b/oap-ape/ape-native/src/CMakeLists.txt @@ -58,7 +58,7 @@ if(NOT REDIS_PLUS_PLUS_LIB) message(FATAL_ERROR "redis++ library not found") endif() -set(PARSE_SRC utils/FilterExpression.cc utils/JsonConvertor.cc utils/Type.cc utils/UnaryFilter.cc) +set(PARSE_SRC utils/FilterExpression.cc utils/JsonConvertor.cc utils/Type.cc utils/UnaryFilter.cc utils/PredicateExpression.cc) set(DECIMAL_SRC utils/DecimalUtil.cc utils/DecimalConvertor.cc) set(AGG_SRC utils/AggExpression.cc) diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index b793c42dd..3ff61a943 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -68,6 +68,7 @@ void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort, fileMetaData = parquetReader->metadata(); + this->useRowGroupFilter = false; this->firstRowGroupIndex = firstRowGroup; this->totalRowGroups = rowGroupToRead; @@ -565,10 +566,17 @@ bool Reader::checkEndOfRowGroup() { // rowGroupReaders index starts from 0 ARROW_LOG(DEBUG) << "totalRowsLoadedSoFar: " << totalRowsLoadedSoFar; //find next row group passing the predicate filter - while(!doPredicateFilter(currentRowGroup)){ - currentRowGroup++; - totalRowGroupsRead++; + if(useRowGroupFilter) + { + while(!doPredicateFilter(currentRowGroup)){ + currentRowGroup++; + totalRowGroupsRead++; + } + if(currentRowGroup > firstRowGroupIndex + totalRowGroups - 1){ + return true; + } } + rowGroupReader = rowGroupReaders[currentRowGroup - firstRowGroupIndex]; currentRowGroup++; totalRowGroupsRead++; @@ -599,10 +607,13 @@ void Reader::setFilter(std::string filterJsonStr) { "root", std::dynamic_pointer_cast(tmpExpression)); //set predicate filter - std::shared_ptr tmpExpressionP = - JsonConvertor::parseToPredicateExpression(filterJsonStr); + if(useRowGroupFilter) + { + std::shared_ptr tmpExpressionP = + JsonConvertor::parseToPredicateExpression(filterJsonStr); - predicateExpression = std::make_shared("root", tmpExpressionP); + predicateExpression = std::make_shared("root", tmpExpressionP); + } // get column names from expression filterColumnNames.clear(); @@ -893,7 +904,8 @@ bool Reader::isNativeEnabled() { bool Reader::doPredicateFilter(int rowGroupIndex){ int8_t res; - if(!predicateExpression or rowGroupIndex >= firstRowGroupIndex + totalRowGroups - 1){ + if(!predicateExpression or rowGroupIndex > firstRowGroupIndex + totalRowGroups - 1){ + //std::cout<<"rowGroupIndex: "< extraByteArrayBuffers; + bool useRowGroupFilter = false; bool filterReset = false; int currentBatchSize = 0; int initRequiredColumnCount = 0; diff --git a/oap-ape/ape-native/src/test/CMakeLists.txt b/oap-ape/ape-native/src/test/CMakeLists.txt index 8aa4707a5..cee99cf89 100644 --- a/oap-ape/ape-native/src/test/CMakeLists.txt +++ b/oap-ape/ape-native/src/test/CMakeLists.txt @@ -51,7 +51,7 @@ set(ARROW_STATIC -Wl,-Bstatic; -Wl,--whole-archive; arrow_static; -Wl,--no-whole set(THREAD -Wl,-Bdynamic;-Wl,--as-needed;Threads::Threads;-Wl,--no-as-needed) add_executable(convertorTest convertorTest.cc) -target_link_libraries(convertorTest gtest_main parquet_jni) +target_link_libraries(convertorTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD}) add_test(NAME convertorTest COMMAND ${EXECUTABLE_OUTPUT_PATH}/convertorTest) add_executable(decimalTest decimalTest.cc) @@ -77,7 +77,7 @@ add_executable(plasmaTest plasmaTest.cc) target_link_libraries(plasmaTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD}) add_executable(predicateFilterTest predicateFilterTest.cc) -target_link_libraries(predicateFilterTest gtest_main parquet_jni) +target_link_libraries(predicateFilterTest gtest_main parquet_jni ${ARROW_STATIC} ${THREAD}) add_test(NAME predicateFilterTest COMMAND ${EXECUTABLE_OUTPUT_PATH}/predicateFilterTest) if(APE_CI)