Skip to content

Commit

Permalink
add orc chuncked writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 committed Nov 18, 2024
1 parent 3648886 commit 763b870
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 96 deletions.
165 changes: 91 additions & 74 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
from libc.stdint cimport int64_t
from libcpp cimport bool, int
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

import itertools
from collections import OrderedDict

try:
Expand All @@ -16,31 +15,22 @@ except ImportError:
import json

cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.orc cimport (
chunked_orc_writer_options,
orc_chunked_writer,
orc_writer_options,
write_orc as libcudf_write_orc,
)
from pylibcudf.libcudf.io.types cimport (
column_in_metadata,
sink_info,
table_input_metadata,
)
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.io.utils cimport update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

import cudf
from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES
from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from cudf.core.buffer import acquire_spill_lock

from pylibcudf.io.types cimport TableInputMetadata, SinkInfo
from pylibcudf.io.orc cimport OrcChunkedWriter

# TODO: Consider inlining this function since it seems to only be used in one place.
cpdef read_parsed_orc_statistics(filepath_or_buffer):
Expand Down Expand Up @@ -246,61 +236,70 @@ def write_orc(
--------
cudf.read_orc
"""
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
cdef table_input_metadata tbl_meta
cdef map[string, string] user_data
user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata(
table, index)
)

if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = table_input_metadata(tv)
if table._index is not None:
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
else:
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
tbl_meta.column_metadata[level].set_name(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
tbl_meta.c_obj.column_metadata[level].set_name(
str.encode(_index_level_name(idx_name, level, table._column_names))
)
num_index_cols_meta = len(table._index.names)
else:
tv = table_view_from_table(table, ignore_index=True)
tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[col.to_pylibcudf(mode="read") for col in table._columns]
)
tbl_meta = TableInputMetadata(plc_table)
num_index_cols_meta = 0

if cols_as_map_type is not None:
cols_as_map_type = set(cols_as_map_type)

for i, name in enumerate(table._column_names, num_index_cols_meta):
tbl_meta.column_metadata[i].set_name(name.encode())
tbl_meta.c_obj.column_metadata[i].set_name(name.encode())
_set_col_children_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
tbl_meta.c_obj.column_metadata[i],
(cols_as_map_type is not None)
and (name in cols_as_map_type),
)

cdef orc_writer_options c_orc_writer_options = move(
orc_writer_options.builder(
sink_info_c, tv
).metadata(tbl_meta)
options = (
plc.io.orc.OrcWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc_table
)
.metadata(tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(compression))
.enable_statistics(_get_orc_stat_freq(statistics))
.build()
)
if stripe_size_bytes is not None:
c_orc_writer_options.set_stripe_size_bytes(stripe_size_bytes)
options.set_stripe_size_bytes(stripe_size_bytes)
if stripe_size_rows is not None:
c_orc_writer_options.set_stripe_size_rows(stripe_size_rows)
options.set_stripe_size_rows(stripe_size_rows)
if row_index_stride is not None:
c_orc_writer_options.set_row_index_stride(row_index_stride)
options.set_row_index_stride(row_index_stride)

with nogil:
libcudf_write_orc(c_orc_writer_options)
plc.io.orc.write_orc(options)


cdef int64_t get_skiprows_arg(object arg) except*:
Expand All @@ -326,13 +325,12 @@ cdef class ORCWriter:
cudf.io.orc.to_orc
"""
cdef bool initialized
cdef unique_ptr[orc_chunked_writer] writer
cdef sink_info sink
cdef unique_ptr[data_sink] _data_sink
cdef OrcChunkedWriter writer
cdef SinkInfo sink
cdef str statistics
cdef object compression
cdef object index
cdef table_input_metadata tbl_meta
cdef TableInputMetadata tbl_meta
cdef object cols_as_map_type
cdef object stripe_size_bytes
cdef object stripe_size_rows
Expand All @@ -347,8 +345,7 @@ cdef class ORCWriter:
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None):

