Skip to content

Commit

Permalink
Add a sort to vector of FileReaders
Browse files Browse the repository at this point in the history
- createDirectoryIterator is used to list all matching Parquet files and the order is not guaranteed
- Fix parquetEmpty.ecl and parquetCorrupt.ecl by adding the OUTPUT statements back
  • Loading branch information
jackdelv committed Jan 14, 2025
1 parent c1fcef7 commit 111a0d2
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 7 deletions.
18 changes: 13 additions & 5 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,23 @@ arrow::Status ParquetReader::openReadFile()
ForEach (*itr)
{
IFile &file = itr->query();
const char *filename = file.queryFilename();
parquet::arrow::FileReaderBuilder readerBuilder;
reportIfFailure(readerBuilder.OpenFile(file.queryFilename(), false, readerProperties));
reportIfFailure(readerBuilder.OpenFile(filename, false, readerProperties));
readerBuilder.memory_pool(pool);
readerBuilder.properties(arrowReaderProps);
std::unique_ptr<parquet::arrow::FileReader> parquetFileReader;
reportIfFailure(readerBuilder.Build(&parquetFileReader));
parquetFileReaders.push_back(std::move(parquetFileReader));
parquetFileReaders.emplace_back(filename, std::move(parquetFileReader));
}

auto sortFileReaders = [](NamedFileReader &a, NamedFileReader &b) -> bool
{
return strcmp(std::get<0>(a).c_str(), std::get<0>(b).c_str()) < 0;
};

std::sort(parquetFileReaders.begin(), parquetFileReaders.end(), sortFileReaders);

if (parquetFileReaders.empty())
failx("Parquet file %s not found", location.c_str());
}
Expand Down Expand Up @@ -361,8 +369,8 @@ std::shared_ptr<parquet::arrow::RowGroupReader> ParquetReader::queryCurrentTable
tables += fileTableCounts[i];
if (currTable < tables)
{
currentTableMetadata = parquetFileReaders[i]->parquet_reader()->metadata()->Subset({static_cast<int>(currTable - offset)});
return parquetFileReaders[i]->RowGroup(currTable - offset);
currentTableMetadata = std::get<1>(parquetFileReaders[i])->parquet_reader()->metadata()->Subset({static_cast<int>(currTable - offset)});
return std::get<1>(parquetFileReaders[i])->RowGroup(currTable - offset);
}
offset = tables;
}
Expand Down Expand Up @@ -393,7 +401,7 @@ arrow::Status ParquetReader::processReadFile()

for (int i = 0; i < parquetFileReaders.size(); i++)
{
__int64 tables = parquetFileReaders[i]->num_row_groups();
__int64 tables = std::get<1>(parquetFileReaders[i])->num_row_groups();
fileTableCounts.push_back(tables);
totalTables += tables;
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor
};

// Columns of a table keyed by field name; each value is the Arrow array holding that column's values.
using TableColumns = std::unordered_map<std::string, std::shared_ptr<arrow::Array>>;
// A Parquet FileReader paired with the name of the file it was opened on.
// Element 0 is the file name (used as the sort key for the readers); element 1 is the reader itself.
using NamedFileReader = std::tuple<std::string, std::shared_ptr<parquet::arrow::FileReader>>;

/**
* @brief Opens and reads Parquet files and partitioned datasets. The ParquetReader processes a file
Expand Down Expand Up @@ -422,7 +423,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr.
arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr; // Iterator of RecordBatches when reading a partitioned dataset.
std::vector<__int64> fileTableCounts; // Count of RowGroups in each open file to get the correct row group when reading specific parts of the file.
std::vector<std::shared_ptr<parquet::arrow::FileReader>> parquetFileReaders; // Vector of FileReaders that match the target file name. data0.parquet, data1.parquet, etc.
std::vector<NamedFileReader> parquetFileReaders; // Vector of FileReaders that match the target file name. data0.parquet, data1.parquet, etc.
std::shared_ptr<parquet::FileMetaData> currentTableMetadata = nullptr; // Parquet metadata for the current table.
TableColumns parquetTable; // The current table being read broken up into columns. Unordered map where the left side is a string of the field name and the right side is an array of the values.
std::vector<std::string> partitionFields; // The partitioning schema for reading Directory Partitioned files.
Expand Down
2 changes: 1 addition & 1 deletion testing/regress/ecl/key/parquetSize.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Row><id>3</id><name>Charlie</name><price>15.25</price><isactive>yes</isactive></Row>
</Dataset>
<Dataset name='multiDataset'>
<Row><id>3</id><name>Charlie</name><price>15.25</price><isactive>yes</isactive></Row>
<Row><id>1</id><name>Alice</name><price>10.5</price><isactive>yes</isactive></Row>
<Row><id>2</id><name>Bob</name><price>20.75</price><isactive>no</isactive></Row>
<Row><id>3</id><name>Charlie</name><price>15.25</price><isactive>yes</isactive></Row>
</Dataset>
2 changes: 2 additions & 0 deletions testing/regress/ecl/parquetCorrupt.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ string basePath := '' : STORED('OriginalTextFilesOsPath');
filePath := basePath + '/download/corrupt.parquet';

CORRUPT_PARQUET := ParquetIO.Read(RECORDDEF, filePath);

OUTPUT(CORRUPT_PARQUET);
2 changes: 2 additions & 0 deletions testing/regress/ecl/parquetEmpty.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ EMPTY_PARQUET := DATASET([], RECORDDEF);
ParquetIO.Write(EMPTY_PARQUET, filePath, TRUE);

read_data := ParquetIO.Read(RECORDDEF, filePath);

OUTPUT(read_data);

0 comments on commit 111a0d2

Please sign in to comment.