Commit

Add benchmark for CSV read and write perf (Bears-R-Us#3189)
* Add benchmark for CSV read and write perf

This change does the following:
- Updates the `time_ak_write`, `time_ak_read`, and `check_correctness`
  functions in IO.py to take a `fileFormat` argument instead of the old
  `parquet` argument (see the usage sketch after this list).
- `fileFormat` is an enum that distinguishes between the three supported
  file types: HDF5, Parquet, and CSV.
- Add support for benchmarking CSV reads and writes in the above functions.
- Remove the unused `seed` argument from `time_ak_read`.
- Add a new benchmark, `csvIO.py`. CSV read performance is currently poor,
  and the reported read rate rounds to 0.0; a follow-up task will improve
  CSV read performance.
- Update `multiIO.py`, `parquetIO.py`, and `multiParquetIO.py` to work with
  the changes to `IO.py` described in the first bullet.
- Small change to contributing.rst (benchmark names are passed without the
  `.py` extension).
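
A minimal usage sketch of the updated helpers, assuming it is run from the
benchmarks directory with an arkouda server already up (the function and enum
names come from this diff; the host, port, problem size, and output path are
placeholder assumptions, not part of the change):

import arkouda as ak
from IO import FileFormat, time_ak_write, time_ak_read, check_correctness, remove_files

ak.connect("localhost", 5555)  # placeholder server location
path = "/tmp/ak-io-test"       # placeholder file prefix

# Time CSV writes and reads of a 10**6-element int64 array (one file per
# locale, one trial), clean up, then verify a CSV write/read round trip.
time_ak_write(10**6, 1, 1, "int64", path, None, FileFormat.CSV)
time_ak_read(10**6, 1, 1, "int64", path, FileFormat.CSV)
remove_files(path)
check_correctness("int64", path, None, FileFormat.CSV)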

Signed-off-by: Shreyas Khandekar <[email protected]>

* Add graph infrastructure

Signed-off-by: Shreyas Khandekar <[email protected]>

* Fix errors with correctness tests

Signed-off-by: Shreyas Khandekar <[email protected]>

* Fix issue with array Truth values in correctness tests

Signed-off-by: Shreyas Khandekar <[email protected]>

* Change write to read in output for time_ak_read

Signed-off-by: Shreyas Khandekar <[email protected]>

* Increase significant digits in rate output to 4

Signed-off-by: Shreyas Khandekar <[email protected]>

---------

Signed-off-by: Shreyas Khandekar <[email protected]>
ShreyasKhandekar authored May 21, 2024
1 parent fc95156 commit ea7ef8b
Showing 9 changed files with 249 additions and 50 deletions.
120 changes: 88 additions & 32 deletions benchmarks/IO.py
@@ -1,11 +1,13 @@
#!/usr/bin/env python3

import argparse
from enum import Enum
import os
import time
from glob import glob

import arkouda as ak
import numpy as np

TYPES = (
"int64",
@@ -22,15 +24,22 @@
"lz4"
)

class FileFormat(Enum):
HDF5 = 1
PARQUET = 2
CSV = 3

def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, comps=None):
def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, fileFormat, comps=None):
if comps is None or comps == [""]:
comps = COMPRESSIONS

if not parquet:
print(">>> arkouda {} HDF5 write with compression={}".format(dtype, comps))
else:
print(">>> arkouda {} Parquet write with compression={}".format(dtype, comps))
file_format_actions = {
FileFormat.HDF5: ">>> arkouda {} HDF5 write with compression={}".format(dtype, comps),
FileFormat.PARQUET: ">>> arkouda {} Parquet write with compression={}".format(dtype, comps),
FileFormat.CSV: ">>> arkouda {} CSV write".format(dtype)
}
print(file_format_actions.get(fileFormat, "Invalid file format"))

cfg = ak.get_config()
N = N_per_locale * cfg["numLocales"]
print("numLocales = {}, N = {:,}, filesPerLoc = {}".format(cfg["numLocales"], N, numfiles))
@@ -44,7 +53,7 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, co
a = ak.random_strings_uniform(1, 16, N, seed=seed)

times = {}
if parquet:
if fileFormat == FileFormat.PARQUET:
for comp in comps:
if comp in COMPRESSIONS:
writetimes = []
@@ -57,7 +66,7 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, co
end = time.time()
writetimes.append(end - start)
times[comp] = sum(writetimes) / trials
else:
elif fileFormat == FileFormat.HDF5:
writetimes = []
for i in range(trials):
for j in range(numfiles):
@@ -66,28 +75,42 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, parquet, co
end = time.time()
writetimes.append(end - start)
times["HDF5"] = sum(writetimes) / trials
elif fileFormat == FileFormat.CSV:
writetimes = []
for i in range(trials):
for j in range(numfiles):
start = time.time()
a.to_csv(f"{path}{j:04}")
end = time.time()
writetimes.append(end - start)
times["CSV"] = sum(writetimes) / trials
else:
raise ValueError("Invalid file format")

nb = a.size * a.itemsize * numfiles
for key in times.keys():
print("write Average time {} = {:.4f} sec".format(key, times[key]))
print("write Average rate {} = {:.2f} GiB/sec".format(key, nb / 2**30 / times[key]))
print("write Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))


def time_ak_read(N_per_locale, numfiles, trials, dtype, path, seed, parquet, comps=None):
def time_ak_read(N_per_locale, numfiles, trials, dtype, path, fileFormat, comps=None):
if comps is None or comps == [""]:
comps = COMPRESSIONS

if not parquet:
print(">>> arkouda HDF5 {} read".format(dtype))
else:
print(">>> arkouda Parquet {} read".format(dtype))
file_format_actions = {
FileFormat.HDF5: ">>> arkouda {} HDF5 read with compression={}".format(dtype, comps),
FileFormat.PARQUET: ">>> arkouda {} Parquet read with compression={}".format(dtype, comps),
FileFormat.CSV: ">>> arkouda {} CSV read".format(dtype)
}
print(file_format_actions.get(fileFormat, "Invalid file format"))

cfg = ak.get_config()
N = N_per_locale * cfg["numLocales"]
print("numLocales = {}, N = {:,}, filesPerLoc = {}".format(cfg["numLocales"], N, numfiles))
a = ak.array([])

times = {}
if parquet:
if fileFormat == FileFormat.PARQUET:
for comp in COMPRESSIONS:
if comp in comps:
readtimes = []
@@ -98,28 +121,39 @@ def time_ak_read(N_per_locale, numfiles, trials, dtype, path, seed, parquet, com
readtimes.append(end - start)
times[comp] = sum(readtimes) / trials

else:
elif fileFormat == FileFormat.HDF5:
readtimes = []
for i in range(trials):
start = time.time()
a = ak.read_hdf(path + "*").popitem()[1]
end = time.time()
readtimes.append(end - start)
times["HDF5"] = sum(readtimes) / trials
elif fileFormat == FileFormat.CSV:
readtimes = []
for i in range(trials):
start = time.time()
a = ak.read_csv(path + "*").popitem()[1]
end = time.time()
readtimes.append(end - start)
times["CSV"] = sum(readtimes) / trials
else:
raise ValueError("Invalid file format")

nb = a.size * a.itemsize
for key in times.keys():
print("read Average time {} = {:.4f} sec".format(key, times[key]))
print("read Average rate {} = {:.2f} GiB/sec".format(key, nb / 2**30 / times[key]))
print("read Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))


def remove_files(path):
for f in glob(path + "*"):
os.remove(f)


def check_correctness(dtype, path, seed, parquet, multifile=False):
def check_correctness(dtype, path, seed, fileFormat, multifile=False):
N = 10**4
b = None
if dtype == "int64":
a = ak.randint(0, 2**32, N, seed=seed)
if multifile:
@@ -136,20 +170,36 @@ def check_correctness(dtype, path, seed, parquet, multifile=False):
a = ak.random_strings_uniform(1, 16, N, seed=seed)
if multifile:
b = ak.random_strings_uniform(1, 16, N, seed=seed)
else:
raise ValueError(f"Invalid dtype: {dtype}")

a.to_hdf(f"{path}{1}") if not parquet else a.to_parquet(f"{path}{1}")
if multifile:
b.to_hdf(f"{path}{2}") if not parquet else b.to_parquet(f"{path}{2}")
file_format_actions = {
FileFormat.HDF5: (a.to_hdf, b.to_hdf if b is not None else None, ak.read_hdf),
FileFormat.PARQUET: (a.to_parquet, b.to_parquet if b is not None else None, ak.read_parquet),
FileFormat.CSV: (a.to_csv, b.to_csv if b is not None else None, ak.read_csv)
}

if fileFormat in file_format_actions:
write_a, write_b, read_c = file_format_actions.get(fileFormat)
else:
raise ValueError(f"Invalid file format: {fileFormat}")

c = ak.read_hdf(path + "*") if not parquet else ak.read_parquet(path + "*")
c = c.popitem()[1]

write_a(f"{path}{1}")
if multifile:
write_b(f"{path}{2}")

c = read_c(path + "*").popitem()[1]
remove_files(path)
if not multifile:
assert (a == c).all()

if dtype == "float64":
assert np.allclose(a.to_ndarray(), c[0 : a.size].to_ndarray()) # Slice is full array when single file
if multifile:
assert np.allclose(b.to_ndarray(), c[a.size :].to_ndarray())
else:
assert (a == c[0 : a.size]).all()
assert (b == c[a.size :]).all()
assert (a == c[0 : a.size]).all() # Slice is full array when single file
if multifile:
assert (b == c[a.size :]).all()


def create_parser():
Expand Down Expand Up @@ -182,9 +232,13 @@ def create_parser():
parser.add_argument(
"-s", "--seed", default=None, type=int, help="Value to initialize random number generator"
)
parser.add_argument(
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-q", "--parquet", default=False, action="store_true", help="Perform Parquet operations"
)
group.add_argument(
"-v", "--csv", default=False, action="store_true", help="Perform CSV operations"
)
parser.add_argument(
"-w",
"--only-write",
@@ -232,9 +286,11 @@ def create_parser():
comp_str = args.compression
comp_types = COMPRESSIONS if comp_str == "" else comp_str.lower().split(",")

fileFormat = FileFormat.CSV if args.csv else FileFormat.PARQUET if args.parquet else FileFormat.HDF5

if args.correctness_only:
for dtype in TYPES:
check_correctness(dtype, args.path, args.seed, args.parquet)
check_correctness(dtype, args.path, args.seed, fileFormat)
sys.exit(0)

print("array size = {:,}".format(args.size))
@@ -248,12 +304,12 @@ def create_parser():
args.dtype,
args.path,
args.seed,
args.parquet,
fileFormat,
comp_types,
)
elif args.only_read:
time_ak_read(
args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet, comp_types
args.size, args.files_per_loc, args.trials, args.dtype, args.path, fileFormat, comp_types
)
else:
time_ak_write(
@@ -263,11 +319,11 @@ def create_parser():
args.dtype,
args.path,
args.seed,
args.parquet,
fileFormat,
comp_types,
)
time_ak_read(
args.size, args.files_per_loc, args.trials, args.dtype, args.path, args.seed, args.parquet, comp_types
args.size, args.files_per_loc, args.trials, args.dtype, args.path, fileFormat, comp_types
)
remove_files(args.path)

4 changes: 2 additions & 2 deletions benchmarks/contributing.rst
@@ -71,8 +71,8 @@ Running your benchmark
======================

To ensure your benchmark is working, you can run it by running the command from your root directory:
``./benchmarks/run_benchmarks.py myMethod.py``
where "myMethod.py" is replaced with your filename.
``./benchmarks/run_benchmarks.py myMethod``
where "myMethod" is replaced with your filename.

Once everything is working here, correctness testing and numpy testing should be added to your benchmark in the following manner:

125 changes: 125 additions & 0 deletions benchmarks/csvIO.py
@@ -0,0 +1,125 @@
#!/usr/bin/env python3

import argparse

from IO import *

TYPES = (
"int64",
"float64",
"uint64",
"str"
)


def create_parser():
parser = argparse.ArgumentParser(description="Measure performance of Parquet reads/writes.")
parser.add_argument("hostname", help="Hostname of arkouda server")
parser.add_argument("port", type=int, help="Port of arkouda server")
parser.add_argument(
"-n", "--size", type=int, default=10**6, help="Problem size: length of array to read/write"
)
parser.add_argument(
"-t", "--trials", type=int, default=1, help="Number of times to run the benchmark"
)
parser.add_argument(
"-d", "--dtype", default="int64", help="Dtype of array ({})".format(", ".join(TYPES))
)
parser.add_argument(
"--correctness-only",
default=False,
action="store_true",
help="Only check correctness, not performance.",
)
parser.add_argument(
"-p",
"--path",
default=os.getcwd() + "ak-io-test",
help="Target path for measuring read/write rates",
)
parser.add_argument(
"-s", "--seed", default=None, type=int, help="Value to initialize random number generator"
)
parser.add_argument(
"-w",
"--only-write",
default=False,
action="store_true",
help="Only write the files; files will not be removed",
)
parser.add_argument(
"-r",
"--only-read",
default=False,
action="store_true",
help="Only read the files; files will not be removed",
)
parser.add_argument(
"-f",
"--only-delete",
default=False,
action="store_true",
help="Only delete files created from writing with this benchmark",
)
parser.add_argument(
"-l", "--files-per-loc", type=int, default=1, help="Number of files to create per locale"
)
parser.add_argument(
"-c",
"--compression",
default="",
action="store",
help="Compression types to run Parquet benchmarks against. Comma delimited list (NO SPACES) allowing "
"for multiple. Accepted values: none, snappy, gzip, brotli, zstd, and lz4"
)
return parser


if __name__ == "__main__":
import sys

parser = create_parser()
args = parser.parse_args()
if args.dtype not in TYPES:
raise ValueError("Dtype must be {}, not {}".format("/".join(TYPES), args.dtype))
ak.verbose = False
ak.connect(args.hostname, args.port)
comp_str = args.compression
comp_types = COMPRESSIONS if comp_str == "" else comp_str.lower().split(",")

if args.correctness_only:
for dtype in TYPES:
check_correctness(dtype, args.path, args.seed, FileFormat.CSV)
sys.exit(0)

print("array size = {:,}".format(args.size))
print("number of trials = ", args.trials)

if args.only_write:
time_ak_write(
args.size,
args.files_per_loc,
args.trials,
args.dtype,
args.path,
args.seed,
FileFormat.CSV,
comp_types,
)
elif args.only_read:
time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.CSV, comp_types)
else:
time_ak_write(
args.size,
args.files_per_loc,
args.trials,
args.dtype,
args.path,
args.seed,
FileFormat.CSV,
comp_types,
)
time_ak_read(args.size, args.files_per_loc, args.trials, args.dtype, args.path, FileFormat.CSV, comp_types)
remove_files(args.path)

sys.exit(0)