self.sink = make_sink_info(path, self._data_sink)
self.sink = plc.io.SinkInfo([path])
self.statistics = statistics
self.compression = compression
self.index = index
Expand All @@ -368,17 +365,23 @@ cdef class ORCWriter:
table._index.name is not None or
isinstance(table._index, cudf.core.multiindex.MultiIndex)
)
tv = table_view_from_table(table, not keep_index)
if keep_index:
columns = [
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
else:
columns = [col.to_pylibcudf(mode="read") for col in table._columns]

with nogil:
self.writer.get()[0].write(tv)
self.writer.write(
plc.Table(columns)
)

def close(self):
if not self.initialized:
return

with nogil:
self.writer.get()[0].close()
self.writer.close()

def __dealloc__(self):
self.close()
Expand All @@ -387,35 +390,50 @@ cdef class ORCWriter:
"""
Prepare all the values required to build the
chunked_orc_writer_options anb creates a writer"""
cdef table_view tv

num_index_cols_meta = 0
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True),
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
self.tbl_meta = TableInputMetadata(plc_table)
if self.index is not False:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
self.tbl_meta.column_metadata[level].set_name(
(str.encode(idx_name))
self.tbl_meta.c_obj.column_metadata[level].set_name(
str.encode(idx_name)
)
num_index_cols_meta = len(table._index.names)
else:
if table._index.name is not None:
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
self.tbl_meta.column_metadata[0].set_name(
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(
table.index._columns, table._columns
)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
self.tbl_meta.c_obj.column_metadata[0].set_name(
str.encode(table._index.name)
)
num_index_cols_meta = 1

for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.column_metadata[i].set_name(name.encode())
self.tbl_meta.c_obj.column_metadata[i].set_name(name.encode())
_set_col_children_metadata(
table[name]._column,
self.tbl_meta.column_metadata[i],
self.tbl_meta.c_obj.column_metadata[i],
(self.cols_as_map_type is not None)
and (name in self.cols_as_map_type),
)
Expand All @@ -424,23 +442,22 @@ cdef class ORCWriter:
pandas_metadata = generate_pandas_metadata(table, self.index)
user_data[str.encode("pandas")] = str.encode(pandas_metadata)

cdef chunked_orc_writer_options c_opts = move(
chunked_orc_writer_options.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
options = (
plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
if self.stripe_size_bytes is not None:
c_opts.set_stripe_size_bytes(self.stripe_size_bytes)
options.set_stripe_size_bytes(self.stripe_size_bytes)
if self.stripe_size_rows is not None:
c_opts.set_stripe_size_rows(self.stripe_size_rows)
options.set_stripe_size_rows(self.stripe_size_rows)
if self.row_index_stride is not None:
c_opts.set_row_index_stride(self.row_index_stride)
options.set_row_index_stride(self.row_index_stride)

with nogil:
self.writer.reset(new orc_chunked_writer(c_opts))
self.writer = plc.io.orc.OrcChunkedWriter.from_options(options)

self.initialized = True

Expand Down
60 changes: 48 additions & 12 deletions python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,33 @@ from libcpp cimport bool
from libcpp.optional cimport optional
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.memory cimport unique_ptr
from libcpp.map cimport map
from pylibcudf.io.types cimport (
SourceInfo,
SinkInfo,
TableWithMetadata,
CompressionType,
StatisticsFreq,
TableInputMetadata,
)
from pylibcudf.libcudf.io.orc_metadata cimport (
column_statistics,
parsed_orc_statistics,
statistics_type,
)
from pylibcudf.libcudf.io.orc cimport (
orc_chunked_writer,
orc_writer_options,
orc_writer_options_builder,
chunked_orc_writer_options,
chunked_orc_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.types cimport DataType

from pylibcudf.table cimport Table
from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)

cpdef TableWithMetadata read_orc(
SourceInfo source_info,
Expand Down Expand Up @@ -58,18 +67,45 @@ cpdef ParsedOrcStatistics read_parsed_orc_statistics(
SourceInfo source_info
)


cdef class OrcWriterOptions:
cdef orc_writer_options c_obj

@staticmethod
cdef OrcWriterOptionsBuilder builder(SinkInfo sink, Table table)

cdef Table table
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_t size_rows)
cpdef void set_row_index_stride(self, size_t stride)

cdef class OrcWriterOptionsBuilder:
cdef orc_writer_options_builder c_obj
cpdef OrcWriterOptionsBuilder compression(self, CompressionType comp)
cpdef OrcWriterOptionsBuilder enable_statistics(self, StatisticsFreq val)
cpdef OrcWriterOptionsBuilder key_value_metadata(self, object kvm)
cpdef OrcWriterOptionsBuilder metadata(self, TableWithMetadata meta)
cdef Table table
cdef SinkInfo sink
cpdef OrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef OrcWriterOptionsBuilder key_value_metadata(self, map[string, string] kvm)
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
cpdef void close(self)
cpdef write(self, Table table)

cdef class ChunkedOrcWriterOptions:
cdef chunked_orc_writer_options c_obj
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_t size_rows)
cpdef void set_row_index_stride(self, size_t stride)

cdef class ChunkedOrcWriterOptionsBuilder:
cdef chunked_orc_writer_options_builder c_obj
cdef SinkInfo sink
cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata(
self, map[string, string] kvm
)
cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef ChunkedOrcWriterOptions build(self)
3 changes: 3 additions & 0 deletions python/pylibcudf/pylibcudf/io/orc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def read_parsed_orc_statistics(

class OrcWriterOptions:
def __init__(self): ...
def set_stripe_size_bytes(self, size_bytes: int) -> None: ...
def set_stripe_size_rows(self, size_rows: int) -> None: ...
def set_row_index_stride(self, stride: int) -> None: ...
@staticmethod
def builder(sink: SinkInfo, table: Table) -> OrcWriterOptionsBuilder: ...

Expand Down
Loading

0 comments on commit 763b870

Please sign in to comment.