Skip to content

Commit

Permalink
Closes Bears-R-Us#3419: Remove intertwined list column and string col…
Browse files Browse the repository at this point in the history
…umn byte calculation logic (Bears-R-Us#3420)

* Update Chapel code

* Convert C++ code to split string and list byte calc
  • Loading branch information
bmcdonald3 authored Jul 10, 2024
1 parent 2ca352c commit 277e226
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 1 deletion.
40 changes: 39 additions & 1 deletion src/ParquetMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,29 @@ module ParquetMsg {
return byteSizes;
}

proc calcStrListSizesAndOffset(offsets: [] ?t, filenames: [] string, sizes: [] int, dsetname: string) throws {
var (subdoms, length) = getSubdomains(sizes);

var byteSizes: [filenames.domain] int;

coforall loc in offsets.targetLocales() with (ref byteSizes) do on loc {
var locFiles = filenames;
var locFiledoms = subdoms;

forall (i, filedom, filename) in zip(sizes.domain, locFiledoms, locFiles) {
for locdom in offsets.localSubdomains() {
const intersection = domain_intersection(locdom, filedom);
if intersection.size > 0 {
var col: [filedom] t;
byteSizes[i] = getStrListColSize(filename, dsetname, col);
offsets[filedom] = col;
}
}
}
}
return byteSizes;
}

proc getNullIndices(A: [] ?t, filenames: [] string, sizes: [] int, dsetname: string, ty) throws {
extern proc c_getStringColumnNullIndices(filename, colname, nulls_chpl, errMsg): int;
var (subdoms, length) = getSubdomains(sizes);
Expand Down Expand Up @@ -317,6 +340,21 @@ module ParquetMsg {
return byteSize;
}

proc getStrListColSize(filename: string, dsetname: string, ref offsets: [] int) throws {
extern proc c_getStringListColumnNumBytes(filename, colname, offsets, numElems, startIdx, batchSize, errMsg): int;
var pqErr = new parquetErrorMsg();

var byteSize = c_getStringListColumnNumBytes(filename.localize().c_str(),
dsetname.localize().c_str(),
c_ptrTo(offsets),
offsets.size, 0, 256,
c_ptrTo(pqErr.errMsg));

if byteSize == ARROWERROR then
pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName());
return byteSize;
}

proc getListColSize(filename: string, dsetname: string, ref seg_sizes: [] int) throws {
extern proc c_getListColumnSize(filename, colname, seg_sizes, numElems, startIdx, errMsg): int;
var pqErr = new parquetErrorMsg();
Expand Down Expand Up @@ -669,7 +707,7 @@ module ParquetMsg {
}
else if ty == ArrowTypes.stringArr {
var entrySeg = createSymEntry((+ reduce listSizes), int);
var byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, listSizes, dsetname);
var byteSizes = calcStrListSizesAndOffset(entrySeg.a, filenames, listSizes, dsetname);
entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;

var entryVal = createSymEntry((+ reduce byteSizes), uint(8));
Expand Down
67 changes: 67 additions & 0 deletions src/parquet/ReadParquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,69 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
}

int64_t cpp_getStringColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);
auto offsets = (int64_t*)chpl_offsets;
int64_t byteSize = 0;

if(ty == ARROWSTRING) {
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(filename, false);

std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
int num_row_groups = file_metadata->num_row_groups();

int64_t i = 0;
for (int r = 0; r < num_row_groups; r++) {
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);

int64_t values_read = 0;

std::shared_ptr<parquet::ColumnReader> column_reader;

int64_t idx;
idx = file_metadata -> schema() -> ColumnIndex(colname);

if(idx < 0) {
std::string dname(colname);
std::string fname(filename);
std::string msg = "Dataset: " + dname + " does not exist in file: " + fname;
*errMsg = strdup(msg.c_str());
return ARROWERROR;
}
column_reader = row_group_reader->Column(idx);

int16_t definition_level;
parquet::ByteArrayReader* ba_reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());

int64_t numRead = 0;
while (ba_reader->HasNext() && numRead < numElems) {
parquet::ByteArray value;
(void)ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
if(values_read > 0) {
offsets[i] = value.len + 1;
byteSize += value.len + 1;
numRead += values_read;
} else {
offsets[i] = 1;
byteSize+=1;
numRead+=1;
}
i++;
}
}
return byteSize;
}
return ARROWERROR;
} catch (const std::exception& e) {
*errMsg = strdup(e.what());
return ARROWERROR;
}
}

int64_t cpp_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);
int64_t dty; // used to store the type of data so we can handle lists
Expand Down Expand Up @@ -684,4 +747,8 @@ extern "C" {
int64_t c_getListColumnSize(const char* filename, const char* colname, void* chpl_seg_sizes, int64_t numElems, int64_t startIdx, char** errMsg) {
return cpp_getListColumnSize(filename, colname, chpl_seg_sizes, numElems, startIdx, errMsg);
}

int64_t c_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg) {
return cpp_getStringListColumnNumBytes(filename, colname, chpl_offsets, numElems, startIdx, batchSize, errMsg);
}
}
3 changes: 3 additions & 0 deletions src/parquet/ReadParquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ extern "C" {
void* chpl_seg_sizes, int64_t numElems, int64_t startIdx, char** errMsg);
int64_t cpp_getListColumnSize(const char* filename, const char* colname,
void* chpl_seg_sizes, int64_t numElems, int64_t startIdx, char** errMsg);

int64_t c_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg);
int64_t cpp_getStringListColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, int64_t batchSize, char** errMsg);

#ifdef __cplusplus
}
Expand Down

0 comments on commit 277e226

Please sign in to comment.