Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Migrate ORC Writer to pylibcudf #17310

Draft
wants to merge 6 commits into
base: branch-24.12
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 97 additions & 83 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from libc.stdint cimport int64_t
from libcpp cimport bool, int
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

import itertools
from collections import OrderedDict

try:
Expand All @@ -16,31 +14,22 @@ except ImportError:
import json

cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.orc cimport (
chunked_orc_writer_options,
orc_chunked_writer,
orc_writer_options,
write_orc as libcudf_write_orc,
)
from pylibcudf.libcudf.io.types cimport (
column_in_metadata,
sink_info,
table_input_metadata,
)
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.io.utils cimport update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

import cudf
from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES
from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from cudf.core.buffer import acquire_spill_lock

from pylibcudf.io.types cimport TableInputMetadata, SinkInfo
from pylibcudf.io.orc cimport OrcChunkedWriter

# TODO: Consider inlining this function since it seems to only be used in one place.
cpdef read_parsed_orc_statistics(filepath_or_buffer):
Expand Down Expand Up @@ -246,61 +235,68 @@ def write_orc(
--------
cudf.read_orc
"""
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
cdef table_input_metadata tbl_meta
cdef map[string, string] user_data
user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata(
table, index)
)

user_data = {}
user_data["pandas"] = generate_pandas_metadata(table, index)
if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = table_input_metadata(tv)
if table._index is not None:
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
else:
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
tbl_meta.column_metadata[level].set_name(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
tbl_meta.c_obj.column_metadata[level].set_name(
str.encode(_index_level_name(idx_name, level, table._column_names))
)
num_index_cols_meta = len(table._index.names)
else:
tv = table_view_from_table(table, ignore_index=True)
tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[col.to_pylibcudf(mode="read") for col in table._columns]
)
tbl_meta = TableInputMetadata(plc_table)
num_index_cols_meta = 0

if cols_as_map_type is not None:
cols_as_map_type = set(cols_as_map_type)

for i, name in enumerate(table._column_names, num_index_cols_meta):
tbl_meta.column_metadata[i].set_name(name.encode())
tbl_meta.c_obj.column_metadata[i].set_name(name.encode())
_set_col_children_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
tbl_meta.c_obj.column_metadata[i],
(cols_as_map_type is not None)
and (name in cols_as_map_type),
)

cdef orc_writer_options c_orc_writer_options = move(
orc_writer_options.builder(
sink_info_c, tv
).metadata(tbl_meta)
.key_value_metadata(move(user_data))
options = (
plc.io.orc.OrcWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc_table
)
.metadata(tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(compression))
.enable_statistics(_get_orc_stat_freq(statistics))
.build()
)
if stripe_size_bytes is not None:
c_orc_writer_options.set_stripe_size_bytes(stripe_size_bytes)
options.set_stripe_size_bytes(stripe_size_bytes)
if stripe_size_rows is not None:
c_orc_writer_options.set_stripe_size_rows(stripe_size_rows)
options.set_stripe_size_rows(stripe_size_rows)
if row_index_stride is not None:
c_orc_writer_options.set_row_index_stride(row_index_stride)
options.set_row_index_stride(row_index_stride)

with nogil:
libcudf_write_orc(c_orc_writer_options)
plc.io.orc.write_orc(options)


cdef int64_t get_skiprows_arg(object arg) except*:
Expand All @@ -326,13 +322,12 @@ cdef class ORCWriter:
cudf.io.orc.to_orc
"""
cdef bool initialized
cdef unique_ptr[orc_chunked_writer] writer
cdef sink_info sink
cdef unique_ptr[data_sink] _data_sink
cdef OrcChunkedWriter writer
cdef SinkInfo sink
cdef str statistics
cdef object compression
cdef object index
cdef table_input_metadata tbl_meta
cdef TableInputMetadata tbl_meta
cdef object cols_as_map_type
cdef object stripe_size_bytes
cdef object stripe_size_rows
Expand All @@ -347,8 +342,7 @@ cdef class ORCWriter:
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None):

