Remove NativeFile support from cudf Python (#16589)
This PR removes all support for passing pyarrow NativeFile objects through cudf's I/O routines.
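
For context, a minimal migration sketch (hand-written for illustration, not from this PR; "data.csv" is a placeholder path). The supported alternatives are unchanged: pass paths, raw bytes, or in-memory buffers straight to the readers.

# Before (no longer supported): wrapping the source in a pyarrow NativeFile.
#
#     import pyarrow as pa
#     with pa.OSFile("data.csv", "rb") as f:
#         df = cudf.read_csv(f)
#
# After: pass a file path or raw bytes directly.
import cudf

df = cudf.read_csv("data.csv")        # file path
with open("data.csv", "rb") as f:
    df = cudf.read_csv(f.read())      # raw bytes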

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Bradley Dice (https://github.com/bdice)

URL: #16589
vyasr authored Aug 19, 2024
1 parent 074abcc commit 79a5a97
Showing 15 changed files with 86 additions and 681 deletions.
python/cudf/cudf/_lib/csv.pyx (1 addition, 8 deletions)
@@ -7,7 +7,6 @@ from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
 cimport pylibcudf.libcudf.types as libcudf_types
-from pylibcudf.io.datasource cimport Datasource, NativeFileDatasource
 
 from cudf._lib.types cimport dtype_to_pylibcudf_type
 
@@ -35,8 +34,6 @@ from pylibcudf.libcudf.table.table_view cimport table_view
 from cudf._lib.io.utils cimport make_sink_info
 from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
 
-from pyarrow.lib import NativeFile
-
 import pylibcudf as plc
 
 from cudf.api.types import is_hashable
@@ -127,9 +124,7 @@ def read_csv(
     cudf.read_csv
     """
 
-    if not isinstance(datasource, (BytesIO, StringIO, bytes,
-                                   Datasource,
-                                   NativeFile)):
+    if not isinstance(datasource, (BytesIO, StringIO, bytes)):
         if not os.path.isfile(datasource):
             raise FileNotFoundError(
                 errno.ENOENT, os.strerror(errno.ENOENT), datasource
@@ -139,8 +134,6 @@
         datasource = datasource.read().encode()
    elif isinstance(datasource, str) and not os.path.isfile(datasource):
        datasource = datasource.encode()
-    elif isinstance(datasource, NativeFile):
-        datasource = NativeFileDatasource(datasource)
 
     validate_args(delimiter, sep, delim_whitespace, decimal, thousands,
                   nrows, skipfooter, byte_range, skiprows)
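
A short sketch of the in-memory source types the narrowed isinstance check still admits (a hand-written example, not part of the diff):

from io import BytesIO, StringIO

import cudf

csv_text = "a,b\n1,2\n3,4\n"

df1 = cudf.read_csv(StringIO(csv_text))          # text buffer
df2 = cudf.read_csv(BytesIO(csv_text.encode()))  # binary buffer
df3 = cudf.read_csv(csv_text.encode())           # raw bytes
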
python/cudf/cudf/_lib/orc.pyx (0 additions, 10 deletions)
@@ -22,7 +22,6 @@ except ImportError:
     import json
 
 cimport pylibcudf.libcudf.io.types as cudf_io_types
-from pylibcudf.io.datasource cimport NativeFileDatasource
 from pylibcudf.libcudf.io.data_sink cimport data_sink
 from pylibcudf.libcudf.io.orc cimport (
     chunked_orc_writer_options,
@@ -71,8 +70,6 @@ from cudf._lib.types import SUPPORTED_NUMPY_TO_LIBCUDF_TYPES
 from cudf._lib.types cimport underlying_type_t_type_id
 from cudf._lib.utils cimport data_from_unique_ptr, table_view_from_table
 
-from pyarrow.lib import NativeFile
-
 from cudf._lib.utils import _index_level_name, generate_pandas_metadata
 
 
@@ -204,10 +201,6 @@ cpdef read_parsed_orc_statistics(filepath_or_buffer):
     cudf.io.orc.read_orc_statistics
     """
 
-    # Handle NativeFile input
-    if isinstance(filepath_or_buffer, NativeFile):
-        filepath_or_buffer = NativeFileDatasource(filepath_or_buffer)
-
     cdef parsed_orc_statistics parsed = (
         libcudf_read_parsed_orc_statistics(make_source_info([filepath_or_buffer]))
     )
@@ -490,9 +483,6 @@ cdef orc_reader_options make_orc_reader_options(
     bool use_index
 ) except*:
 
-    for i, datasource in enumerate(filepaths_or_buffers):
-        if isinstance(datasource, NativeFile):
-            filepaths_or_buffers[i] = NativeFileDatasource(datasource)
     cdef vector[vector[size_type]] strps = stripes
     cdef orc_reader_options opts
     cdef source_info src = make_source_info(filepaths_or_buffers)
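
With the NativeFile coercion gone, the statistics path takes paths and buffers as-is. A hedged usage sketch at the public API level ("example.orc" is a placeholder file name we create first):

import cudf

# Write a tiny ORC file so there is something to inspect.
cudf.DataFrame({"a": [1, 2, 3]}).to_orc("example.orc")

# Sources go in as plain paths or in-memory buffers; no wrapping required.
stats = cudf.io.orc.read_orc_statistics(["example.orc"])
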
python/cudf/cudf/_lib/parquet.pyx (4 additions, 39 deletions)
@@ -34,7 +34,6 @@ from libcpp.vector cimport vector
 cimport pylibcudf.libcudf.io.data_sink as cudf_io_data_sink
 cimport pylibcudf.libcudf.io.types as cudf_io_types
 from pylibcudf.expressions cimport Expression
-from pylibcudf.io.datasource cimport NativeFileDatasource
 from pylibcudf.io.parquet cimport ChunkedParquetReader
 from pylibcudf.libcudf.io.parquet cimport (
     chunked_parquet_writer_options,
@@ -62,8 +61,6 @@ from cudf._lib.io.utils cimport (
 )
 from cudf._lib.utils cimport table_view_from_table
 
-from pyarrow.lib import NativeFile
-
 import pylibcudf as plc
 
 from pylibcudf cimport Table
@@ -133,7 +130,6 @@ cdef object _process_metadata(object df,
                               list per_file_user_data,
                               object row_groups,
                               object filepaths_or_buffers,
-                              list pa_buffers,
                               bool allow_range_index,
                               bool use_pandas_metadata,
                               size_type nrows=-1,
@@ -199,9 +195,7 @@
                 pa.parquet.read_metadata(
                     # Pyarrow cannot read directly from bytes
                     io.BytesIO(s) if isinstance(s, bytes) else s
-                ) for s in (
-                    pa_buffers or filepaths_or_buffers
-                )
+                ) for s in filepaths_or_buffers
             ]
 
             filtered_idx = []
@@ -274,27 +268,13 @@ def read_parquet_chunked(
     size_type nrows=-1,
     int64_t skip_rows=0
 ):
-    # Convert NativeFile buffers to NativeFileDatasource,
-    # but save original buffers in case we need to use
-    # pyarrow for metadata processing
-    # (See: https://github.com/rapidsai/cudf/issues/9599)
-
-    pa_buffers = []
-
-    new_bufs = []
-    for i, datasource in enumerate(filepaths_or_buffers):
-        if isinstance(datasource, NativeFile):
-            new_bufs.append(NativeFileDatasource(datasource))
-        else:
-            new_bufs.append(datasource)
-
     # Note: If this function ever takes accepts filters
     # allow_range_index needs to be False when a filter is passed
     # (see read_parquet)
     allow_range_index = columns is not None and len(columns) != 0
 
     reader = ChunkedParquetReader(
-        plc.io.SourceInfo(new_bufs),
+        plc.io.SourceInfo(filepaths_or_buffers),
         columns,
         row_groups,
         use_pandas_metadata,
@@ -333,7 +313,7 @@ def read_parquet_chunked(
     )
     df = _process_metadata(df, column_names, child_names,
                            per_file_user_data, row_groups,
-                           filepaths_or_buffers, pa_buffers,
+                           filepaths_or_buffers,
                            allow_range_index, use_pandas_metadata,
                            nrows=nrows, skip_rows=skip_rows)
     return df
@@ -356,16 +336,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
     cudf.io.parquet.to_parquet
     """
 
-    # Convert NativeFile buffers to NativeFileDatasource,
-    # but save original buffers in case we need to use
-    # pyarrow for metadata processing
-    # (See: https://github.com/rapidsai/cudf/issues/9599)
-    pa_buffers = []
-    for i, datasource in enumerate(filepaths_or_buffers):
-        if isinstance(datasource, NativeFile):
-            pa_buffers.append(datasource)
-            filepaths_or_buffers[i] = NativeFileDatasource(datasource)
-
     allow_range_index = True
     if columns is not None and len(columns) == 0 or filters:
         allow_range_index = False
@@ -389,7 +359,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
 
     df = _process_metadata(df, tbl_w_meta.column_names(include_children=False),
                            tbl_w_meta.child_names, tbl_w_meta.per_file_user_data,
-                           row_groups, filepaths_or_buffers, pa_buffers,
+                           row_groups, filepaths_or_buffers,
                            allow_range_index, use_pandas_metadata,
                            nrows=nrows, skip_rows=skip_rows)
     return df
@@ -403,11 +373,6 @@ cpdef read_parquet_metadata(filepaths_or_buffers):
     cudf.io.parquet.read_parquet
     cudf.io.parquet.to_parquet
     """
-    # Convert NativeFile buffers to NativeFileDatasource
-    for i, datasource in enumerate(filepaths_or_buffers):
-        if isinstance(datasource, NativeFile):
-            filepaths_or_buffers[i] = NativeFileDatasource(datasource)
-
     cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers)
 
     args = move(source)
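
The simplified metadata path now hands cudf's own source list to pyarrow, wrapping raw bytes in a BytesIO since pyarrow cannot read metadata from bytes directly. A standalone sketch of that pattern (the helper name is ours, not the PR's):

import io

import pyarrow.parquet as pq


def read_all_metadata(sources):
    # Mirror of the comprehension in _process_metadata: wrap raw bytes
    # in a BytesIO so pyarrow can read them; pass everything else as-is.
    return [
        pq.read_metadata(io.BytesIO(s) if isinstance(s, bytes) else s)
        for s in sources
    ]
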
python/cudf/cudf/io/csv.py (1 addition, 10 deletions)
@@ -5,7 +5,6 @@
 from io import BytesIO, StringIO
 
 import numpy as np
-from pyarrow.lib import NativeFile
 
 import cudf
 from cudf import _lib as libcudf
@@ -50,7 +49,6 @@ def read_csv(
     comment=None,
     delim_whitespace=False,
     byte_range=None,
-    use_python_file_object=None,
     storage_options=None,
     bytes_per_thread=None,
 ):
@@ -63,12 +61,6 @@ def read_csv(
             FutureWarning,
         )
 
-    if use_python_file_object and bytes_per_thread is not None:
-        raise ValueError(
-            "bytes_per_thread is only supported when "
-            "`use_python_file_object=False`"
-        )
-
     if bytes_per_thread is None:
         bytes_per_thread = ioutils._BYTES_PER_THREAD_DEFAULT
 
@@ -84,8 +76,7 @@ def read_csv(
     filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer(
         path_or_data=filepath_or_buffer,
         compression=compression,
-        iotypes=(BytesIO, StringIO, NativeFile),
-        use_python_file_object=use_python_file_object,
+        iotypes=(BytesIO, StringIO),
         storage_options=storage_options,
         bytes_per_thread=bytes_per_thread,
     )
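
With use_python_file_object removed, bytes_per_thread no longer conflicts with any other option and is simply passed through. A hedged usage sketch (the S3 URL is a placeholder and requires s3fs to be installed):

import cudf

df = cudf.read_csv(
    "s3://bucket/data.csv",             # placeholder remote path
    storage_options={"anon": True},     # forwarded to fsspec
    bytes_per_thread=32 * 1024 * 1024,  # 32 MiB per read thread
)
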
python/cudf/cudf/io/orc.py (10 additions, 23 deletions)
@@ -10,7 +10,6 @@
 from cudf._lib import orc as liborc
 from cudf.api.types import is_list_like
 from cudf.utils import ioutils
-from cudf.utils.utils import maybe_filter_deprecation
 
 
 def _make_empty_df(filepath_or_buffer, columns):
@@ -281,7 +280,6 @@ def read_orc(
     num_rows=None,
     use_index=True,
     timestamp_type=None,
-    use_python_file_object=None,
     storage_options=None,
     bytes_per_thread=None,
 ):
@@ -321,9 +319,6 @@ def read_orc(
     )
 
     filepaths_or_buffers = []
-    have_nativefile = any(
-        isinstance(source, pa.NativeFile) for source in filepath_or_buffer
-    )
     for source in filepath_or_buffer:
         if ioutils.is_directory(
             path_or_data=source, storage_options=storage_options
@@ -339,7 +334,6 @@ def read_orc(
         tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
             path_or_data=source,
             compression=None,
-            use_python_file_object=use_python_file_object,
             storage_options=storage_options,
             bytes_per_thread=bytes_per_thread,
         )
@@ -364,24 +358,17 @@ def read_orc(
         stripes = selected_stripes
 
     if engine == "cudf":
-        # Don't want to warn if use_python_file_object causes us to get
-        # a NativeFile (there is a separate deprecation warning for that)
-        with maybe_filter_deprecation(
-            not have_nativefile,
-            message="Support for reading pyarrow's NativeFile is deprecated",
-            category=FutureWarning,
-        ):
-            return DataFrame._from_data(
-                *liborc.read_orc(
-                    filepaths_or_buffers,
-                    columns,
-                    stripes,
-                    skiprows,
-                    num_rows,
-                    use_index,
-                    timestamp_type,
-                )
-            )
+        return DataFrame._from_data(
+            *liborc.read_orc(
+                filepaths_or_buffers,
+                columns,
+                stripes,
+                skiprows,
+                num_rows,
+                use_index,
+                timestamp_type,
+            )
+        )
     else:
         from pyarrow import orc
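
After the change, the cudf engine path is a plain call with no deprecation filtering around it. A minimal round-trip sketch ("example.orc" and the column name are placeholders we create here):

import cudf

# Write a small ORC file, then read a single column back with the cudf engine.
cudf.DataFrame({"a": [1, 2], "b": [3.0, 4.0]}).to_orc("example.orc")
df = cudf.read_orc("example.orc", columns=["a"], engine="cudf")
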
