From 2ca352cfd5bd005ae4bf8c7acece730421f89d86 Mon Sep 17 00:00:00 2001
From: Ben McDonald <46734217+bmcdonald3@users.noreply.github.com>
Date: Tue, 9 Jul 2024 16:33:57 -0700
Subject: [PATCH] Seperate string read code (#3418)
---
src/ParquetMsg.chpl | 12 +++----
src/parquet/ReadParquet.cpp | 63 +++++++++++++++++++++++++++++++++++++
src/parquet/ReadParquet.h | 4 ++-
3 files changed, 72 insertions(+), 7 deletions(-)
diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl
index c7360c08bd..5222f2d41f 100644
--- a/src/ParquetMsg.chpl
+++ b/src/ParquetMsg.chpl
@@ -171,8 +171,8 @@ module ParquetMsg {
}
}
- proc readStrFilesByName(A: [] ?t, ref whereNull: [] bool, filenames: [] string, sizes: [] int, dsetname: string, ty) throws {
- extern proc c_readColumnByName(filename, arr_chpl, where_null_chpl, colNum, numElems, startIdx, batchSize, byteLength, hasNonFloatNulls, errMsg): int;
+ proc readStrFilesByName(ref A: [] ?t, filenames: [] string, sizes: [] int, dsetname: string) throws {
+ extern proc c_readStrColumnByName(filename, arr_chpl, colname, batchSize, errMsg): int;
var (subdoms, length) = getSubdomains(sizes);
coforall loc in A.targetLocales() do on loc {
@@ -187,9 +187,9 @@ module ParquetMsg {
var pqErr = new parquetErrorMsg();
var col: [filedom] t;
- if c_readColumnByName(filename.localize().c_str(), c_ptrTo(col), c_ptrTo(whereNull[intersection.low]),
- dsetname.localize().c_str(), intersection.size, 0,
- batchSize, -1, false, c_ptrTo(pqErr.errMsg)) == ARROWERROR {
+ if c_readStrColumnByName(filename.localize().c_str(), c_ptrTo(col),
+ dsetname.localize().c_str(),
+ batchSize, c_ptrTo(pqErr.errMsg)) == ARROWERROR {
pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName());
}
A[filedom] = col;
@@ -997,7 +997,7 @@ module ParquetMsg {
// Read into distributed array
var entryVal = new shared SymEntry((+ reduce byteSizes), uint(8));
- readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
+ readStrFilesByName(entryVal.a, filenames, byteSizes, dsetname);
var stringsEntry = assembleSegStringFromParts(entrySeg, entryVal, st);
rnames.pushBack((dsetname, ObjType.STRINGS, "%s+%?".format(stringsEntry.name, stringsEntry.nBytes)));
diff --git a/src/parquet/ReadParquet.cpp b/src/parquet/ReadParquet.cpp
index a3206aa9fe..096833bd40 100644
--- a/src/parquet/ReadParquet.cpp
+++ b/src/parquet/ReadParquet.cpp
@@ -95,6 +95,65 @@ void readColumnIrregularBitWidth(void* chpl_arr, int startIdx, std::shared_ptr
parquet_reader =
+ parquet::ParquetFileReader::OpenFile(filename, false);
+
+ std::shared_ptr file_metadata = parquet_reader->metadata();
+ int num_row_groups = file_metadata->num_row_groups();
+
+ int64_t i = 0;
+ for (int r = 0; r < num_row_groups; r++) {
+ std::shared_ptr row_group_reader =
+ parquet_reader->RowGroup(r);
+
+ int64_t values_read = 0;
+
+ std::shared_ptr column_reader;
+
+ auto idx = file_metadata -> schema() -> ColumnIndex(colname);
+ auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed
+
+ if(idx < 0) {
+ std::string dname(colname);
+ std::string fname(filename);
+ std::string msg = "Dataset: " + dname + " does not exist in file: " + fname;
+ *errMsg = strdup(msg.c_str());
+ return ARROWERROR;
+ }
+
+ column_reader = row_group_reader->Column(idx);
+
+ if(ty == ARROWSTRING) {
+ int16_t definition_level; // nullable type and only reading single records in batch
+ auto chpl_ptr = (unsigned char*)chpl_arr;
+ parquet::ByteArrayReader* reader =
+ static_cast(column_reader.get());
+
+ while (reader->HasNext()) {
+ parquet::ByteArray value;
+ (void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
+ // if values_read is 0, that means that it was a null value
+ if(values_read > 0) {
+ for(int j = 0; j < value.len; j++) {
+ chpl_ptr[i] = value.ptr[j];
+ i++;
+ }
+ }
+ i++; // skip one space so the strings are null terminated with a 0
+ }
+ }
+ }
+ return 0;
+ } catch (const std::exception& e) {
+ *errMsg = strdup(e.what());
+ return ARROWERROR;
+ }
+}
+
int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);
@@ -606,6 +665,10 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
}
extern "C" {
+ int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) {
+ return cpp_readStrColumnByName(filename, chpl_arr, colname, batchSize, errMsg);
+ }
+
int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) {
return cpp_readColumnByName(filename, chpl_arr, where_null_chpl, colname, numElems, startIdx, batchSize, byteLength, hasNonFloatNulls, errMsg);
}
diff --git a/src/parquet/ReadParquet.h b/src/parquet/ReadParquet.h
index 52e1871f0c..a0a62d788d 100644
--- a/src/parquet/ReadParquet.h
+++ b/src/parquet/ReadParquet.h
@@ -15,7 +15,9 @@
#include
extern "C" {
#endif
-
+ int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg);
+
+ int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg);
int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl,
const char* colname, int64_t numElems, int64_t startIdx,