self.sink = make_sink_info(path, self._data_sink)
self.sink = plc.io.SinkInfo([path])
self.statistics = statistics
self.compression = compression
self.index = index
Expand All @@ -368,17 +362,23 @@ cdef class ORCWriter:
table._index.name is not None or
isinstance(table._index, cudf.core.multiindex.MultiIndex)
)
tv = table_view_from_table(table, not keep_index)
if keep_index:
columns = [
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
else:
columns = [col.to_pylibcudf(mode="read") for col in table._columns]

with nogil:
self.writer.get()[0].write(tv)
self.writer.write(
plc.Table(columns)
)

def close(self):
if not self.initialized:
return

with nogil:
self.writer.get()[0].close()
self.writer.close()

def __dealloc__(self):
self.close()
Expand All @@ -387,60 +387,74 @@ cdef class ORCWriter:
"""
Prepare all the values required to build the
chunked_orc_writer_options anb creates a writer"""
cdef table_view tv

num_index_cols_meta = 0
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True),
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
self.tbl_meta = TableInputMetadata(plc_table)
if self.index is not False:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
self.tbl_meta.column_metadata[level].set_name(
(str.encode(idx_name))
self.tbl_meta.c_obj.column_metadata[level].set_name(
str.encode(idx_name)
)
num_index_cols_meta = len(table._index.names)
else:
if table._index.name is not None:
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
self.tbl_meta.column_metadata[0].set_name(
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(
table.index._columns, table._columns
)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
self.tbl_meta.c_obj.column_metadata[0].set_name(
str.encode(table._index.name)
)
num_index_cols_meta = 1

for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.column_metadata[i].set_name(name.encode())
self.tbl_meta.c_obj.column_metadata[i].set_name(name.encode())
_set_col_children_metadata(
table[name]._column,
self.tbl_meta.column_metadata[i],
self.tbl_meta.c_obj.column_metadata[i],
(self.cols_as_map_type is not None)
and (name in self.cols_as_map_type),
)

cdef map[string, string] user_data
user_data = {}
pandas_metadata = generate_pandas_metadata(table, self.index)
user_data[str.encode("pandas")] = str.encode(pandas_metadata)

cdef chunked_orc_writer_options c_opts = move(
chunked_orc_writer_options.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
user_data["pandas"] = pandas_metadata

options = (
plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
if self.stripe_size_bytes is not None:
c_opts.set_stripe_size_bytes(self.stripe_size_bytes)
options.set_stripe_size_bytes(self.stripe_size_bytes)
if self.stripe_size_rows is not None:
c_opts.set_stripe_size_rows(self.stripe_size_rows)
options.set_stripe_size_rows(self.stripe_size_rows)
if self.row_index_stride is not None:
c_opts.set_row_index_stride(self.row_index_stride)
options.set_row_index_stride(self.row_index_stride)

with nogil:
self.writer.reset(new orc_chunked_writer(c_opts))
self.writer = plc.io.orc.OrcChunkedWriter.from_options(options)

self.initialized = True

Expand Down
65 changes: 63 additions & 2 deletions python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ from libcpp cimport bool
from libcpp.optional cimport optional
from libcpp.string cimport string
from libcpp.vector cimport vector
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from libcpp.memory cimport unique_ptr
from libcpp.map cimport map
from pylibcudf.io.types cimport (
SourceInfo,
SinkInfo,
TableWithMetadata,
TableInputMetadata,
)
from pylibcudf.libcudf.io.orc_metadata cimport (
column_statistics,
parsed_orc_statistics,
statistics_type,
)
from pylibcudf.libcudf.io.orc cimport (
orc_chunked_writer,
orc_writer_options,
orc_writer_options_builder,
chunked_orc_writer_options,
chunked_orc_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.types cimport DataType

from pylibcudf.table cimport Table
from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)

cpdef TableWithMetadata read_orc(
SourceInfo source_info,
Expand Down Expand Up @@ -48,3 +66,46 @@ cdef class ParsedOrcStatistics:
cpdef ParsedOrcStatistics read_parsed_orc_statistics(
SourceInfo source_info
)

cdef class OrcWriterOptions:
cdef orc_writer_options c_obj
cdef Table table
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_t size_rows)
cpdef void set_row_index_stride(self, size_t stride)

cdef class OrcWriterOptionsBuilder:
cdef orc_writer_options_builder c_obj
cdef Table table
cdef SinkInfo sink
cpdef OrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef OrcWriterOptionsBuilder key_value_metadata(self, dict kvm)
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
cpdef void close(self)
cpdef write(self, Table table)

cdef class ChunkedOrcWriterOptions:
cdef chunked_orc_writer_options c_obj
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_t size_rows)
cpdef void set_row_index_stride(self, size_t stride)

cdef class ChunkedOrcWriterOptionsBuilder:
cdef chunked_orc_writer_options_builder c_obj
cdef SinkInfo sink
cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata(
self, dict kvm
)
cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef ChunkedOrcWriterOptions build(self)
Loading
Loading