Skip to content

Commit

Permalink
Seperate string read code (Bears-R-Us#3418)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmcdonald3 authored Jul 9, 2024
1 parent f532daa commit 2ca352c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 7 deletions.
12 changes: 6 additions & 6 deletions src/ParquetMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ module ParquetMsg {
}
}

proc readStrFilesByName(A: [] ?t, ref whereNull: [] bool, filenames: [] string, sizes: [] int, dsetname: string, ty) throws {
extern proc c_readColumnByName(filename, arr_chpl, where_null_chpl, colNum, numElems, startIdx, batchSize, byteLength, hasNonFloatNulls, errMsg): int;
proc readStrFilesByName(ref A: [] ?t, filenames: [] string, sizes: [] int, dsetname: string) throws {
extern proc c_readStrColumnByName(filename, arr_chpl, colname, batchSize, errMsg): int;
var (subdoms, length) = getSubdomains(sizes);

coforall loc in A.targetLocales() do on loc {
Expand All @@ -187,9 +187,9 @@ module ParquetMsg {
var pqErr = new parquetErrorMsg();
var col: [filedom] t;

if c_readColumnByName(filename.localize().c_str(), c_ptrTo(col), c_ptrTo(whereNull[intersection.low]),
dsetname.localize().c_str(), intersection.size, 0,
batchSize, -1, false, c_ptrTo(pqErr.errMsg)) == ARROWERROR {
if c_readStrColumnByName(filename.localize().c_str(), c_ptrTo(col),
dsetname.localize().c_str(),
batchSize, c_ptrTo(pqErr.errMsg)) == ARROWERROR {
pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName());
}
A[filedom] = col;
Expand Down Expand Up @@ -997,7 +997,7 @@ module ParquetMsg {

// Read into distributed array
var entryVal = new shared SymEntry((+ reduce byteSizes), uint(8));
readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
readStrFilesByName(entryVal.a, filenames, byteSizes, dsetname);

var stringsEntry = assembleSegStringFromParts(entrySeg, entryVal, st);
rnames.pushBack((dsetname, ObjType.STRINGS, "%s+%?".format(stringsEntry.name, stringsEntry.nBytes)));
Expand Down
63 changes: 63 additions & 0 deletions src/parquet/ReadParquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,65 @@ void readColumnIrregularBitWidth(void* chpl_arr, int startIdx, std::shared_ptr<p
}
}

int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);

std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(filename, false);

std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
int num_row_groups = file_metadata->num_row_groups();

int64_t i = 0;
for (int r = 0; r < num_row_groups; r++) {
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);

int64_t values_read = 0;

std::shared_ptr<parquet::ColumnReader> column_reader;

auto idx = file_metadata -> schema() -> ColumnIndex(colname);
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

if(idx < 0) {
std::string dname(colname);
std::string fname(filename);
std::string msg = "Dataset: " + dname + " does not exist in file: " + fname;
*errMsg = strdup(msg.c_str());
return ARROWERROR;
}

column_reader = row_group_reader->Column(idx);

if(ty == ARROWSTRING) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (unsigned char*)chpl_arr;
parquet::ByteArrayReader* reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());

while (reader->HasNext()) {
parquet::ByteArray value;
(void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
// if values_read is 0, that means that it was a null value
if(values_read > 0) {
for(int j = 0; j < value.len; j++) {
chpl_ptr[i] = value.ptr[j];
i++;
}
}
i++; // skip one space so the strings are null terminated with a 0
}
}
}
return 0;
} catch (const std::exception& e) {
*errMsg = strdup(e.what());
return ARROWERROR;
}
}

int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);
Expand Down Expand Up @@ -606,6 +665,10 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
}

extern "C" {
int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) {
return cpp_readStrColumnByName(filename, chpl_arr, colname, batchSize, errMsg);
}

int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
return cpp_readColumnByName(filename, chpl_arr, where_null_chpl, colname, numElems, startIdx, batchSize, byteLength, hasNonFloatNulls, errMsg);
}
Expand Down
4 changes: 3 additions & 1 deletion src/parquet/ReadParquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include <queue>
extern "C" {
#endif

int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg);

int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg);

int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl,
const char* colname, int64_t numElems, int64_t startIdx,
Expand Down

0 comments on commit 2ca352c

Please sign in to comment.