Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37390: [Python] Expose StreamDecoder API to pyarrow #37371

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,22 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:

CIpcWriteStats stats()

# Callback interface (arrow::ipc::Listener) invoked by StreamDecoder as
# pieces of an IPC stream are decoded.
cdef cppclass CStreamListener " arrow::ipc::Listener":

    CStatus OnEOS()
    CStatus OnRecordBatchDecoded(shared_ptr[CRecordBatch] batch)
    CStatus OnSchemaDecoded(shared_ptr[CSchema] schema)

# Push-based IPC stream decoder (arrow::ipc::StreamDecoder): feed it bytes,
# it invokes the listener's callbacks as schema/batches become available.
cdef cppclass CStreamDecoder " arrow::ipc::StreamDecoder":

    CStreamDecoder(shared_ptr[CStreamListener] listener,
                   const CIpcReadOptions& options)

    int64_t next_required_size()
    CIpcReadStats stats()
    shared_ptr[CSchema] schema()
    # The C++ API takes a const pointer; declaring it non-const would
    # force needless casts on the Cython side.
    CStatus Consume(const uint8_t* data, int64_t size)
    # Buffer overload of Consume; may avoid copying the input.
    CStatus ConsumeBuffer" Consume"(shared_ptr[CBuffer] buffer)

cdef cppclass CRecordBatchStreamReader \
" arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow_python.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ cdef extern from "arrow/python/ipc.h" namespace "arrow::py":
CResult[shared_ptr[CRecordBatchReader]] Make(shared_ptr[CSchema],
object)

# C++ proxy that forwards arrow::ipc::Listener callbacks to a Python
# listener object (see pyarrow.ipc.StreamListener).
cdef cppclass CPyStreamListenerProxy" arrow::py::PyStreamListenerProxy" \
        (CStreamListener):
    # listener_impl: the Python-side listener the proxy dispatches to.
    CPyStreamListenerProxy(PyObject* listener_impl)


cdef extern from "arrow/python/ipc.h" namespace "arrow::py" nogil:
cdef cppclass CCastingRecordBatchReader" arrow::py::CastingRecordBatchReader" \
Expand Down
20 changes: 20 additions & 0 deletions python/pyarrow/ipc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,26 @@ cdef class RecordBatchReader(_Weakrefable):
self.reader = c_reader
return self

cdef class _StreamDecoder:
    """
    Low-level wrapper around arrow::ipc::StreamDecoder.

    Bytes pushed through consume_buffer() are decoded incrementally; the
    schema and record batches are delivered via callbacks on the Python
    listener passed to _open() (bridged by PyStreamListenerProxy).
    """
    cdef:
        CIpcReadOptions options
        shared_ptr[CStreamDecoder] decoder
        shared_ptr[CPyStreamListenerProxy] proxy

    def _open(self, listener, IpcReadOptions options=None):
        # Construct the default lazily rather than sharing one mutable
        # IpcReadOptions instance across every call.
        if options is None:
            options = IpcReadOptions()
        self.options = options.c_options
        # The proxy keeps the Python listener alive for the decoder's
        # lifetime and forwards C++ callbacks to it.
        self.proxy = make_shared[CPyStreamListenerProxy](<PyObject*>listener)
        self.decoder = make_shared[CStreamDecoder](self.proxy, self.options)

    def next_required_size(self):
        """Number of bytes the decoder needs next (a hint for read sizing)."""
        return self.decoder.get().next_required_size()

    def consume_buffer(self, Buffer buffer):
        """Feed a pyarrow Buffer to the decoder, raising on decode error."""
        check_status(self.decoder.get().ConsumeBuffer(pyarrow_unwrap_buffer(buffer)))


cdef class _RecordBatchStreamReader(RecordBatchReader):
cdef:
Expand Down
40 changes: 40 additions & 0 deletions python/pyarrow/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# Arrow file and stream reader/writer classes, and other messaging tools

from abc import ABC, abstractmethod
import os

