Improve the CSV Read performance with new impl (Bears-R-Us#3238)
* Improve the CSV Read performance with new impl

This improves CSV read performance, especially in multilocale and
multi-file settings, where performance was previously abysmal. See the
PR for more details.

Signed-off-by: Shreyas Khandekar <[email protected]>
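
The core of the new implementation (src/CSVMsg.chpl below) is a record that
deserializes one row per read() call, rather than splitting every line of the
file into memory for each column. A minimal single-column sketch of that
pattern, with hypothetical names (intCell, data.csv):

    use IO;

    // hypothetical record: reads one integer per row
    record intCell : readDeserializable {
      var item: int;
      proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
        reader.read(this.item);        // parse the value itself
        reader.advanceThrough(b'\n');  // consume the rest of the row
      }
    }

    var fr = openReader("data.csv");   // assumed: one integer per line
    var a: [0..<4] int;
    var cell: intCell;
    for x in a.domain {
      fr.read(cell);                   // dispatches to intCell.deserialize
      a[x] = cell.item;
    }

Each read consumes exactly one row, so a column can be extracted without
materializing the whole file.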

* Don't use readDeserializable for 1.31 compat

Add it back once 1.31 support is dropped

Signed-off-by: Shreyas Khandekar <[email protected]>
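
A record with a matching deserialize method can still be read without naming
the interface, which is what keeps 1.31 working; the declaration is restored
in the last change below. A hedged sketch of the two spellings:

    use IO;

    // Chapel 1.31-compatible spelling: no interface declaration; the
    // deserialize method alone is enough for fileReader.read to work
    record cellCompat {
      var item: int;
      proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
        reader.read(this.item);
      }
    }

    // later Chapel versions: declare the interface explicitly
    record cellModern : readDeserializable {
      var item: int;
      proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
        reader.read(this.item);
      }
    }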

* Bring the problem size for the CSV benchmark back up

This problem size had been reduced because it was timing out with the
old implementation. With this new one, everything should be faster and
should not time out.

Signed-off-by: Shreyas Khandekar <[email protected]>
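
For reference, a hypothetical invocation of the benchmark against a running
arkouda server (host and port are placeholders), with the restored default
problem size spelled out:

    python3 benchmarks/csvIO.py localhost 5555 -n 1000000 -t 3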

* Don't use try! in the regex initializer

Also don't compile the regex when the type isn't string

Signed-off-by: Shreyas Khandekar <[email protected]>
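
The distinction: try! halts the program when the expression throws, while a
throwing initializer propagates the error to the caller. A simplified sketch
of the resulting pattern (the record and parameter names are made up; the real
initializer is in the diff below):

    use Regex;

    record matcher {
      type itemType;
      var r: regex(string);

      // throwing init: a bad pattern becomes a catchable error rather than
      // a halt, and the regex is compiled only when it will actually be used
      proc init(type itemType, pattern: string) throws {
        this.itemType = itemType;
        init this;
        if itemType == string then
          this.r = new regex(pattern); // `try! new regex(...)` would halt here instead
      }
    }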

* Use readDeserializable interface

Now that we've dropped 1.31 support, we can add this back

Signed-off-by: Shreyas Khandekar <[email protected]>

---------

Signed-off-by: Shreyas Khandekar <[email protected]>
ShreyasKhandekar authored Jun 6, 2024
1 parent 4a628b8 commit 25d4cd8
Showing 2 changed files with 113 additions and 41 deletions.
2 changes: 1 addition & 1 deletion benchmarks/csvIO.py
@@ -17,7 +17,7 @@ def create_parser():
parser.add_argument("hostname", help="Hostname of arkouda server")
parser.add_argument("port", type=int, help="Port of arkouda server")
parser.add_argument(
- "-n", "--size", type=int, default=2*10**5, help="Problem size: length of array to read/write"
+ "-n", "--size", type=int, default=10**6, help="Problem size: length of array to read/write"
)
parser.add_argument(
"-t", "--trials", type=int, default=1, help="Number of times to run the benchmark"
152 changes: 112 additions & 40 deletions src/CSVMsg.chpl
@@ -227,7 +227,7 @@ module CSVMsg {

coforall loc in Locales do on loc {
const localeFilename = filenames[loc.id];

// create the file to write to
var csvFile = open(localeFilename, ioMode.cw);
var writer = csvFile.writer(locking=false);
@@ -312,13 +312,13 @@
var hasHeader = false;
var dtype_idx = 0;
var column_name_idx = 0;

if lines[0] == CSV_HEADER_OPEN + "\n" {
hasHeader = true;
dtype_idx = 1;
column_name_idx = 3;
}

var columns = lines[column_name_idx].split(col_delim).strip();
var file_dtypes: [0..#columns.size] string;
if dtype_idx > 0 {
@@ -342,7 +342,7 @@
throw getErrorWithContext(
msg="The dataset %s was not found in %s".doFormat(dset, filename),
lineNumber=getLineNumber(),
routineName=getRoutineName(),
moduleName=getModuleName(),
errorClass="DatasetNotFoundError");
}
@@ -353,6 +353,112 @@
return (row_ct, hasHeader, new list(dtypes));
}

proc read_csv_pattern(ref A: [] ?t, filename: string, filedom: domain(1), colName: string, colDelim: string, hasHeaders: bool, lineOffset: int, allowErrors: bool, ref hadError: bool) throws {
// Check whether filedom intersects A.localSubdomain at all before
// opening the file.
// The implementation assumes a single local subdomain, so we make
// sure that is the case first.
if !A.hasSingleLocalSubdomain() then
throw getErrorWithContext(
msg="The array A must have a single local subdomain on locale %i".doFormat(here.id),
lineNumber=getLineNumber(),
routineName=getRoutineName(),
moduleName=getModuleName(),
errorClass="InvalidArgumentError");
const intersection = domain_intersection(A.localSubdomain(), filedom);
if(intersection.size == 0) {
// There is nothing to be done for this file on this locale
return;
}

var fr = openReader(filename);
if(hasHeaders) {
var line:string;
// The first three lines are headers we don't care about
// Advance through them without reading them in
for param i in 0..<3 {
try {fr.advanceThrough(b'\n');}
catch {
throw getErrorWithContext(
msg="This CSV file is missing header values when headers were detected. This error should not arise.",
lineNumber=getLineNumber(),
routineName=getRoutineName(),
moduleName=getModuleName(),
errorClass="IOError");
}
}
}
var colIdx = -1;
// The next line holds the column names
for (column,i) in zip(fr.readLine(stripNewline = true).split(colDelim), 0..) {
if column == colName {
colIdx=i;
break;
}
}

if(colIdx == -1) then
throw getErrorWithContext(
msg="The dataset %s was not found in %s".doFormat(colName, filename),
lineNumber=getLineNumber(),
routineName=getRoutineName(),
moduleName=getModuleName(),
errorClass="DatasetNotFoundError");

var line = new csvLine(t, colIdx, colDelim);
for x in intersection {
try {
fr.read(line);
} catch e: BadFormatError {
// Handle errors
if t!=string {
if allowErrors {
A[x] = min(t);
continue;
} else {
hadError |= true;
}
} else {
A[x]="";
}
}
// store the item from the csvLine in the array
A[x] = line.item;
}
}

use Regex;
record csvLine: readDeserializable {

type itemType;
var item: itemType;
const colIdx: int;
const colDelim: string;
var r: regex(string);

proc init(type itemType, colIdx: int, colDelim: string) throws {
this.itemType = itemType;
this.colIdx = colIdx;
this.colDelim = colDelim;
init this;
if this.itemType == string then
this.r = new regex("[\n"+this.colDelim+"]");
}

proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
// read the delimiter-separated items in a single line
for 0..<colIdx {
reader.advanceThrough(this.colDelim:bytes); // Skip over the columns we don't care about
}
if itemType == string then
this.item = reader.readTo(this.r); // Throws BadFormatError if no "," or "\n"
else
var success = reader.read(this.item);
// stop reading once a '\n' is encountered
reader.advanceThrough(b'\n'); // Will throw BadFormatError if no newline
}
}
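
// Usage sketch (illustration, not part of this commit): reading column 1
// of a comma-delimited file of ints; each fr.read(cell) consumes exactly
// one row via csvLine.deserialize. Assumes a surrounding throws context,
// since the initializer and reads can throw.
//
//   var fr = openReader("example.csv");
//   var cell = new csvLine(int, 1, ",");
//   var vals: [0..<10] int;
//   for x in vals.domain {
//     fr.read(cell);       // skips column 0, reads column 1, eats "\n"
//     vals[x] = cell.item;
//   }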

proc read_files_into_dist_array(ref A: [?D] ?t, dset: string, filenames: [] string, filedomains: [] domain(1), skips: set(string), hasHeaders: bool, col_delim: string, offsets: [] int, allowErrors: bool) throws {
var hadError = false;
coforall loc in A.targetLocales() with (ref A, | reduce hadError) do on loc {
Expand All @@ -361,47 +467,13 @@ module CSVMsg {
var locFiledoms = filedomains;
/* On this locale, find all files containing data that belongs in
this locale's chunk of A */
- for (filedom, filename, file_idx) in zip(locFiledoms, locFiles, 0..) {
+ forall (filedom, filename, file_idx) in zip(locFiledoms, locFiles, 0..) with (ref A, | reduce hadError) {
if (skips.contains(filename)) {
csvLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
"File %s does not contain data for this dataset, skipping".doFormat(filename));
continue;
} else {
- var lines = openReader(filename).lines().strip();
- var data_offset = 1;
- if hasHeaders {
- data_offset = 4;
- }
- // determine the index of the column.
- var column_names = lines[data_offset-1].split(col_delim);
- var colidx: int;
- var colExists = column_names.find(dset, colidx);
- if !colExists{
- throw getErrorWithContext(
- msg="The dataset %s was not found in %s".doFormat(dset, filename),
- lineNumber=getLineNumber(),
- routineName=getRoutineName(),
- moduleName=getModuleName(),
- errorClass="DatasetNotFoundError");
- }
- for locdom in A.localSubdomains() {
- const intersection = domain_intersection(locdom, filedom);
- if intersection.size > 0 {
- forall x in intersection with (| reduce hadError) {
- var row = lines[x-offsets[file_idx]+data_offset].split(col_delim);
- if t == string || row[colidx] != "" {
- // only write into A[x] if the value is non-empty and the col dtype is not a string
- A[x] = row[colidx]: t;
- }
- else if allowErrors {
- A[x] = min(t);
- }
- else {
- hadError |= true;
- }
- }
- }
- }
+ read_csv_pattern(A, filename, filedom, dset, col_delim, hasHeaders, offsets[file_idx], allowErrors, hadError);
}
}
}
