Use HostBuffer, rename to c_obj
mroeschke committed Nov 15, 2024
1 parent 4b8402d commit 097decb
Showing 4 changed files with 46 additions and 96 deletions.
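
In short: the hand-rolled BufferArrayFromVector wrapper is replaced by the existing pylibcudf.contiguous_split.HostBuffer, the wrapped C++ members of the writer-options classes are renamed to the conventional c_obj, and PartitionInfo's factory methods give way to a plain constructor. An illustrative before/after sketch of the Python-facing surface (the `options` value is assumed to be a fully built ParquetWriterOptions):

import pylibcudf as plc

# Before this commit: dedicated factory method, bespoke buffer wrapper.
part = plc.io.types.PartitionInfo.from_start_and_rows(0, 10)
blob = plc.io.parquet.write_parquet(options)  # returned BufferArrayFromVector

# After this commit: plain constructor, shared HostBuffer.
part = plc.io.types.PartitionInfo(0, 10)
blob = plc.io.parquet.write_parquet(options)  # returns HostBuffer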
21 changes: 4 additions & 17 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
@@ -4,6 +4,7 @@ from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.vector cimport vector
+from pylibcudf.contiguous_split cimport HostBuffer
from pylibcudf.expressions cimport Expression
from pylibcudf.io.types cimport (
compression_type,
@@ -47,7 +48,7 @@ cpdef read_parquet(
)

cdef class ParquetWriterOptions:
-    cdef parquet_writer_options options
+    cdef parquet_writer_options c_obj

@staticmethod
cdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table)
@@ -67,7 +68,7 @@ cdef class ParquetWriterOptions:
cpdef void set_max_dictionary_size(self, int size_rows)

cdef class ParquetWriterOptionsBuilder:
-    cdef parquet_writer_options_builder builder
+    cdef parquet_writer_options_builder c_obj

cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata)

@@ -89,18 +90,4 @@ cdef class ParquetWriterOptionsBuilder:

cpdef ParquetWriterOptions build(self)


-cdef class BufferArrayFromVector:
-    cdef Py_ssize_t length
-    cdef unique_ptr[vector[uint8_t]] in_vec
-
-    # these two things declare part of the buffer interface
-    cdef Py_ssize_t shape[1]
-    cdef Py_ssize_t strides[1]
-
-    @staticmethod
-    cdef BufferArrayFromVector from_unique_ptr(
-        unique_ptr[vector[uint8_t]] in_vec
-    )

-cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options)
+cpdef HostBuffer write_parquet(ParquetWriterOptions options)
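
For context, c_obj is the conventional name pylibcudf cdef classes give to the libcudf C++ value they wrap, which is what motivates both renames above. A minimal illustrative Cython sketch of the pattern; example_options and its header are placeholders, not real libcudf declarations:

from libcpp cimport bool

cdef extern from "example_options.hpp" nogil:
    cdef cppclass example_options:
        void set_flag(bool flag)

cdef class ExampleOptions:
    # The wrapped C++ object lives under the conventional name `c_obj`.
    cdef example_options c_obj

    cpdef void set_flag(self, bool flag):
        # Python-facing setters simply forward to the wrapped C++ object.
        self.c_obj.set_flag(flag)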
90 changes: 32 additions & 58 deletions python/pylibcudf/pylibcudf/io/parquet.pyx
@@ -6,6 +6,7 @@ from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
+from pylibcudf.contiguous_split cimport HostBuffer
from pylibcudf.expressions cimport Expression
from pylibcudf.io.types cimport (
SinkInfo,
@@ -32,7 +33,13 @@ from pylibcudf.libcudf.io.types cimport (
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.table cimport Table

__all__ = ["ChunkedParquetReader", "read_parquet"]
__all__ = [
"ChunkedParquetReader",
"ParquetWriterOptions",
"ParquetWriterOptionsBuilder",
"read_parquet",
"write_parquet"
]


cdef parquet_reader_options _setup_parquet_reader_options(
@@ -261,7 +268,7 @@ cdef class ParquetWriterOptions:
cdef ParquetWriterOptionsBuilder bldr = ParquetWriterOptionsBuilder.__new__(
ParquetWriterOptionsBuilder
)
-        bldr.builder = parquet_writer_options.builder(sink.c_obj, table.view())
+        bldr.c_obj = parquet_writer_options.builder(sink.c_obj, table.view())
return bldr

cpdef void set_partitions(self, list partitions):
@@ -280,11 +287,11 @@ cdef class ParquetWriterOptions:
cdef vector[partition_info] c_partions
cdef PartitionInfo partition

-        c_partions.reserve(len(partitions))
+        c_obj.reserve(len(partitions))
        for partition in partitions:
-            c_partions.push_back(partition.c_obj)
+            c_obj.push_back(partition.c_obj)

-        self.options.set_partitions(c_partions)
+        self.c_obj.set_partitions(c_partions)

cpdef void set_column_chunks_file_paths(self, list file_paths):
"""
@@ -299,7 +306,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
-        self.options.set_column_chunks_file_paths([fp.encode() for fp in file_paths])
+        self.c_obj.set_column_chunks_file_paths([fp.encode() for fp in file_paths])

cpdef void set_row_group_size_bytes(self, int size_bytes):
"""
@@ -314,7 +321,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
-        self.options.set_row_group_size_bytes(size_bytes)
+        self.c_obj.set_row_group_size_bytes(size_bytes)

cpdef void set_row_group_size_rows(self, int size_rows):
"""
@@ -329,7 +336,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
-        self.options.set_row_group_size_rows(size_rows)
+        self.c_obj.set_row_group_size_rows(size_rows)

cpdef void set_max_page_size_bytes(self, int size_bytes):
"""
@@ -344,7 +351,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
-        self.options.set_max_page_size_bytes(size_bytes)
+        self.c_obj.set_max_page_size_bytes(size_bytes)

cpdef void set_max_page_size_rows(self, int size_rows):
"""
@@ -359,7 +366,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
-        self.options.set_max_page_size_rows(size_rows)
+        self.c_obj.set_max_page_size_rows(size_rows)

cpdef void set_max_dictionary_size(self, int size_rows):
"""
@@ -374,7 +381,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
-        self.options.set_max_dictionary_size(size_rows)
+        self.c_obj.set_max_dictionary_size(size_rows)


cdef class ParquetWriterOptionsBuilder:
@@ -392,7 +399,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.metadata(metadata.c_obj)
+        self.c_obj.metadata(metadata.c_obj)
return self

cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata):
@@ -408,7 +415,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.key_value_metadata(
+        self.c_obj.key_value_metadata(
[
{key.encode(): value.encode() for key, value in mapping.items()}
for mapping in metadata
@@ -429,7 +436,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.compression(compression)
+        self.c_obj.compression(compression)
return self

cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf):
@@ -445,7 +452,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.stats_level(sf)
+        self.c_obj.stats_level(sf)
return self

cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled):
@@ -461,7 +468,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.int96_timestamps(enabled)
+        self.c_obj.int96_timestamps(enabled)
return self

cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled):
@@ -477,7 +484,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.write_v2_headers(enabled)
+        self.c_obj.write_v2_headers(enabled)
return self

cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy_t val):
@@ -493,7 +500,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.dictionary_policy(val)
+        self.c_obj.dictionary_policy(val)
return self

cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled):
@@ -509,7 +516,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.utc_timestamps(enabled)
+        self.c_obj.utc_timestamps(enabled)
return self

cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled):
@@ -525,7 +532,7 @@ cdef class ParquetWriterOptionsBuilder:
-------
Self
"""
-        self.builder.write_arrow_schema(enabled)
+        self.c_obj.write_arrow_schema(enabled)
return self

cpdef ParquetWriterOptions build(self):
@@ -539,44 +546,11 @@ cdef class ParquetWriterOptionsBuilder:
cdef ParquetWriterOptions parquet_options = ParquetWriterOptions.__new__(
ParquetWriterOptions
)
-        parquet_options.options = move(self.builder.build())
+        parquet_options.c_obj = move(self.c_obj.build())
return parquet_options


-cdef class BufferArrayFromVector:
-    @staticmethod
-    cdef BufferArrayFromVector from_unique_ptr(
-        unique_ptr[vector[uint8_t]] in_vec
-    ):
-        cdef BufferArrayFromVector buf = BufferArrayFromVector()
-        buf.in_vec = move(in_vec)
-        buf.length = dereference(buf.in_vec).size()
-        return buf
-
-    def __getbuffer__(self, Py_buffer *buffer, int flags):
-        cdef Py_ssize_t itemsize = sizeof(uint8_t)
-
-        self.shape[0] = self.length
-        self.strides[0] = 1
-
-        buffer.buf = dereference(self.in_vec).data()
-
-        buffer.format = NULL  # byte
-        buffer.internal = NULL
-        buffer.itemsize = itemsize
-        buffer.len = self.length * itemsize  # product(shape) * itemsize
-        buffer.ndim = 1
-        buffer.obj = self
-        buffer.readonly = 0
-        buffer.shape = self.shape
-        buffer.strides = self.strides
-        buffer.suboffsets = NULL
-
-    def __releasebuffer__(self, Py_buffer *buffer):
-        pass


-cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options):
+cpdef HostBuffer write_parquet(ParquetWriterOptions options):
"""
Writes a set of columns to parquet format.
@@ -587,15 +561,15 @@ cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options):
Returns
-------
-    BufferArrayFromVector
+    pylibcudf.contiguous_split.HostBuffer
A blob that contains the file metadata
(parquet FileMetadata thrift message) if requested in
parquet_writer_options (empty blob otherwise).
"""
-    cdef parquet_writer_options c_options = options.options
+    cdef parquet_writer_options c_options = options.c_obj
cdef unique_ptr[vector[uint8_t]] c_result

with nogil:
c_result = cpp_write_parquet(c_options)

-    return BufferArrayFromVector.from_unique_ptr(move(c_result))
+    return HostBuffer.from_unique_ptr(move(c_result))
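
A hedged usage sketch of the updated write path; the `sink` and `table` setup is assumed, and calling builder() from Python is assumed to work as it does in later pylibcudf releases:

import pylibcudf as plc

# Assumed to exist: sink (plc.io.types.SinkInfo) and table (plc.Table).
options = plc.io.parquet.ParquetWriterOptions.builder(sink, table).build()
blob = plc.io.parquet.write_parquet(options)

# Like the removed BufferArrayFromVector, HostBuffer implements the
# Python buffer protocol, so callers that viewed the returned metadata
# blob as raw bytes keep working:
view = memoryview(blob)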
29 changes: 9 additions & 20 deletions python/pylibcudf/pylibcudf/io/types.pyx
@@ -52,28 +52,17 @@ __all__ = [
cdef class PartitionInfo:
    """
    Information used while writing partitioned datasets.
-    """
-
-    @staticmethod
-    def from_start_and_rows(int start_row, int num_rows):
-        return PartitionInfo.from_start_and_num(start_row, num_rows)
-
-    @staticmethod
-    cdef PartitionInfo from_start_and_num(int start_row, int num_rows):
-        """
-        Construct a PartitionInfo.
-        Parameters
-        ----------
-        start_row : int
-            The start row of the partition.
-        num_rows : int
-            The number of rows in the partition.
-        """
-        cdef PartitionInfo parition_info = PartitionInfo.__new__(PartitionInfo)
-        parition_info.c_obj = partition_info(start_row, num_rows)
-        return parition_info
+
+    Parameters
+    ----------
+    start_row : int
+        The start row of the partition.
+    num_rows : int
+        The number of rows in the partition.
+    """
+    def __init__(self, int start_row, int num_rows):
+        self.c_obj = partition_info(start_row, num_rows)


cdef class ColumnInMetadata:
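
With the factory methods gone, callers construct the partition info directly, as the updated test below does. A small sketch; the `writer_options` object is assumed to be a ParquetWriterOptions from the parquet module above:

import pylibcudf as plc

# A partition starting at row 0 and spanning 10 rows.
part = plc.io.types.PartitionInfo(0, 10)
# set_partitions takes a list of PartitionInfo values.
writer_options.set_partitions([part])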
2 changes: 1 addition & 1 deletion python/pylibcudf/pylibcudf/tests/io/test_parquet.py
@@ -138,7 +138,7 @@ def test_read_parquet_filters(
@pytest.mark.parametrize("write_arrow_schema", [True, False])
@pytest.mark.parametrize(
"partitions",
-    [None, [plc.io.types.PartitionInfo.from_start_and_rows(0, 10)]],
+    [None, [plc.io.types.PartitionInfo(0, 10)]],
)
@pytest.mark.parametrize("column_chunks_file_paths", [None, ["tmp.parquet"]])
@pytest.mark.parametrize("row_group_size_bytes", [None, 100])
