diff --git a/benchmarks/IO.py b/benchmarks/IO.py
index 191ff4738b..ef3be69444 100644
--- a/benchmarks/IO.py
+++ b/benchmarks/IO.py
@@ -1,11 +1,13 @@
 #!/usr/bin/env python3
 
 import argparse
+from enum import Enum
 import os
 import time
 from glob import glob
 
 import arkouda as ak
+import numpy as np
 
 TYPES = (
     "int64",
@@ -22,15 +24,22 @@
     "lz4"
 )
 
+class FileFormat(Enum):
+    HDF5 = 1
+    PARQUET = 2
+    CSV = 3
 
 
-def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, comps=None):
+def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, fileFormat, comps=None):
     if comps is None or comps == [""]:
         comps = COMPRESSIONS
-    if not parquet:
-        print(">>> arkouda {} HDF5 write with compression={}".format(dtype, comps))
-    else:
-        print(">>> arkouda {} Parquet write with compression={}".format(dtype, comps))
+    file_format_actions = {
+        FileFormat.HDF5: ">>> arkouda {} HDF5 write with compression={}".format(dtype, comps),
+        FileFormat.PARQUET: ">>> arkouda {} Parquet write with compression={}".format(dtype, comps),
+        FileFormat.CSV: ">>> arkouda {} CSV write".format(dtype)
+    }
+    print(file_format_actions.get(fileFormat, "Invalid file format"))
+
     cfg = ak.get_config()
     N = N_per_locale * cfg["numLocales"]
     print("numLocales = {}, N = {:,}, filesPerLoc = {}".format(cfg["numLocales"], N, numfiles))
@@ -44,7 +53,7 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, co
         a = ak.random_strings_uniform(1, 16, N, seed=seed)
 
     times = {}
-    if parquet:
+    if fileFormat == FileFormat.PARQUET:
         for comp in comps:
             if comp in COMPRESSIONS:
                 writetimes = []
@@ -57,7 +66,7 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, co
                         end = time.time()
                         writetimes.append(end - start)
                 times[comp] = sum(writetimes) / trials
-    else:
+    elif fileFormat == FileFormat.HDF5:
         writetimes = []
         for i in range(trials):
             for j in range(numfiles):
@@ -66,28 +75,42 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, co
                 end = time.time()
                 writetimes.append(end - start)
         times["HDF5"] = sum(writetimes) / trials
+    elif fileFormat == FileFormat.CSV:
+        writetimes = []
+        for i in range(trials):
+            for j in range(numfiles):
+                start = time.time()
+                a.to_csv(f"{path}{j:04}")
+                end = time.time()
+                writetimes.append(end - start)
+        times["CSV"] = sum(writetimes) / trials
+    else:
+        raise ValueError("Invalid file format")
 
     nb = a.size * a.itemsize * numfiles
     for key in times.keys():
         print("write Average time {} = {:.4f} sec".format(key, times[key]))
-        print("write Average rate {} = {:.2f} GiB/sec".format(key, nb / 2**30 / times[key]))
+        print("write Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
 
 
-def time_ak_read(N_per_locale, numfiles, trials, dtype, path, seed, parquet, comps=None):
+def time_ak_read(N_per_locale, numfiles, trials, dtype, path, fileFormat, comps=None):
     if comps is None or comps == [""]:
         comps = COMPRESSIONS
-    if not parquet:
-        print(">>> arkouda HDF5 {} read".format(dtype))
-    else:
-        print(">>> arkouda Parquet {} read".format(dtype))
+    file_format_actions = {
+        FileFormat.HDF5: ">>> arkouda {} HDF5 read with compression={}".format(dtype, comps),
+        FileFormat.PARQUET: ">>> arkouda {} Parquet read with compression={}".format(dtype, comps),
+        FileFormat.CSV: ">>> arkouda {} CSV read".format(dtype)
+    }
+    print(file_format_actions.get(fileFormat, "Invalid file format"))
+
     cfg = ak.get_config()
     N = N_per_locale * cfg["numLocales"]
     print("numLocales = {}, N = {:,}, filesPerLoc = {}".format(cfg["numLocales"], N, numfiles))
     a = ak.array([])
 
     times = {}
-    if parquet:
+    if fileFormat == FileFormat.PARQUET:
         for comp in COMPRESSIONS:
             if comp in comps:
                 readtimes = []
@@ -98,7 +121,7 @@ def time_ak_read(N_per_locale, numfiles, trials, dtype, path, seed, parquet, com
                     readtimes.append(end - start)
                 times[comp] = sum(readtimes) / trials
 
-    else:
+    elif fileFormat == FileFormat.HDF5:
         readtimes = []
         for i in range(trials):
             start = time.time()
@@ -106,11 +129,21 @@ def time_ak_read(N_per_locale, numfiles, trials, dtype, path, seed, parquet, com
             end = time.time()
             readtimes.append(end - start)
         times["HDF5"] = sum(readtimes) / trials
+    elif fileFormat == FileFormat.CSV:
+        readtimes = []
+        for i in range(trials):
+            start = time.time()
+            a = ak.read_csv(path + "*").popitem()[1]
+            end = time.time()
+            readtimes.append(end - start)
+        times["CSV"] = sum(readtimes) / trials
+    else:
+        raise ValueError("Invalid file format")
 
     nb = a.size * a.itemsize
     for key in times.keys():
         print("read Average time {} = {:.4f} sec".format(key, times[key]))
-        print("read Average rate {} = {:.2f} GiB/sec".format(key, nb / 2**30 / times[key]))
+        print("read Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
 
 
 def remove_files(path):
@@ -118,8 +151,9 @@ def remove_files(path):
         os.remove(f)
 
 
-def check_correctness(dtype, path, seed, parquet, multifile=False):
+def check_correctness(dtype, path, seed, fileFormat, multifile=False):
     N = 10**4
+    b = None
     if dtype == "int64":
         a = ak.randint(0, 2**32, N, seed=seed)
         if multifile:
@@ -136,20 +170,36 @@ def check_correctness(dtype, path, seed, parquet, multifile=False):
         a = ak.random_strings_uniform(1, 16, N, seed=seed)
         if multifile:
             b = ak.random_strings_uniform(1, 16, N, seed=seed)
+    else:
+        raise ValueError(f"Invalid dtype: {dtype}")
 
-    a.to_hdf(f"{path}{1}") if not parquet else a.to_parquet(f"{path}{1}")
-    if multifile:
-        b.to_hdf(f"{path}{2}") if not parquet else b.to_parquet(f"{path}{2}")
+    file_format_actions = {
+        FileFormat.HDF5: (a.to_hdf, b.to_hdf if b is not None else None, ak.read_hdf),
+        FileFormat.PARQUET: (a.to_parquet, b.to_parquet if b is not None else None, ak.read_parquet),
+        FileFormat.CSV: (a.to_csv, b.to_csv if b is not None else None, ak.read_csv)
+    }
+
+    if fileFormat in file_format_actions:
+        write_a, write_b, read_c = file_format_actions.get(fileFormat)
+    else:
+        raise ValueError(f"Invalid file format: {fileFormat}")
 
-    c = ak.read_hdf(path + "*") if not parquet else ak.read_parquet(path + "*")
-    c = c.popitem()[1]
+    write_a(f"{path}{1}")
+    if multifile:
+        write_b(f"{path}{2}")
+
+    c = read_c(path + "*").popitem()[1]
     remove_files(path)
-    if not multifile:
-        assert (a == c).all()
+
+    if dtype == "float64":
+        assert np.allclose(a.to_ndarray(), c[0 : a.size].to_ndarray())  # Slice is full array when single file
+        if multifile:
+            assert np.allclose(b.to_ndarray(), c[a.size :].to_ndarray())
     else:
-        assert (a == c[0 : a.size]).all()
-        assert (b == c[a.size :]).all()
+        assert (a == c[0 : a.size]).all()  # Slice is full array when single file
+        if multifile:
+            assert (b == c[a.size :]).all()
 
 
 def create_parser():
@@ -182,9 +232,13 @@ def create_parser():
     parser.add_argument(
         "-s", "--seed", default=None, type=int, help="Value to initialize random number generator"
    )
-    parser.add_argument(
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument(
         "-q", "--parquet", default=False, action="store_true", help="Perform Parquet operations"
     )
+    group.add_argument(
+        "-v", "--csv", default=False, action="store_true", help="Perform CSV operations"
+    )
     parser.add_argument(
         "-w",
         "--only-write",
@@ -232,9 +286,11 @@ def create_parser():
     comp_str = args.compression
     comp_types = COMPRESSIONS if comp_str == "" else comp_str.lower().split(",")
 
+    fileFormat = FileFormat.CSV if args.csv else FileFormat.PARQUET if args.parquet else FileFormat.HDF5
+
     if args.correctness_only:
         for dtype in TYPES:
-            check_correctness(dtype, args.path, args.seed, args.parquet)
+            check_correctness(dtype, args.path, args.seed, fileFormat)
         sys.exit(0)
 
     print("array size = {:,}".format(args.size))
@@ -248,12 +304,12 @@ def create_parser():
             args.dtype,
             args.path,
             args.seed,
-            args.parquet,
+            fileFormat,
             comp_types,
         )
     elif args.only_read:
         time_ak_read(
-            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet, comp_types
+            args.size, args.files_per_loc, args.trials, args.dtype, args.path, fileFormat, comp_types
         )
     else:
         time_ak_write(
@@ -263,11 +319,11 @@ def create_parser():
             args.dtype,
             args.path,
             args.seed,
-            args.parquet,
+            fileFormat,
             comp_types,
         )
         time_ak_read(
-            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet, comp_types
+            args.size, args.files_per_loc, args.trials, args.dtype, args.path, fileFormat, comp_types
         )
         remove_files(args.path)
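
For reference, a minimal sketch of how the refactored helpers above are driven once this change lands (not part of the patch; the hostname, port, and target path are placeholder values, and it assumes IO.py is importable, e.g. when run from the benchmarks/ directory):

    import arkouda as ak
    from IO import FileFormat, time_ak_write, time_ak_read, remove_files

    ak.connect("localhost", 5555)   # placeholder server location
    path = "/tmp/ak-io-test"        # placeholder target path prefix

    # one trial, one CSV file per locale, 10**6 int64 values per locale
    time_ak_write(10**6, 1, 1, "int64", path, None, FileFormat.CSV)
    time_ak_read(10**6, 1, 1, "int64", path, FileFormat.CSV)
    remove_files(path)
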
diff --git a/benchmarks/contributing.rst b/benchmarks/contributing.rst
index d4a6b0178a..3557694197 100644
--- a/benchmarks/contributing.rst
+++ b/benchmarks/contributing.rst
@@ -71,8 +71,8 @@ Running your benchmark
 ======================
 To ensure your benchmark is working, you can run it by running the command from your root directory:
-``./benchmarks/run_benchmarks.py myMethod.py``
-where "myMethod.py" is replaced with your filename.
+``./benchmarks/run_benchmarks.py myMethod``
+where "myMethod" is replaced with your benchmark's filename, without the ``.py`` extension.
 
 Once everything is working here, correctness testing and numpy testing should be added to your
 benchmark in the following manner:
diff --git a/benchmarks/csvIO.py b/benchmarks/csvIO.py
new file mode 100644
index 0000000000..76ad3cc74b
--- /dev/null
+++ b/benchmarks/csvIO.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+
+import argparse
+
+from IO import *
+
+TYPES = (
+    "int64",
+    "float64",
+    "uint64",
+    "str"
+)
+
+
+def create_parser():
+    parser = argparse.ArgumentParser(description="Measure performance of CSV reads/writes.")
+    parser.add_argument("hostname", help="Hostname of arkouda server")
+    parser.add_argument("port", type=int, help="Port of arkouda server")
+    parser.add_argument(
+        "-n", "--size", type=int, default=10**6, help="Problem size: length of array to read/write"
+    )
+    parser.add_argument(
+        "-t", "--trials", type=int, default=1, help="Number of times to run the benchmark"
+    )
+    parser.add_argument(
+        "-d", "--dtype", default="int64", help="Dtype of array ({})".format(", ".join(TYPES))
+    )
+    parser.add_argument(
+        "--correctness-only",
+        default=False,
+        action="store_true",
+        help="Only check correctness, not performance.",
+    )
+    parser.add_argument(
+        "-p",
+        "--path",
+        default=os.getcwd() + "ak-io-test",
+        help="Target path for measuring read/write rates",
+    )
+    parser.add_argument(
+        "-s", "--seed", default=None, type=int, help="Value to initialize random number generator"
+    )
+    parser.add_argument(
+        "-w",
+        "--only-write",
+        default=False,
+        action="store_true",
+        help="Only write the files; files will not be removed",
+    )
+    parser.add_argument(
+        "-r",
+        "--only-read",
+        default=False,
+        action="store_true",
+        help="Only read the files; files will not be removed",
+    )
+    parser.add_argument(
+        "-f",
+        "--only-delete",
+        default=False,
+        action="store_true",
+        help="Only delete files created from writing with this benchmark",
+    )
+    parser.add_argument(
+        "-l", "--files-per-loc", type=int, default=1, help="Number of files to create per locale"
+    )
+    parser.add_argument(
+        "-c",
+        "--compression",
+        default="",
+        action="store",
+        help="Compression types to run Parquet benchmarks against. Comma delimited list (NO SPACES) allowing "
+        "for multiple. Accepted values: none, snappy, gzip, brotli, zstd, and lz4"
+    )
+    return parser
+
+
+if __name__ == "__main__":
+    import sys
+
+    parser = create_parser()
+    args = parser.parse_args()
+    if args.dtype not in TYPES:
+        raise ValueError("Dtype must be {}, not {}".format("/".join(TYPES), args.dtype))
+    ak.verbose = False
+    ak.connect(args.hostname, args.port)
+    comp_str = args.compression
+    comp_types = COMPRESSIONS if comp_str == "" else comp_str.lower().split(",")
+
+    if args.correctness_only:
+        for dtype in TYPES:
+            check_correctness(dtype, args.path, args.seed, FileFormat.CSV)
+        sys.exit(0)
+
+    print("array size = {:,}".format(args.size))
+    print("number of trials = ", args.trials)
+
+    if args.only_write:
+        time_ak_write(
+            args.size,
+            args.files_per_loc,
+            args.trials,
+            args.dtype,
+            args.path,
+            args.seed,
+            FileFormat.CSV,
+            comp_types,
+        )
+    elif args.only_read:
+        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.CSV, comp_types)
+    else:
+        time_ak_write(
+            args.size,
+            args.files_per_loc,
+            args.trials,
+            args.dtype,
+            args.path,
+            args.seed,
+            FileFormat.CSV,
+            comp_types,
+        )
+        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.CSV, comp_types)
+        remove_files(args.path)
+
+    sys.exit(0)
diff --git a/benchmarks/graph_infra/arkouda.graph b/benchmarks/graph_infra/arkouda.graph
index 44522c596f..d6d31cb489 100644
--- a/benchmarks/graph_infra/arkouda.graph
+++ b/benchmarks/graph_infra/arkouda.graph
@@ -112,6 +112,12 @@ repeat-files: parquetMultiIO.dat
 graphtitle: Parquet 10 Files/Loc IO Performance
 ylabel: Performance (GiB/s)
 
+perfkeys: write Average rate CSV =, read Average rate CSV =
+graphkeys: Write GiB/s, Read GiB/s
+repeat-files: csvIO.dat
+graphtitle: CSV IO Performance
+ylabel: Performance (GiB/s)
+
 perfkeys: non-regex with literal substring Average rate =, regex with literal substring Average rate =, regex with pattern Average rate =
 graphkeys: non-regex with literal substring GiB/s, regex with literal substring GiB/s, regex with pattern GiB/s
 files: substring_search.dat, substring_search.dat, substring_search.dat
diff --git a/benchmarks/graph_infra/csvIO.perfkeys b/benchmarks/graph_infra/csvIO.perfkeys
new file mode 100644
index 0000000000..6b3e1482f1
--- /dev/null
+++ b/benchmarks/graph_infra/csvIO.perfkeys
@@ -0,0 +1,4 @@
+write Average time CSV =
+write Average rate CSV =
+read Average time CSV =
+read Average rate CSV =
\ No newline at end of file
diff --git a/benchmarks/multiIO.py b/benchmarks/multiIO.py
index 14f2edb4bb..6069f4b15f 100644
--- a/benchmarks/multiIO.py
+++ b/benchmarks/multiIO.py
@@ -41,9 +41,14 @@ def create_parser():
     parser.add_argument(
         "-s", "--seed", default=None, type=int, help="Value to initialize random number generator"
     )
-    parser.add_argument(
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument(
         "-q", "--parquet", default=False, action="store_true", help="Perform Parquet operations"
     )
+    group.add_argument(
+        "-v", "--csv", default=False, action="store_true", help="Perform CSV operations"
+    )
+
     parser.add_argument(
         "-w",
         "--only-write",
@@ -81,9 +86,11 @@ def create_parser():
     ak.verbose = False
     ak.connect(args.hostname, args.port)
 
+    fileFormat = FileFormat.CSV if args.csv else FileFormat.PARQUET if args.parquet else FileFormat.HDF5
+
     if args.correctness_only:
         for dtype in TYPES:
-            check_correctness(dtype, args.path, args.seed, args.parquet, multifile=True)
+            check_correctness(dtype, args.path, args.seed, fileFormat, multifile=True)
         sys.exit(0)
 
     print("array size = {:,}".format(args.size))
@@ -91,20 +98,20 @@ def create_parser():
     if args.only_write:
         time_ak_write(
-            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet
+            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, fileFormat
         )
     elif args.only_read:
         time_ak_read(
-            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet
+            args.size, args.files_per_loc, args.trials, args.dtype, args.path, fileFormat
         )
     elif args.only_delete:
         remove_files(args.path)
     else:
         time_ak_write(
-            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet
+            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, fileFormat
         )
         time_ak_read(
-            args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet
+            args.size, args.files_per_loc, args.trials, args.dtype, args.path, fileFormat
         )
         remove_files(args.path)
diff --git a/benchmarks/parquetIO.py b/benchmarks/parquetIO.py
index cb710c27b4..4f83857773 100644
--- a/benchmarks/parquetIO.py
+++ b/benchmarks/parquetIO.py
@@ -89,7 +89,7 @@ def create_parser():
 
     if args.correctness_only:
         for dtype in TYPES:
-            check_correctness(dtype, args.path, args.seed, True)
+            check_correctness(dtype, args.path, args.seed, FileFormat.PARQUET)
         sys.exit(0)
 
     print("array size = {:,}".format(args.size))
@@ -103,11 +103,11 @@ def create_parser():
             args.dtype,
             args.path,
             args.seed,
-            True,
+            FileFormat.PARQUET,
             comp_types,
         )
     elif args.only_read:
-        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, True, comp_types)
+        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.PARQUET, comp_types)
     else:
         time_ak_write(
@@ -116,10 +116,10 @@ def create_parser():
             args.dtype,
             args.path,
             args.seed,
-            True,
+            FileFormat.PARQUET,
             comp_types,
         )
-        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, True, comp_types)
+        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.PARQUET, comp_types)
         remove_files(args.path)
 
     sys.exit(0)
diff --git a/benchmarks/parquetMultiIO.py b/benchmarks/parquetMultiIO.py
index 2008a7e1db..b0a8916ec6 100644
--- a/benchmarks/parquetMultiIO.py
+++ b/benchmarks/parquetMultiIO.py
@@ -91,7 +91,7 @@ def create_parser():
 
     if args.correctness_only:
         for dtype in TYPES:
-            check_correctness(dtype, args.path, args.seed, True, multifile=True)
+            check_correctness(dtype, args.path, args.seed, FileFormat.PARQUET, multifile=True)
         sys.exit(0)
 
     print("array size = {:,}".format(args.size))
@@ -105,11 +105,11 @@ def create_parser():
             args.dtype,
             args.path,
             args.seed,
-            True,
+            FileFormat.PARQUET,
             comp_types,
         )
     elif args.only_read:
-        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, True, comp_types)
+        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.PARQUET, comp_types)
     elif args.only_delete:
         remove_files(args.path)
     else:
@@ -120,10 +120,10 @@ def create_parser():
             args.dtype,
             args.path,
             args.seed,
-            True,
+            FileFormat.PARQUET,
             comp_types,
         )
-        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, True, comp_types)
+        time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.PARQUET, comp_types)
         remove_files(args.path)
 
     sys.exit(0)
diff --git a/benchmarks/run_benchmarks.py b/benchmarks/run_benchmarks.py
index 96d2416731..22914cbf96 100755
--- a/benchmarks/run_benchmarks.py
+++ b/benchmarks/run_benchmarks.py
@@ -35,6 +35,7 @@
     "array_create",
     "array_transfer",
     "IO",
+    "csvIO",
     "small-str-groupby",
     "str-argsort",
     "str-coargsort",
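
With "csvIO" registered in the benchmark list above, the new benchmark runs through the standard harness in the same extension-less form documented in contributing.rst, for example:

    ./benchmarks/run_benchmarks.py csvIO
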