import pyarrow as pa
Expand Down Expand Up @@ -52,6 +53,45 @@ def __init__(self, source, *, options=None, memory_pool=None):
self._open(source, options=options, memory_pool=memory_pool)


class StreamListener(ABC):
    """
    Abstract base class for receiving decoded IPC stream events.

    Subclass this, implement the three callbacks, and pass an instance to
    StreamDecoder. Method names mirror the C++ ``arrow::ipc::Listener`` API.

    Note: without an ABC base, ``@abstractmethod`` is inert — subclasses
    that forget a callback would only fail at decode time. Deriving from
    ``ABC`` enforces the contract at instantiation.
    """

    @abstractmethod
    def OnEOS(self):
        """Called once when the end of the IPC stream is reached."""
        ...

    @abstractmethod
    def OnRecordBatchDecoded(self, batch):
        """Called with each decoded record batch, in stream order."""
        ...

    @abstractmethod
    def OnSchemaDecoded(self, schema):
        """Called once with the decoded stream schema, before any batch."""
        ...


class StreamDecoder(lib._StreamDecoder):
    """
    Experimental StreamDecoder API.

    Pushes IPC stream bytes through an incremental decoder; decoded
    schemas and record batches are delivered via callbacks on the
    supplied listener.

    Parameters
    ----------
    listener : StreamListener
        An instance of a StreamListener subclass whose callbacks receive
        the decoded schema and record batches.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        Currently unused; accepted for signature consistency with the
        reader classes.
    """

    def __init__(self, listener: StreamListener, *, options=None, memory_pool=None):
        if not isinstance(listener, StreamListener):
            raise TypeError("Expected listener to be a subclass of StreamListener")
        options = _ensure_default_ipc_read_options(options)
        self._open(listener, options=options)


