From 25d4cd81777977f2a6980c362d5801a896031884 Mon Sep 17 00:00:00 2001
From: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>
Date: Thu, 6 Jun 2024 13:33:23 -0700
Subject: [PATCH] Improve the CSV Read performance with new impl (#3238)

* Improve the CSV Read performance with new impl

This improves the CSV read performance, especially in multi-locale and
multi-file settings where performance was previously abysmal.
See the PR for more info.

Signed-off-by: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>

* Don't use readDeserializable for 1.31 compat

Add it back once 1.31 support is dropped

Signed-off-by: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>

* Bring the problem size for the csv benchmark back up

This problem size had been reduced because it was timing out with the old
implementation. With the new implementation everything should be faster
and should not time out.

Signed-off-by: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>

* Don't use try! in the regex initializer

Also don't compile the regex when the type isn't string

Signed-off-by: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>

* Use readDeserializable interface

Now that we've dropped 1.31 support, we can add this back

Signed-off-by: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>

---------

Signed-off-by: Shreyas Khandekar <60454060+ShreyasKhandekar@users.noreply.github.com>
---
 benchmarks/csvIO.py |   2 +-
 src/CSVMsg.chpl     | 152 ++++++++++++++++++++++++++++++++------------
 2 files changed, 113 insertions(+), 41 deletions(-)

diff --git a/benchmarks/csvIO.py b/benchmarks/csvIO.py
index 97839ea8d6..76ad3cc74b 100644
--- a/benchmarks/csvIO.py
+++ b/benchmarks/csvIO.py
@@ -17,7 +17,7 @@ def create_parser():
     parser.add_argument("hostname", help="Hostname of arkouda server")
     parser.add_argument("port", type=int, help="Port of arkouda server")
     parser.add_argument(
-        "-n", "--size", type=int, default=2*10**5, help="Problem size: length of array to read/write"
+        "-n", "--size", type=int, default=10**6, help="Problem size: length of array to read/write"
     )
     parser.add_argument(
         "-t", "--trials", type=int, default=1, help="Number of times to run the benchmark"

diff --git a/src/CSVMsg.chpl b/src/CSVMsg.chpl
index 10d766d252..433850da32 100644
--- a/src/CSVMsg.chpl
+++ b/src/CSVMsg.chpl
@@ -227,7 +227,7 @@ module CSVMsg {

         coforall loc in Locales do on loc {
             const localeFilename = filenames[loc.id];
-            
+
             // create the file to write to
             var csvFile = open(localeFilename, ioMode.cw);
             var writer = csvFile.writer(locking=false);
@@ -312,13 +312,13 @@ module CSVMsg {
         var hasHeader = false;
         var dtype_idx = 0;
         var column_name_idx = 0;
-        
+
         if lines[0] == CSV_HEADER_OPEN + "\n" {
             hasHeader = true;
             dtype_idx = 1;
             column_name_idx = 3;
         }
-        
+
         var columns = lines[column_name_idx].split(col_delim).strip();
         var file_dtypes: [0..#columns.size] string;
         if dtype_idx > 0 {
@@ -342,7 +342,7 @@ module CSVMsg {
             throw getErrorWithContext(
                 msg="The dataset %s was not found in %s".doFormat(dset, filename),
                 lineNumber=getLineNumber(),
-                routineName=getRoutineName(), 
+                routineName=getRoutineName(),
                 moduleName=getModuleName(),
                 errorClass="DatasetNotFoundError");
         }
@@ -353,6 +353,112 @@ module CSVMsg {
         return (row_ct, hasHeader, new list(dtypes));
     }

+    proc read_csv_pattern(ref A: [] ?t, filename: string, filedom: domain(1), colName: string, colDelim: string, hasHeaders: bool, lineOffset: int, allowErrors: bool, ref hadError: bool) throws {
+        // We do a check to see if the filedom even intersects with
+        // A.localSubdomain before even opening the file
+        // The implementation assumes a single local subdomain so we make
+        // sure that is the case first
+        if !A.hasSingleLocalSubdomain() then
+            throw getErrorWithContext(
+                msg="The array A must have a single local subdomain on locale %i".doFormat(here.id),
+                lineNumber=getLineNumber(),
+                routineName=getRoutineName(),
+                moduleName=getModuleName(),
+                errorClass="InvalidArgumentError");
+        const intersection = domain_intersection(A.localSubdomain(), filedom);
+        if(intersection.size == 0) {
+            // There is nothing to be done for this file on this locale
+            return;
+        }
+
+        var fr = openReader(filename);
+        if(hasHeaders) {
+            var line:string;
+            // The first three lines are headers we don't care about
+            // Advance through them without reading them in
+            for param i in 0..<3 {
+                try {fr.advanceThrough(b'\n');}
+                catch {
+                    throw getErrorWithContext(
+                        msg="This CSV file is missing header values when headers were detected. This error should not arise.",
+                        lineNumber=getLineNumber(),
+                        routineName=getRoutineName(),
+                        moduleName=getModuleName(),
+                        errorClass="IOError");
+                }
+            }
+        }
+        var colIdx = -1;
+        // This next line will have the columns
+        for (column,i) in zip(fr.readLine(stripNewline = true).split(colDelim), 0..) {
+            if column == colName {
+                colIdx=i;
+                break;
+            }
+        }
+
+        if(colIdx == -1) then
+            throw getErrorWithContext(
+                msg="The dataset %s was not found in %s".doFormat(colName, filename),
+                lineNumber=getLineNumber(),
+                routineName=getRoutineName(),
+                moduleName=getModuleName(),
+                errorClass="DatasetNotFoundError");
+
+        var line = new csvLine(t, colIdx, colDelim);
+        for x in intersection {
+            try {
+                fr.read(line);
+            } catch e: BadFormatError {
+                // Handle errors
+                if t!=string {
+                    if allowErrors {
+                        A[x] = min(t);
+                        continue;
+                    } else {
+                        hadError |= true;
+                    }
+                } else {
+                    A[x]="";
+                }
+            }
+            // store the items from the csvLine in the array
+            A[x] = line.item;
+        }
+    }
+
+    use Regex;
+    record csvLine: readDeserializable {
+
+        type itemType;
+        var item: itemType;
+        const colIdx: int;
+        const colDelim: string;
+        var r: regex(string);
+
+        proc init(type itemType, colIdx: int, colDelim: string) throws {
+            this.itemType = itemType;
+            this.colIdx = colIdx;
+            this.colDelim = colDelim;
+            init this;
+            if this.itemType == string then
+                this.r = new regex("[\n"+this.colDelim+"]");
+        }
+
+        proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
+            // read the comma delimited items in a single line
+            for 0..
-                    if intersection.size > 0 {
-                        forall x in intersection with (| reduce hadError) {
-                            var row = lines[x-offsets[file_idx]+data_offset].split(col_delim);
-                            if t == string || row[colidx] != "" {
-                                // only write into A[x] if the value is non-empty and the col dtype is not a string
-                                A[x] = row[colidx]: t;
-                            }
-                            else if allowErrors {
-                                A[x] = min(t);
-                            }
-                            else {
-                                hadError |= true;
-                            }
-                        }
-                    }
-                }
+                read_csv_pattern(A, filename, filedom, dset, col_delim, hasHeaders, offsets[file_idx], allowErrors, hadError);
             }
         }
     }
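
Note: the core of the new read path above is pairing domain_intersection(A.localSubdomain(), filedom) with a
readDeserializable record, so each locale opens the file, advances past the headers, and then issues one
fr.read() per row of its own slice instead of splitting whole pre-read lines. The following is a minimal,
self-contained Chapel sketch of that deserialization pattern, not the Arkouda implementation; the file name
"data.csv", the oneColumn record, and the fixed rows count are illustrative assumptions.

    // Sketch only: assumes a local "data.csv" whose rows end in '\n', with one header
    // row and an integer second column. oneColumn and rows are illustrative names.
    use IO;

    record oneColumn : readDeserializable {
      const colIdx: int;      // zero-based index of the column to extract
      const colDelim: string; // field delimiter, e.g. ","
      var item: int;          // value parsed from the current row

      proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
        // skip the fields that come before the column of interest
        for 0..<colIdx do
          reader.advanceThrough(colDelim);
        // read the value itself, then discard the rest of the line
        reader.read(item);
        reader.advanceThrough(b"\n");
      }
    }

    config const rows = 3;  // number of data rows, assumed known up front

    proc main() throws {
      var fr = openReader("data.csv");
      fr.advanceThrough(b"\n");                        // skip the header row
      var col = new oneColumn(colIdx=1, colDelim=",");
      var A: [0..<rows] int;
      for i in 0..<rows {
        fr.read(col);   // one read per row, mirroring the fr.read(line) loop in read_csv_pattern
        A[i] = col.item;
      }
      writeln(A);
    }

Reading only the needed column directly from the reader is the same design choice the patch makes when it
replaces the removed lines[...]-splitting loop with per-row reads.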