diff --git a/benchmarks/csvIO.py b/benchmarks/csvIO.py index 97839ea8d6..76ad3cc74b 100644 --- a/benchmarks/csvIO.py +++ b/benchmarks/csvIO.py @@ -17,7 +17,7 @@ def create_parser(): parser.add_argument("hostname", help="Hostname of arkouda server") parser.add_argument("port", type=int, help="Port of arkouda server") parser.add_argument( - "-n", "--size", type=int, default=2*10**5, help="Problem size: length of array to read/write" + "-n", "--size", type=int, default=10**6, help="Problem size: length of array to read/write" ) parser.add_argument( "-t", "--trials", type=int, default=1, help="Number of times to run the benchmark" diff --git a/src/CSVMsg.chpl b/src/CSVMsg.chpl index 10d766d252..433850da32 100644 --- a/src/CSVMsg.chpl +++ b/src/CSVMsg.chpl @@ -227,7 +227,7 @@ module CSVMsg { coforall loc in Locales do on loc { const localeFilename = filenames[loc.id]; - + // create the file to write to var csvFile = open(localeFilename, ioMode.cw); var writer = csvFile.writer(locking=false); @@ -312,13 +312,13 @@ module CSVMsg { var hasHeader = false; var dtype_idx = 0; var column_name_idx = 0; - + if lines[0] == CSV_HEADER_OPEN + "\n" { hasHeader = true; dtype_idx = 1; column_name_idx = 3; } - + var columns = lines[column_name_idx].split(col_delim).strip(); var file_dtypes: [0..#columns.size] string; if dtype_idx > 0 { @@ -342,7 +342,7 @@ module CSVMsg { throw getErrorWithContext( msg="The dataset %s was not found in %s".doFormat(dset, filename), lineNumber=getLineNumber(), - routineName=getRoutineName(), + routineName=getRoutineName(), moduleName=getModuleName(), errorClass="DatasetNotFoundError"); } @@ -353,6 +353,112 @@ module CSVMsg { return (row_ct, hasHeader, new list(dtypes)); } + proc read_csv_pattern(ref A: [] ?t, filename: string, filedom: domain(1), colName: string, colDelim: string, hasHeaders: bool, lineOffset: int, allowErrors: bool, ref hadError: bool) throws { + // We do a check to see if the filedome even intersects with + // A.localSubdomain before even opening the file + // The implementation assumes a single local subdomain so we make + // sure that is the case first + if !A.hasSingleLocalSubdomain() then + throw getErrorWithContext( + msg="The array A must have a single local subdomain on locale %i".doFormat(here.id), + lineNumber=getLineNumber(), + routineName=getRoutineName(), + moduleName=getModuleName(), + errorClass="InvalidArgumentError"); + const intersection = domain_intersection(A.localSubdomain(), filedom); + if(intersection.size == 0) { + // There is nothing to be done for this file on this locale + return; + } + + var fr = openReader(filename); + if(hasHeaders) { + var line:string; + // The first three lines are headers we don't care about + // Advance through them without reading them in + for param i in 0..<3 { + try {fr.advanceThrough(b'\n');} + catch { + throw getErrorWithContext( + msg="This CSV file is missing header values when headers were detected. This error should not arise.", + lineNumber=getLineNumber(), + routineName=getRoutineName(), + moduleName=getModuleName(), + errorClass="IOError"); + } + } + } + var colIdx = -1; + // This next line will have the columns + for (column,i) in zip(fr.readLine(stripNewline = true).split(colDelim), 0..) { + if column == colName { + colIdx=i; + break; + } + } + + if(colIdx == -1) then + throw getErrorWithContext( + msg="The dataset %s was not found in %s".doFormat(colName, filename), + lineNumber=getLineNumber(), + routineName=getRoutineName(), + moduleName=getModuleName(), + errorClass="DatasetNotFoundError"); + + var line = new csvLine(t, colIdx, colDelim); + for x in intersection { + try { + fr.read(line); + } catch e: BadFormatError { + // Handle errors + if t!=string { + if allowErrors { + A[x] = min(t); + continue; + } else { + hadError |= true; + } + } else { + A[x]=""; + } + } + // store the items from the csvLine in the array + A[x] = line.item; + } + } + + use Regex; + record csvLine: readDeserializable { + + type itemType; + var item: itemType; + const colIdx: int; + const colDelim: string; + var r: regex(string); + + proc init(type itemType, colIdx: int, colDelim: string) throws { + this.itemType = itemType; + this.colIdx = colIdx; + this.colDelim = colDelim; + init this; + if this.itemType == string then + this.r = new regex("[\n"+this.colDelim+"]"); + } + + proc ref deserialize(reader: fileReader(?), ref deserializer) throws { + // read the comma delimited items in a single line + for 0.. 0 { - forall x in intersection with (| reduce hadError) { - var row = lines[x-offsets[file_idx]+data_offset].split(col_delim); - if t == string || row[colidx] != "" { - // only write into A[x] if the value is non-empty and the col dtype is not a string - A[x] = row[colidx]: t; - } - else if allowErrors { - A[x] = min(t); - } - else { - hadError |= true; - } - } - } - } + read_csv_pattern(A, filename, filedom, dset, col_delim, hasHeaders, offsets[file_idx], allowErrors, hadError); } } }