_ipc_writer_class_doc = """\
Parameters
----------
Expand Down
26 changes: 26 additions & 0 deletions python/pyarrow/src/arrow/python/ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "ipc.h"

#include <memory>
#include <stdexcept>

#include "arrow/compute/cast.h"
#include "arrow/python/pyarrow.h"
Expand Down Expand Up @@ -64,6 +65,31 @@ Result<std::shared_ptr<RecordBatchReader>> PyRecordBatchReader::Make(
return reader;
}


namespace {

// Intern a method-name string. This must NOT run at static-initialization
// time: PyUnicode_InternFromString requires a live interpreter, so each call
// site keeps the result in a function-local static, created on first use
// (with the GIL held). The interned name is deliberately kept for the life
// of the process.
PyObject* InternName(const char* name) { return PyUnicode_InternFromString(name); }

}  // namespace

// Forwards Listener::OnEOS to the Python listener's OnEOS() method.
Status PyStreamListenerProxy::OnEOS() {
  // The decoder may invoke callbacks from C++; make sure we hold the GIL.
  PyAcquireGIL lock;
  static PyObject* kName = InternName("OnEOS");
  // Own the call result so the returned reference is not leaked.
  OwnedRef result(PyObject_CallMethodNoArgs(impl_.obj(), kName));
  RETURN_IF_PYERROR();
  return Status::OK();
}

// Forwards each decoded batch to the Python listener's OnRecordBatchDecoded.
Status PyStreamListenerProxy::OnRecordBatchDecoded(std::shared_ptr<RecordBatch> batch) {
  PyAcquireGIL lock;
  static PyObject* kName = InternName("OnRecordBatchDecoded");
  // wrap_batch returns a new reference; own it so it is released.
  OwnedRef py_batch(wrap_batch(batch));
  RETURN_IF_PYERROR();
  OwnedRef result(
      PyObject_CallMethodObjArgs(impl_.obj(), kName, py_batch.obj(), NULL));
  RETURN_IF_PYERROR();
  return Status::OK();
}

// Forwards the decoded schema to the Python listener's OnSchemaDecoded.
Status PyStreamListenerProxy::OnSchemaDecoded(std::shared_ptr<Schema> schema) {
  PyAcquireGIL lock;
  static PyObject* kName = InternName("OnSchemaDecoded");
  OwnedRef py_schema(wrap_schema(schema));
  RETURN_IF_PYERROR();
  OwnedRef result(
      PyObject_CallMethodObjArgs(impl_.obj(), kName, py_schema.obj(), NULL));
  RETURN_IF_PYERROR();
  return Status::OK();
}

// The Cython caller passes a borrowed reference; take our own reference so
// that impl_ (an owning OwnedRefNoGIL, which decrefs on destruction) does
// not over-release the listener when the proxy is destroyed.
PyStreamListenerProxy::PyStreamListenerProxy(PyObject* obj) : impl_{obj} {
  Py_INCREF(obj);
}

CastingRecordBatchReader::CastingRecordBatchReader() = default;

Status CastingRecordBatchReader::Init(std::shared_ptr<RecordBatchReader> parent,
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/src/arrow/python/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>

#include "arrow/ipc/reader.h"
#include "arrow/python/common.h"
#include "arrow/python/visibility.h"
#include "arrow/record_batch.h"
Expand Down Expand Up @@ -48,6 +49,19 @@ class ARROW_PYTHON_EXPORT PyRecordBatchReader : public RecordBatchReader {
OwnedRefNoGIL iterator_;
};


/// \brief Adapter that forwards arrow::ipc::Listener callbacks to a Python
/// object implementing OnEOS / OnRecordBatchDecoded / OnSchemaDecoded
/// (see pyarrow.ipc.StreamListener).
class ARROW_PYTHON_EXPORT PyStreamListenerProxy : public ::arrow::ipc::Listener {
 public:
  /// \param[in] listener the Python-side listener the callbacks dispatch to.
  /// NOTE(review): confirm the intended ownership convention (borrowed vs
  /// stolen reference) — impl_ is an owning OwnedRefNoGIL.
  explicit PyStreamListenerProxy(PyObject* listener);
  Status OnEOS() override;
  Status OnRecordBatchDecoded(std::shared_ptr<RecordBatch> batch) override;
  Status OnSchemaDecoded(std::shared_ptr<Schema> schema) override;

 private:
  OwnedRefNoGIL impl_;  // reference to the Python-side listener
};


class ARROW_PYTHON_EXPORT CastingRecordBatchReader : public RecordBatchReader {
public:
std::shared_ptr<Schema> schema() const override;
Expand Down
33 changes: 33 additions & 0 deletions python/pyarrow/tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,39 @@ def test_open_file_from_buffer(file_fixture):
assert reader3.stats == st1


def test_stream_decoder(stream_fixture):
    # Exercise the experimental push-based StreamDecoder API and check it
    # yields the same data as the pull-based RecordBatchStreamReader.
    stream_fixture.write_batches()
    source = stream_fixture.get_source()

    class Listener(pa.ipc.StreamListener):
        def __init__(self):
            self._batches = []

        def OnEOS(self):
            pass

        def OnRecordBatchDecoded(self, batch):
            self._batches.append(batch)

        def OnSchemaDecoded(self, schema):
            pass

    standard_reader = pa.RecordBatchStreamReader(source)
    result1 = standard_reader.read_all()
    assert result1.num_rows == 25

    listener = Listener()
    decoder = pa.ipc.StreamDecoder(listener)

    # Reuse the already-fetched source instead of calling get_source()
    # again; BufferReader is itself an input stream, so the extra
    # pa.input_stream() wrapper is unnecessary.
    with pa.BufferReader(source) as stream:
        buff = stream.read_buffer()
        decoder.consume_buffer(buff)

    result2 = pa.Table.from_batches(listener._batches)
    assert result1.equals(result2)


@pytest.mark.pandas
def test_file_read_pandas(file_fixture):
frames = [batch.to_pandas() for batch in file_fixture.write_batches()]
Expand Down
Loading