Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate CSV writer to pylibcudf #17163

Open
wants to merge 32 commits into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cd78dcd
[WIP] Migrate CSV writer to pylibcudf
Matt711 Oct 24, 2024
4f899b6
migrate the CSV writer
Matt711 Oct 24, 2024
6723045
get existing test passing
Matt711 Oct 25, 2024
cbeea6b
add a test
Matt711 Oct 29, 2024
d3998fc
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 1, 2024
8286f74
clean up
Matt711 Nov 5, 2024
dc93b8b
merge conflict
Matt711 Nov 6, 2024
d7d21ca
add more test cases
Matt711 Nov 6, 2024
a6007d7
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 6, 2024
2ce052f
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 7, 2024
1a8c38c
fix type checking in SinkInfo
Matt711 Nov 7, 2024
afaa46c
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 7, 2024
b132456
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 11, 2024
5f8e4b9
Expose the CsvWriterOptions and CsvWriterOptionsBuilder
Matt711 Nov 12, 2024
2287eba
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 12, 2024
3706bd5
Add type stubs
Matt711 Nov 12, 2024
15370b0
commit declaration file
Matt711 Nov 12, 2024
f10492c
merge conflict
Matt711 Nov 15, 2024
dd72323
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 15, 2024
4e706bd
add type stub
Matt711 Nov 15, 2024
99e770a
switch to options arg
Matt711 Nov 15, 2024
cf11996
remove casts
Matt711 Nov 15, 2024
f9e4570
clean up
Matt711 Nov 15, 2024
b7d971a
keep tables alive
Matt711 Nov 15, 2024
464ae48
address review
Matt711 Nov 15, 2024
2643831
address reviews
Matt711 Nov 15, 2024
fb2e1bc
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 18, 2024
ffe895e
add to all
Matt711 Nov 18, 2024
496d664
clean up
Matt711 Nov 18, 2024
d6b3667
clean up
Matt711 Nov 18, 2024
94599b9
simplify fixtures
Matt711 Nov 18, 2024
4bdd6dd
Merge branch 'branch-24.12' into pylibcudf-io-csv
Matt711 Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ table_with_metadata read_csv(
*/

/**
*@brief Builder to build options for `writer_csv()`.
*@brief Builder to build options for `write_csv()`.
*/
class csv_writer_options_builder;

Expand Down
118 changes: 44 additions & 74 deletions python/cudf/cudf/_lib/csv.pyx
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types

Expand All @@ -23,16 +19,8 @@ from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool

from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport sink_info
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.io.utils cimport make_sink_info
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.utils cimport data_from_pylibcudf_io
from cudf._lib.utils import _dtype_to_names_list

import pylibcudf as plc

Expand Down Expand Up @@ -318,59 +306,49 @@ def write_csv(
--------
cudf.to_csv
"""
cdef table_view input_table_view = table_view_from_table(
table, not index
)
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)

if header is True:
all_names = columns_apply_na_rep(table._column_names, na_rep)
if index is True:
all_names = table._index.names + all_names

if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)

cdef csv_writer_options options = move(
csv_writer_options.builder(sink_info_c, input_table_view)
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

columns=[]
index_and_not_empty = index is True and table.index is not None
if index_and_not_empty:
columns.extend(col.to_pylibcudf(mode="read") for col in table.index._columns)
columns.extend(col.to_pylibcudf(mode="read") for col in table._columns)
if header:
all_names = []
if index_and_not_empty:
all_names.extend(table.index.names)
all_names.extend(
na_rep if name is None or pd.isnull(name)
else name for name in table._column_names
)
col_names = [
'""' if (name in (None, '') and len(all_names) == 1)
else (str(name) if name not in (None, '') else '')
for name in all_names
]
else:
col_names = []
num_index_columns = len(table._index.names) if index_and_not_empty else 0
col_names_and_child_col_names = [
(
name,
_dtype_to_names_list(
table[table._column_names[i - num_index_columns]]._column
) if i >= num_index_columns else []
)
for i, name in enumerate(col_names)
]
try:
with nogil:
cpp_write_csv(options)
plc.io.csv.write_csv(
plc.io.SinkInfo([path_or_buf]),
plc.io.TableWithMetadata(
plc.Table(columns),
col_names_and_child_col_names
),
sep=str(sep),
na_rep=str(na_rep),
header=header,
lineterminator=str(lineterminator),
rows_per_chunk=rows_per_chunk,
)
except OverflowError:
raise OverflowError(
f"Writing CSV file with chunksize={rows_per_chunk} failed. "
Expand Down Expand Up @@ -419,11 +397,3 @@ cdef DataType _get_plc_data_type_from_dtype(object dtype) except *:

dtype = cudf.dtype(dtype)
return dtype_to_pylibcudf_type(dtype)


def columns_apply_na_rep(column_names, na_rep):
    """Return the column names as a tuple, substituting ``na_rep`` for nulls.

    Parameters
    ----------
    column_names : iterable
        Column labels to inspect, in order.
    na_rep : object
        Replacement used for every label that ``pd.isnull`` reports as null.

    Returns
    -------
    tuple
        The labels in their original order, with null labels replaced.
    """
    replaced = []
    for label in column_names:
        # Non-null labels pass through untouched.
        replaced.append(na_rep if pd.isnull(label) else label)
    return tuple(replaced)
11 changes: 1 addition & 10 deletions python/cudf/cudf/_lib/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ from cudf._lib.column cimport Column
from cudf._lib.io.utils cimport add_df_col_struct_names
from cudf._lib.types cimport dtype_to_data_type
from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
from cudf._lib.utils import _dtype_to_names_list

import pylibcudf as plc

Expand Down Expand Up @@ -217,13 +218,3 @@ cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *:
"supported in JSON reader"
)
return dtype_to_data_type(dtype)


def _dtype_to_names_list(col):
    """Build the nested ``(name, children)`` list describing ``col``.

    A struct column yields one entry per field, pairing the field name with
    the recursively computed list for the matching child. A list column
    yields one entry per child, each with an empty-string name. Any other
    dtype yields an empty list.
    """
    if isinstance(col.dtype, cudf.StructDtype):
        struct_entries = []
        for field_name, child_col in zip(col.dtype.fields, col.children):
            struct_entries.append(
                (field_name, _dtype_to_names_list(child_col))
            )
        return struct_entries
    if isinstance(col.dtype, cudf.ListDtype):
        list_entries = []
        for child_col in col.children:
            # Children of a list column carry no name of their own.
            list_entries.append(("", _dtype_to_names_list(child_col)))
        return list_entries
    return []
10 changes: 10 additions & 0 deletions python/cudf/cudf/_lib/utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,13 @@ cdef data_from_table_view(
source_column_idx += 1

return dict(zip(column_names, data_columns)), index


def _dtype_to_names_list(col):
    """Build the nested ``(name, children)`` list describing ``col``.

    A struct column yields one entry per field, pairing the field name with
    the recursively computed list for the matching child. A list column
    yields one entry per child, each with an empty-string name. Any other
    dtype yields an empty list.
    """
    if isinstance(col.dtype, cudf.StructDtype):
        struct_entries = []
        for field_name, child_col in zip(col.dtype.fields, col.children):
            struct_entries.append(
                (field_name, _dtype_to_names_list(child_col))
            )
        return struct_entries
    if isinstance(col.dtype, cudf.ListDtype):
        list_entries = []
        for child_col in col.children:
            # Children of a list column carry no name of their own.
            list_entries.append(("", _dtype_to_names_list(child_col)))
        return list_entries
    return []
31 changes: 31 additions & 0 deletions python/pylibcudf/pylibcudf/io/csv.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.vector cimport vector
from libcpp.string cimport string
from libcpp cimport bool
from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
csv_writer_options_builder,
)
from pylibcudf.libcudf.io.types cimport quote_style
from pylibcudf.io.types cimport SinkInfo
from pylibcudf cimport Table

# Extension type declared here so other Cython modules can cimport it; it
# holds a libcudf `csv_writer_options` value.
cdef class CsvWriterOptions:
# The wrapped libcudf options object.
cdef csv_writer_options c_obj

# Declared entry point that produces a CsvWriterOptionsBuilder from a sink
# and a table. The implementation is not part of this declaration file.
@staticmethod
cdef CsvWriterOptionsBuilder builder(SinkInfo sink, Table table)


# Extension type holding a libcudf `csv_writer_options_builder` value.
# Every option method is declared to return a CsvWriterOptionsBuilder and
# `build` is declared to return a CsvWriterOptions.
# NOTE(review): presumably each option method returns `self` so calls can be
# chained -- confirm against the implementation in csv.pyx.
cdef class CsvWriterOptionsBuilder:
# The wrapped libcudf builder object.
cdef csv_writer_options_builder c_obj
# Option methods; the names match the libcudf builder's methods.
cpdef CsvWriterOptionsBuilder names(self, list names)
cpdef CsvWriterOptionsBuilder na_rep(self, str val)
cpdef CsvWriterOptionsBuilder include_header(self, bool val)
cpdef CsvWriterOptionsBuilder rows_per_chunk(self, int val)
cpdef CsvWriterOptionsBuilder line_terminator(self, str term)
cpdef CsvWriterOptionsBuilder inter_column_delimiter(self, str delim)
cpdef CsvWriterOptionsBuilder true_value(self, str val)
cpdef CsvWriterOptionsBuilder false_value(self, str val)
# Declared to return the CsvWriterOptions wrapper type.
cpdef CsvWriterOptions build(self)
11 changes: 11 additions & 0 deletions python/pylibcudf/pylibcudf/io/csv.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ from collections.abc import Mapping
from pylibcudf.io.types import (
CompressionType,
QuoteStyle,
SinkInfo,
SourceInfo,
TableWithMetadata,
)
Expand Down Expand Up @@ -52,3 +53,13 @@ def read_csv(
# detect_whitespace_around_quotes: bool = False,
# timestamp_type: DataType = DataType(type_id.EMPTY),
) -> TableWithMetadata: ...
# Type stub for the CSV writer. `sink_info` and `table` are positional; all
# formatting options after `*` are keyword-only. Declared to return None.
def write_csv(
sink_info: SinkInfo,
table: TableWithMetadata,
*,
sep: str = ",",
na_rep: str = "",
header: bool = True,
lineterminator: str = "\n",
rows_per_chunk: int = 8,
) -> None: ...
Loading
Loading