Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33155 Revisit Parquet Test Suite #19405

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,25 @@ arrow::Status ParquetReader::openReadFile()
ForEach (*itr)
{
IFile &file = itr->query();
const char *filename = file.queryFilename();
parquet::arrow::FileReaderBuilder readerBuilder;
reportIfFailure(readerBuilder.OpenFile(file.queryFilename(), false, readerProperties));
reportIfFailure(readerBuilder.OpenFile(filename, false, readerProperties));
readerBuilder.memory_pool(pool);
readerBuilder.properties(arrowReaderProps);
std::unique_ptr<parquet::arrow::FileReader> parquetFileReader;
reportIfFailure(readerBuilder.Build(&parquetFileReader));
parquetFileReaders.push_back(std::move(parquetFileReader));
parquetFileReaders.emplace_back(filename, std::move(parquetFileReader));
}

auto sortFileReaders = [](NamedFileReader &a, NamedFileReader &b) -> bool
{
return strcmp(std::get<0>(a).c_str(), std::get<0>(b).c_str()) < 0;
};

std::sort(parquetFileReaders.begin(), parquetFileReaders.end(), sortFileReaders);

if (parquetFileReaders.empty())
failx("Parquet file %s not found", location.c_str());
}
return arrow::Status::OK();
}
Expand Down Expand Up @@ -358,8 +369,8 @@ std::shared_ptr<parquet::arrow::RowGroupReader> ParquetReader::queryCurrentTable
tables += fileTableCounts[i];
if (currTable < tables)
{
currentTableMetadata = parquetFileReaders[i]->parquet_reader()->metadata()->Subset({static_cast<int>(currTable - offset)});
return parquetFileReaders[i]->RowGroup(currTable - offset);
currentTableMetadata = std::get<1>(parquetFileReaders[i])->parquet_reader()->metadata()->Subset({static_cast<int>(currTable - offset)});
return std::get<1>(parquetFileReaders[i])->RowGroup(currTable - offset);
}
offset = tables;
}
Expand All @@ -381,7 +392,7 @@ arrow::Status ParquetReader::processReadFile()
rbatchItr = arrow::RecordBatchReader::RecordBatchReaderIterator(rbatchReader.get());
PARQUET_ASSIGN_OR_THROW(auto datasetRows, scanner->CountRows());
// Divide the work among any number of workers
divide_row_groups(activityCtx, datasetRows, totalRowCount, startRowGroup);
divide_row_groups(activityCtx, datasetRows, totalRowCount, startRow);
}
else
{
Expand All @@ -390,7 +401,7 @@ arrow::Status ParquetReader::processReadFile()

for (int i = 0; i < parquetFileReaders.size(); i++)
{
__int64 tables = parquetFileReaders[i]->num_row_groups();
__int64 tables = std::get<1>(parquetFileReaders[i])->num_row_groups();
fileTableCounts.push_back(tables);
totalTables += tables;
}
Expand Down Expand Up @@ -430,7 +441,7 @@ arrow::Result<std::shared_ptr<arrow::Table>> ParquetReader::queryRows()
{
// Start by getting the number of rows in the first group and checking if it includes this workers startRow
__int64 offset = (*rbatchItr)->get()->num_rows();
while (offset < startRow)
while (offset <= startRow)
{
rbatchItr++;
offset += (*rbatchItr)->get()->num_rows();
Expand Down Expand Up @@ -643,7 +654,7 @@ arrow::Status ParquetWriter::writePartition(std::shared_ptr<arrow::Table> table)
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(table);

StringBuffer basenameTemplate;
basenameTemplate.appendf("part_%d{i}_%lld.parquet",activityCtx->querySlave(), tablesProcessed++);
basenameTemplate.appendf("part_{i}_of_table_%lld_from_worker_%d.parquet", tablesProcessed++, activityCtx->querySlave());
writeOptions.basename_template = basenameTemplate.str();

ARROW_ASSIGN_OR_RAISE(auto scannerBuilder, dataset->NewScan());
Expand Down
3 changes: 2 additions & 1 deletion plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor
};

using TableColumns = std::unordered_map<std::string, std::shared_ptr<arrow::Array>>;
using NamedFileReader = std::tuple<std::string, std::shared_ptr<parquet::arrow::FileReader>>;

/**
* @brief Opens and reads Parquet files and partitioned datasets. The ParquetReader processes a file
Expand Down Expand Up @@ -422,7 +423,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr.
arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr; // Iterator of RecordBatches when reading a partitioned dataset.
std::vector<__int64> fileTableCounts; // Count of RowGroups in each open file to get the correct row group when reading specific parts of the file.
std::vector<std::shared_ptr<parquet::arrow::FileReader>> parquetFileReaders; // Vector of FileReaders that match the target file name. data0.parquet, data1.parquet, etc.
std::vector<NamedFileReader> parquetFileReaders; // Vector of FileReaders that match the target file name. data0.parquet, data1.parquet, etc.
std::shared_ptr<parquet::FileMetaData> currentTableMetadata = nullptr; // Parquet metadata for the current table.
TableColumns parquetTable; // The current table being read broken up into columns. Unordered map where the left side is a string of the field name and the right side is an array of the values.
std::vector<std::string> partitionFields; // The partitioning schema for reading Directory Partitioned files.
Expand Down
2 changes: 0 additions & 2 deletions testing/regress/ecl/key/parquetEmpty.xml

This file was deleted.

18 changes: 12 additions & 6 deletions testing/regress/ecl/key/parquetPartition.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
<Dataset name='HivePartitionedSampleData'>
<Row><id>1</id><name>Alice </name><age>30</age><city>New York </city></Row>
<Row><id>2</id><name>Bob </name><age>25</age><city>Los Angeles </city></Row>
<Row><id>3</id><name>Charlie </name><age>40</age><city>Chicago </city></Row>
<Row><district>1</district><firstname>Alice</firstname><lastname>A</lastname><age>30</age><city>New York</city></Row>
<Row><district>1</district><firstname>Alice</firstname><lastname>B</lastname><age>35</age><city>New York</city></Row>
<Row><district>1</district><firstname>Chalice</firstname><lastname>C</lastname><age>40</age><city>Boston</city></Row>
<Row><district>2</district><firstname>Bob</firstname><lastname>A</lastname><age>25</age><city>Los Angeles</city></Row>
<Row><district>2</district><firstname>Jim</firstname><lastname>A</lastname><age>25</age><city>Los Angeles</city></Row>
<Row><district>3</district><firstname>Charlie</firstname><lastname>C</lastname><age>40</age><city>Chicago</city></Row>
</Dataset>
<Dataset name='DirPartitionedSampleData'>
<Row><id>1</id><name>Alice </name><age>30</age><city>New York </city></Row>
<Row><id>2</id><name>Bob </name><age>25</age><city>Los Angeles </city></Row>
<Row><id>3</id><name>Charlie </name><age>40</age><city>Chicago </city></Row>
<Row><district>1</district><firstname>Alice</firstname><lastname>A</lastname><age>30</age><city>New York</city></Row>
<Row><district>1</district><firstname>Alice</firstname><lastname>B</lastname><age>35</age><city>New York</city></Row>
<Row><district>1</district><firstname>Chalice</firstname><lastname>C</lastname><age>40</age><city>Boston</city></Row>
<Row><district>2</district><firstname>Bob</firstname><lastname>A</lastname><age>25</age><city>Los Angeles</city></Row>
<Row><district>2</district><firstname>Jim</firstname><lastname>A</lastname><age>25</age><city>Los Angeles</city></Row>
<Row><district>3</district><firstname>Charlie</firstname><lastname>C</lastname><age>40</age><city>Chicago</city></Row>
</Dataset>
7 changes: 2 additions & 5 deletions testing/regress/ecl/key/parquetSize.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
<Row><id>1</id><name>Alice</name><price>10.5</price><isactive>yes</isactive></Row>
<Row><id>2</id><name>Bob</name><price>20.75</price><isactive>no</isactive></Row>
<Row><id>3</id><name>Charlie</name><price>15.25</price><isactive>yes</isactive></Row>
<Row><id>1</id><name>A</name><price>10.0</price><isactive>1</isactive></Row>
</Dataset>
<Dataset name='multiDataset'>
<Row><id>1</id><name>A</name><price>10.0</price><isactive>1</isactive></Row>
<Row><id>3</id><name>Charlie</name><price>15.25</price><isactive>yes</isactive></Row>
<Row><id>2</id><name>Bob</name><price>20.75</price><isactive>no</isactive></Row>
<Row><id>1</id><name>A</name><price>10.0</price><isactive>1</isactive></Row>
<Row><id>1</id><name>Alice</name><price>10.5</price><isactive>yes</isactive></Row>
<Row><id>2</id><name>Bob</name><price>20.75</price><isactive>no</isactive></Row>
<Row><id>3</id><name>Charlie</name><price>15.25</price><isactive>yes</isactive></Row>
</Dataset>
8 changes: 0 additions & 8 deletions testing/regress/ecl/key/parquetString.xml

This file was deleted.

9 changes: 7 additions & 2 deletions testing/regress/ecl/key/parquetTypes.xml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
<Dataset name='SetTest'>
<Row><testid>1100</testid><testname>empty</testname><s0></s0><s1></s1><s2></s2><s3></s3><s4></s4><s5></s5><s6></s6><s7></s7><s8></s8><s9></s9><s10></s10></Row>
<Row><testid>1101</testid><testname>single</testname><s0><Item>true</Item></s0><s1><Item>1</Item></s1><s2><Item>1</Item></s2><s3><Item>1.0</Item></s3><s4><Item>1</Item></s4><s5><Item>a</Item></s5><s6><Item>a</Item></s6><s7><Item>A</Item></s7><s8><Item>a</Item></s8><s9><Item>a</Item></s9><s10><Item>FFFF</Item></s10></Row>
<Row><testid>1102</testid><testname>multiple</testname><s0><Item>true</Item><Item>false</Item></s0><s1><Item>1</Item><Item>2</Item></s1><s2><Item>1</Item><Item>2</Item></s2><s3><Item>1.0</Item><Item>2.0</Item></s3><s4><Item>1</Item><Item>2</Item></s4><s5><Item>a</Item><Item>b</Item></s5><s6><Item>a</Item><Item>b</Item></s6><s7><Item>A</Item><Item>B</Item></s7><s8><Item>a</Item><Item>b</Item></s8><s9><Item>a</Item><Item>b</Item></s9><s10><Item>0000</Item><Item>FFFF</Item></s10></Row>
</Dataset>
<Dataset name='BooleanTest'>
<Row><BooleanTest>Pass</BooleanTest></Row>
</Dataset>
Expand All @@ -16,8 +21,8 @@
<Dataset name='StringTest'>
<Row><StringTest>Pass</StringTest></Row>
</Dataset>
<Dataset name='DataAsStringTest'>
<Row><DataAsStringTest>Pass</DataAsStringTest></Row>
<Dataset name='DataTest'>
<Row><DataTest>Pass</DataTest></Row>
</Dataset>
<Dataset name='VarStringTest'>
<Row><VarStringTest>Pass</VarStringTest></Row>
Expand Down
13 changes: 5 additions & 8 deletions testing/regress/ecl/parquetCorrupt.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
//class=parquet
//fail
//nothor
//noroxe
//noroxie

//This ECL code reads a potentially corrupt Parquet file, handling the case where it might be corrupt
//by outputting either the file contents or a single informative record if the file can't be read.

IMPORT Std;
IMPORT Parquet;

RECORDDEF := RECORD
Expand All @@ -28,12 +27,10 @@ RECORDDEF := RECORD
STRING director;
END;

filePath2 := Std.File.GetDefaultDropZone() + '/corrupt.parquet';
string basePath := '' : STORED('OriginalTextFilesOsPath');

CORRUPT_PARQUET := ParquetIO.Read(RECORDDEF, filePath2);
filePath := basePath + '/download/corrupt.parquet';

CORRUPT_RESULT := IF(COUNT(CORRUPT_PARQUET) = 0,
DATASET([{0, 'Corrupt Parquet File', ''}], RECORDDEF),
CORRUPT_PARQUET);
CORRUPT_PARQUET := ParquetIO.Read(RECORDDEF, filePath);

OUTPUT(CORRUPT_RESULT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the output this workunit will not do anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

OUTPUT(CORRUPT_PARQUET);
6 changes: 4 additions & 2 deletions testing/regress/ecl/parquetDecimals.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

//class=parquet
//nothor,noroxie
//nothor
//noroxie

IMPORT STD;
IMPORT PARQUET;
Expand All @@ -39,7 +41,7 @@ decimalData := DATASET([{(DECIMAL) '0.12345678901234567890123456789012',

overwriteOption := TRUE;
dropzoneDirectory := Std.File.GetDefaultDropZone();
parquetFilePath := dropzoneDirectory + '/regress/decimal.parquet';
parquetFilePath := dropzoneDirectory + '/regress/parquet/decimal.parquet';

ParquetIO.Write(decimalData, parquetFilePath, overwriteOption);

Expand Down
6 changes: 5 additions & 1 deletion testing/regress/ecl/parquetEmpty.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
############################################################################## */

//class=parquet
//fail
//nothor
//noroxie

// This test tries writing an empty dataset to a Parquet file and then tries to read it.
// When writing Parquet files the plugin waits for the first row before opening a file. Because the dataset
// is empty, the file is never created and the subsequent read fails.

IMPORT Std;
IMPORT Parquet;

Expand All @@ -33,4 +38,3 @@ ParquetIO.Write(EMPTY_PARQUET, filePath, TRUE);
read_data := ParquetIO.Read(RECORDDEF, filePath);

OUTPUT(read_data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this deleted it will not read the file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


2 changes: 0 additions & 2 deletions testing/regress/ecl/parquetOverwrite.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

//class=parquet
//fail
//nothor
//noroxie

IMPORT Std;
IMPORT Parquet;
Expand Down
65 changes: 56 additions & 9 deletions testing/regress/ecl/parquetPartition.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,29 @@
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

//class=parquet
//nothor
//noroxie

IMPORT Std;
IMPORT Parquet;

// Define the record layout with explicit field lengths
datasetRecordLayout := RECORD
UNSIGNED4 id;
STRING25 name;
UNSIGNED4 district;
STRING firstname;
STRING lastname;
UNSIGNED4 age;
STRING50 city;
STRING city;
END;

// Create a small dataset - ensure all records have valid data
smallData := DATASET([
{1, 'Alice', 30, 'New York'},
{2, 'Bob', 25, 'Los Angeles'},
{3, 'Charlie', 40, 'Chicago'}
{1, 'Alice', 'A', 30, 'New York'},
{1, 'Alice', 'B', 35, 'New York'},
{1, 'Chalice', 'C', 40, 'Boston'},
{2, 'Bob', 'A', 25, 'Los Angeles'},
{2, 'Jim', 'A', 25, 'Los Angeles'},
{3, 'Charlie', 'C', 40, 'Chicago'}
], datasetRecordLayout);

// Set options
Expand All @@ -40,7 +43,51 @@ rowSize := 1024; // Increased buffer size
basePath := Std.File.GetDefaultDropZone() + '/regress/parquet/';

// Define partition keys as a semicolon-separated string with all keys
partitionKeys := 'id';
partitionKeys := 'district;firstname;city';

/**
* This partitioning creates a structure like this:
*
* Hive Partitioning:
* ├── district=1
* │ ├── firstname=Alice
* │ │ └── city=New%20York
* │ │ └── part_0_of_table_0_from_worker_0.parquet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These part... names are files? If so, they are all the same for every value combination. Is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is correct.

* │ └── firstname=Chalice
* │ └── city=Boston
* │ └── part_0_of_table_0_from_worker_0.parquet
* ├── district=2
* │ ├── firstname=Bob
* │ │ └── city=Los%20Angeles
* │ │ └── part_0_of_table_0_from_worker_0.parquet
* │ └── firstname=Jim
* │ └── city=Los%20Angeles
* │ └── part_0_of_table_0_from_worker_0.parquet
* └── district=3
* └── firstname=Charlie
* └── city=Chicago
* └── part_0_of_table_0_from_worker_0.parquet
*
* Directory Partitioning:
* ├── 1
* │ ├── Alice
* │ │ └── New York
* │ │ └── part_0_of_table_0_from_worker_0.parquet
* │ └── Chalice
* │ └── Boston
* │ └── part_0_of_table_0_from_worker_0.parquet
* ├── 2
* │ ├── Bob
* │ │ └── Los Angeles
* │ │ └── part_0_of_table_0_from_worker_0.parquet
* │ └── Jim
* │ └── Los Angeles
* │ └── part_0_of_table_0_from_worker_0.parquet
* └── 3
* └── Charlie
* └── Chicago
* └── part_0_of_table_0_from_worker_0.parquet
*/

// Write out the dataset with Hive partitioning on all keys
ParquetIO.HivePartition.Write(
Expand Down
1 change: 0 additions & 1 deletion testing/regress/ecl/parquetSchema.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//class=parquet
//nothor
//noroxie
//fail

IMPORT Std;
IMPORT Parquet;
Expand Down
10 changes: 3 additions & 7 deletions testing/regress/ecl/parquetSize.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
##############################################################################*/

//class=parquet
//nothor
//noroxie
//nohthor

IMPORT Std;
IMPORT Parquet;

recordLayout := RECORD
Expand All @@ -29,10 +25,10 @@ recordLayout := RECORD
STRING isactive;
END;

basePath := Std.File.GetDefaultDropZone() + '/regress/parquet/';
string basePath := '' : STORED('OriginalTextFilesOsPath');

singleFilePath := basePath + 'single.parquet';
multiFilePath := basePath + 'multi.parquet';
singleFilePath := basePath + '/download/single.parquet';
multiFilePath := basePath + '/download/multi.parquet';

// Reading the single and multi-part files
singleDataset := ParquetIO.Read(recordLayout, singleFilePath);
Expand Down
Loading
Loading