Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-14932: [Python] Add python bindings for JSON streaming reader #45084

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion python/pyarrow/_json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, MemoryPool,

from pyarrow.lib cimport (_Weakrefable, Schema,
RecordBatchReader, MemoryPool,
maybe_unbox_memory_pool,
get_input_stream, pyarrow_wrap_table,
pyarrow_wrap_schema, pyarrow_unwrap_schema)
Expand Down Expand Up @@ -265,6 +267,37 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out):
else:
out[0] = parse_options.options

cdef class JSONStreamingReader(RecordBatchReader):
"""An object that reads record batches incrementally from a JSON file.

Should not be instantiated directly by user code.
"""
cdef readonly:
Schema schema

def __init__(self):
raise TypeError("Do not call {}'s constructor directly, "
"use pyarrow.json.open_json() instead."
.format(self.__class__.__name__))

cdef _open(self, shared_ptr[CInputStream] stream,
CJSONReadOptions c_read_options,
CJSONParseOptions c_parse_options,
MemoryPool memory_pool):
cdef:
shared_ptr[CSchema] c_schema
CIOContext io_context

io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

with nogil:
self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
CJSONStreamingReader.Make(stream, move(c_read_options),
move(c_parse_options), io_context))
c_schema = self.reader.get().schema()

self.schema = pyarrow_wrap_schema(c_schema)


def read_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
Expand Down Expand Up @@ -308,3 +341,45 @@ def read_json(input_file, read_options=None, parse_options=None,
table = GetResultValue(reader.get().Read())

return pyarrow_wrap_table(table)


def open_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
Open a streaming reader of JSON data.

Reading using this function is always single-threaded.

Parameters
----------
input_file : string, path or file-like object
The location of JSON data. If a string or path, and if it ends
with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
the data is automatically decompressed when reading.
read_options : pyarrow.json.ReadOptions, optional
Options for the JSON reader (see pyarrow.json.ReadOptions constructor
for defaults)
parse_options : pyarrow.json.ParseOptions, optional
Options for the JSON parser
(see pyarrow.json.ParseOptions constructor for defaults)
memory_pool : MemoryPool, optional
Pool to allocate Table memory from

Returns
-------
:class:`pyarrow.json.JSONStreamingReader`
"""
cdef:
shared_ptr[CInputStream] stream
CJSONReadOptions c_read_options
CJSONParseOptions c_parse_options
JSONStreamingReader reader

_get_reader(input_file, &stream)
_get_read_options(read_options, &c_read_options)
_get_parse_options(parse_options, &c_parse_options)

reader = JSONStreamingReader.__new__(JSONStreamingReader)
reader._open(stream, move(c_read_options), move(c_parse_options),
memory_pool)
return reader
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,13 @@ cdef extern from "arrow/json/reader.h" namespace "arrow::json" nogil:

CResult[shared_ptr[CTable]] Read()

cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"(
CRecordBatchReader):
@staticmethod
CResult[shared_ptr[CJSONStreamingReader]] Make(
shared_ptr[CInputStream],
CJSONReadOptions, CJSONParseOptions, CIOContext)


cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil:

Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
# under the License.


from pyarrow._json import ReadOptions, ParseOptions, read_json # noqa
from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json # noqa
Loading
Loading