diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index 5222f2d41f..a9dad49964 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -276,6 +276,29 @@ module ParquetMsg { return byteSizes; } + proc calcStrListSizesAndOffset(offsets: [] ?t, filenames: [] string, sizes: [] int, dsetname: string) throws { + var (subdoms, length) = getSubdomains(sizes); + + var byteSizes: [filenames.domain] int; + + coforall loc in offsets.targetLocales() with (ref byteSizes) do on loc { + var locFiles = filenames; + var locFiledoms = subdoms; + + forall (i, filedom, filename) in zip(sizes.domain, locFiledoms, locFiles) { + for locdom in offsets.localSubdomains() { + const intersection = domain_intersection(locdom, filedom); + if intersection.size > 0 { + var col: [filedom] t; + byteSizes[i] = getStrListColSize(filename, dsetname, col); + offsets[filedom] = col; + } + } + } + } + return byteSizes; + } + proc getNullIndices(A: [] ?t, filenames: [] string, sizes: [] int, dsetname: string, ty) throws { extern proc c_getStringColumnNullIndices(filename, colname, nulls_chpl, errMsg): int; var (subdoms, length) = getSubdomains(sizes); @@ -317,6 +340,21 @@ module ParquetMsg { return byteSize; } + proc getStrListColSize(filename: string, dsetname: string, ref offsets: [] int) throws { + extern proc c_getStringListColumnNumBytes(filename, colname, offsets, numElems, startIdx, batchSize, errMsg): int; + var pqErr = new parquetErrorMsg(); + + var byteSize = c_getStringListColumnNumBytes(filename.localize().c_str(), + dsetname.localize().c_str(), + c_ptrTo(offsets), + offsets.size, 0, 256, + c_ptrTo(pqErr.errMsg)); + + if byteSize == ARROWERROR then + pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); + return byteSize; + } + proc getListColSize(filename: string, dsetname: string, ref seg_sizes: [] int) throws { extern proc c_getListColumnSize(filename, colname, seg_sizes, numElems, startIdx, errMsg): int; var pqErr = new parquetErrorMsg(); @@ -669,7 +707,7 @@ 
module ParquetMsg { } else if ty == ArrowTypes.stringArr { var entrySeg = createSymEntry((+ reduce listSizes), int); - var byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, listSizes, dsetname); + var byteSizes = calcStrListSizesAndOffset(entrySeg.a, filenames, listSizes, dsetname); entrySeg.a = (+ scan entrySeg.a) - entrySeg.a; var entryVal = createSymEntry((+ reduce byteSizes), uint(8)); diff --git a/src/parquet/ReadParquet.cpp b/src/parquet/ReadParquet.cpp index 096833bd40..071b3bc36d 100644 --- a/src/parquet/ReadParquet.cpp +++ b/src/parquet/ReadParquet.cpp @@ -399,6 +399,69 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c } int64_t cpp_getStringColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg) { + try { + int64_t ty = cpp_getType(filename, colname, errMsg); + auto offsets = (int64_t*)chpl_offsets; + int64_t byteSize = 0; + + if(ty == ARROWSTRING) { + std::unique_ptr<parquet::ParquetFileReader> parquet_reader = + parquet::ParquetFileReader::OpenFile(filename, false); + + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); + int num_row_groups = file_metadata->num_row_groups(); + + int64_t i = 0; + for (int r = 0; r < num_row_groups; r++) { + std::shared_ptr<parquet::RowGroupReader> row_group_reader = + parquet_reader->RowGroup(r); + + int64_t values_read = 0; + + std::shared_ptr<parquet::ColumnReader> column_reader; + + int64_t idx; + idx = file_metadata -> schema() -> ColumnIndex(colname); + + if(idx < 0) { + std::string dname(colname); + std::string fname(filename); + std::string msg = "Dataset: " + dname + " does not exist in file: " + fname; + *errMsg = strdup(msg.c_str()); + return ARROWERROR; + } + column_reader = row_group_reader->Column(idx); + + int16_t definition_level; + parquet::ByteArrayReader* ba_reader = + static_cast<parquet::ByteArrayReader*>(column_reader.get()); + + int64_t numRead = 0; + while (ba_reader->HasNext() && numRead < numElems) { + parquet::ByteArray value; + 
(void)ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); + if(values_read > 0) { + offsets[i] = value.len + 1; + byteSize += value.len + 1; + numRead += values_read; + } else { + offsets[i] = 1; + byteSize+=1; + numRead+=1; + } + i++; + } + } + return byteSize; + } + return ARROWERROR; + } catch (const std::exception& e) { + *errMsg = strdup(e.what()); + return ARROWERROR; + } +} + +int64_t cpp_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg) { try { int64_t ty = cpp_getType(filename, colname, errMsg); int64_t dty; // used to store the type of data so we can handle lists @@ -684,4 +747,8 @@ extern "C" { int64_t c_getListColumnSize(const char* filename, const char* colname, void* chpl_seg_sizes, int64_t numElems, int64_t startIdx, char** errMsg) { return cpp_getListColumnSize(filename, colname, chpl_seg_sizes, numElems, startIdx, errMsg); } + + int64_t c_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg) { + return cpp_getStringListColumnNumBytes(filename, colname, chpl_offsets, numElems, startIdx, batchSize, errMsg); + } } diff --git a/src/parquet/ReadParquet.h b/src/parquet/ReadParquet.h index a0a62d788d..6a90ee07e3 100644 --- a/src/parquet/ReadParquet.h +++ b/src/parquet/ReadParquet.h @@ -42,6 +42,9 @@ extern "C" { void* chpl_seg_sizes, int64_t numElems, int64_t startIdx, char** errMsg); int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* chpl_seg_sizes, int64_t numElems, int64_t startIdx, char** errMsg); + + int64_t c_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg); + int64_t cpp_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t 
numElems, int64_t startIdx, int64_t batchSize, char** errMsg); #ifdef __cplusplus }