diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx
index d36dad67abbaa..9d4711b2eb0ac 100644
--- a/python/pyarrow/_json.pyx
+++ b/python/pyarrow/_json.pyx
@@ -21,7 +21,9 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
-from pyarrow.lib cimport (_Weakrefable, MemoryPool,
+
+from pyarrow.lib cimport (_Weakrefable, Schema,
+                          RecordBatchReader, MemoryPool,
                           maybe_unbox_memory_pool, get_input_stream,
                           pyarrow_wrap_table, pyarrow_wrap_schema,
                           pyarrow_unwrap_schema)
@@ -265,6 +267,37 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out):
     else:
         out[0] = parse_options.options
 
 
+cdef class JSONStreamingReader(RecordBatchReader):
+    """An object that reads record batches incrementally from a JSON file.
+
+    Should not be instantiated directly by user code.
+    """
+    cdef readonly:
+        Schema schema
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly, "
+                        "use pyarrow.json.open_json() instead."
+                        .format(self.__class__.__name__))
+
+    cdef _open(self, shared_ptr[CInputStream] stream,
+               CJSONReadOptions c_read_options,
+               CJSONParseOptions c_parse_options,
+               MemoryPool memory_pool):
+        cdef:
+            shared_ptr[CSchema] c_schema
+            CIOContext io_context
+
+        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))
+
+        with nogil:
+            self.reader = GetResultValue(
+                CJSONStreamingReader.Make(stream, move(c_read_options),
+                                          move(c_parse_options), io_context))
+            c_schema = self.reader.get().schema()
+
+        self.schema = pyarrow_wrap_schema(c_schema)
+
 
 def read_json(input_file, read_options=None, parse_options=None,
               MemoryPool memory_pool=None):
@@ -308,3 +341,45 @@ def read_json(input_file, read_options=None, parse_options=None,
         table = GetResultValue(reader.get().Read())
 
     return pyarrow_wrap_table(table)
+
+
+def open_json(input_file, read_options=None, parse_options=None,
+              MemoryPool memory_pool=None):
+    """
+    Open a streaming reader of JSON data.
+
+    Reading using this function is always single-threaded.
+
+    Parameters
+    ----------
+    input_file : string, path or file-like object
+        The location of JSON data. If a string or path, and if it ends
+        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
+        the data is automatically decompressed when reading.
+    read_options : pyarrow.json.ReadOptions, optional
+        Options for the JSON reader (see pyarrow.json.ReadOptions constructor
+        for defaults)
+    parse_options : pyarrow.json.ParseOptions, optional
+        Options for the JSON parser
+        (see pyarrow.json.ParseOptions constructor for defaults)
+    memory_pool : MemoryPool, optional
+        Pool to allocate Table memory from
+
+    Returns
+    -------
+    :class:`pyarrow.json.JSONStreamingReader`
+    """
+    cdef:
+        shared_ptr[CInputStream] stream
+        CJSONReadOptions c_read_options
+        CJSONParseOptions c_parse_options
+        JSONStreamingReader reader
+
+    _get_reader(input_file, &stream)
+    _get_read_options(read_options, &c_read_options)
+    _get_parse_options(parse_options, &c_parse_options)
+
+    reader = JSONStreamingReader.__new__(JSONStreamingReader)
+    reader._open(stream, move(c_read_options), move(c_parse_options),
+                 memory_pool)
+    return reader
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index b2edeb0b4192f..4e94801038a68 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2173,6 +2173,13 @@ cdef extern from "arrow/json/reader.h" namespace "arrow::json" nogil:
 
         CResult[shared_ptr[CTable]] Read()
 
+    cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"(
+            CRecordBatchReader):
+        @staticmethod
+        CResult[shared_ptr[CJSONStreamingReader]] Make(
+            shared_ptr[CInputStream],
+            CJSONReadOptions, CJSONParseOptions, CIOContext)
+
 
 cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil:
diff --git a/python/pyarrow/json.py b/python/pyarrow/json.py
index a864f5d998a44..24e604613500c 100644
--- a/python/pyarrow/json.py
+++ b/python/pyarrow/json.py
@@ -16,4 +16,4 @@
 # under the License.
 
 
-from pyarrow._json import ReadOptions, ParseOptions, read_json  # noqa
+from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json  # noqa
diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py
index 978c92307a69e..e1a1cc0ce9d51 100644
--- a/python/pyarrow/tests/test_json.py
+++ b/python/pyarrow/tests/test_json.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import abc
 from collections import OrderedDict
 from decimal import Decimal
 import io
@@ -30,7 +31,7 @@
 import pytest
 
 import pyarrow as pa
-from pyarrow.json import read_json, ReadOptions, ParseOptions
+from pyarrow.json import read_json, open_json, ReadOptions, ParseOptions
 
 
 def generate_col_names():
@@ -111,26 +112,20 @@ def test_parse_options(pickle_module):
         unexpected_field_behavior="ignore")
 
 
-class BaseTestJSONRead:
-
+class BaseTestJSON(abc.ABC):
+    @abc.abstractmethod
     def read_bytes(self, b, **kwargs):
-        return self.read_json(pa.py_buffer(b), **kwargs)
+        """
+        :param b: bytes to be parsed
+        :param kwargs: arguments passed on to open the json file
+        :return: b parsed as a single pyarrow Table
+        """
+        raise NotImplementedError
 
     def check_names(self, table, names):
         assert table.num_columns == len(names)
         assert [c.name for c in table.columns] == names
 
-    def test_file_object(self):
-        data = b'{"a": 1, "b": 2}\n'
-        expected_data = {'a': [1], 'b': [2]}
-        bio = io.BytesIO(data)
-        table = self.read_json(bio)
-        assert table.to_pydict() == expected_data
-        # Text files not allowed
-        sio = io.StringIO(data.decode())
-        with pytest.raises(TypeError):
-            self.read_json(sio)
-
     def test_block_sizes(self):
         rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}'
         read_options = ReadOptions()
@@ -229,25 +224,6 @@ def test_empty_rows(self):
         assert table.num_columns == 0
         assert table.num_rows == 2
 
-    def test_reconcile_across_blocks(self):
-        # ARROW-12065: reconciling inferred types across blocks
-        first_row = b'{ }\n'
-        read_options = ReadOptions(block_size=len(first_row))
-        for next_rows, expected_pylist in [
-            (b'{"a": 0}', [None, 0]),
-            (b'{"a": []}', [None, []]),
-            (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
-            (b'{"a": {}}', [None, {}]),
-            (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
-             [None, {"b": None}, {"b": {"c": 1}}]),
-        ]:
-            table = self.read_bytes(first_row + next_rows,
-                                    read_options=read_options)
-            expected = {"a": expected_pylist}
-            assert table.to_pydict() == expected
-            # Check that the issue was exercised
-            assert table.column("a").num_chunks > 1
-
     def test_explicit_schema_decimal(self):
         rows = (b'{"a": 1}\n'
                 b'{"a": 1.45}\n'
@@ -339,6 +315,281 @@ def test_stress_block_sizes(self):
             assert table.to_pydict() == expected.to_pydict()
 
 
+class BaseTestJSONRead(BaseTestJSON):
+
+    def read_bytes(self, b, **kwargs):
+        return self.read_json(pa.py_buffer(b), **kwargs)
+
+    def test_file_object(self):
+        data = b'{"a": 1, "b": 2}\n'
+        expected_data = {'a': [1], 'b': [2]}
+        bio = io.BytesIO(data)
+        table = self.read_json(bio)
+        assert table.to_pydict() == expected_data
+        # Text files not allowed
+        sio = io.StringIO(data.decode())
+        with pytest.raises(TypeError):
+            self.read_json(sio)
+
+    def test_reconcile_across_blocks(self):
+        # ARROW-12065: reconciling inferred types across blocks
+        first_row = b'{ }\n'
+        read_options = ReadOptions(block_size=len(first_row))
+        for next_rows, expected_pylist in [
+            (b'{"a": 0}', [None, 0]),
+            (b'{"a": []}', [None, []]),
+            (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]),
+            (b'{"a": {}}', [None, {}]),
+            (b'{"a": {}}\n{"a": {"b": {"c": 1}}}',
+             [None, {"b": None}, {"b": {"c": 1}}]),
+        ]:
+            table = self.read_bytes(first_row + next_rows,
+                                    read_options=read_options)
+            expected = {"a": expected_pylist}
+            assert table.to_pydict() == expected
+            # Check that the issue was exercised
+            assert table.column("a").num_chunks > 1
+
+
+class BaseTestStreamingJSONRead(BaseTestJSON):
+    def open_json(self, json, *args, **kwargs):
+        """
+        Open the JSON bytes for streaming using pyarrow's open_json.
+
+        :param json: the JSON bytes
+        :param args: positional arguments forwarded to pyarrow's open_json
+        :param kwargs: keyword arguments forwarded to pyarrow's open_json
+        """
+        read_options = kwargs.setdefault('read_options', ReadOptions())
+        read_options.use_threads = self.use_threads
+        return open_json(json, *args, **kwargs)
+
+    def open_bytes(self, b, **kwargs):
+        return self.open_json(pa.py_buffer(b), **kwargs)
+
+    def check_reader(self, reader, expected_schema, expected_data):
+        assert reader.schema == expected_schema
+        batches = list(reader)
+        assert len(batches) == len(expected_data)
+        for batch, expected_batch in zip(batches, expected_data):
+            batch.validate(full=True)
+            assert batch.schema == expected_schema
+            assert batch.to_pydict() == expected_batch
+
+    def read_bytes(self, b, **kwargs):
+        return self.open_bytes(b, **kwargs).read_all()
+
+    def test_file_object(self):
+        data = b'{"a": 1, "b": 2}\n'
+        expected_data = {'a': [1], 'b': [2]}
+        bio = io.BytesIO(data)
+        reader = self.open_json(bio)
+        expected_schema = pa.schema([('a', pa.int64()),
+                                     ('b', pa.int64())])
+        self.check_reader(reader, expected_schema, [expected_data])
+
+    def test_bad_first_chunk(self):
+        bad_first_chunk = b'{"i": 0 }\n{"i": 1}'
+        read_options = ReadOptions()
+        read_options.block_size = 3
+        with pytest.raises(
+            pa.ArrowInvalid,
+            match="straddling object straddles two block boundaries*"
+        ):
+            self.open_bytes(bad_first_chunk, read_options=read_options)
+
+    def test_bad_middle_chunk(self):
+        bad_middle_chunk = b'{"i": 0}\n{"i": 1}\n{"i": 2}'
+        read_options = ReadOptions()
+        read_options.block_size = 10
+        expected_schema = pa.schema([('i', pa.int64())])
+
+        reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'i': [0]
+        }
+        with pytest.raises(
+            pa.ArrowInvalid,
+            match="straddling object straddles two block boundaries*"
+        ):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_bad_first_parse(self):
+        bad_first_block = b'{"n": }\n{"n": 10000}'
+        read_options = ReadOptions()
+        read_options.block_size = 16
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: Invalid value.*"):
+            self.open_bytes(bad_first_block, read_options=read_options)
+
+    def test_bad_middle_parse_after_empty(self):
+        bad_first_block = b'{ }{"n": }\n{"n": 10000}'
+        read_options = ReadOptions()
+        read_options.block_size = 16
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: Invalid value.*"):
+            self.open_bytes(bad_first_block, read_options=read_options)
+
+    def test_bad_middle_parse(self):
+        bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}'
+        read_options = ReadOptions()
+        read_options.block_size = 10
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(bad_middle_chunk, read_options=read_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [1000]
+        }
+        with pytest.raises(
+            pa.ArrowInvalid,
+            match="JSON parse error: "
+                  "Missing a comma or '}' after an object member*"
+        ):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_non_linewise_chunker_first_block(self):
+        bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+        read_options = ReadOptions(block_size=10)
+        parse_options = ParseOptions(newlines_in_values=True)
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(
+            bad_middle_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [0]
+        }
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error *"):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_non_linewise_chunker_bad_first_block(self):
+        bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}'
+        read_options = ReadOptions(block_size=10)
+        parse_options = ParseOptions(newlines_in_values=True)
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(
+            bad_middle_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [0]
+        }
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error *"):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_non_linewise_chunker_bad_middle_block(self):
+        bad_middle_chunk = b'{"n": 0}\n{"n": 1}\n{}"n":2}\n{"n": 3}'
+        read_options = ReadOptions(block_size=10)
+        parse_options = ParseOptions(newlines_in_values=True)
+        expected_schema = pa.schema([('n', pa.int64())])
+
+        reader = self.open_bytes(
+            bad_middle_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [0]
+        }
+        assert reader.read_next_batch().to_pydict() == {
+            'n': [1]
+        }
+
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error *"):
+            reader.read_next_batch()
+
+        with pytest.raises(StopIteration):
+            reader.read_next_batch()
+
+    def test_ignore_leading_empty_blocks(self):
+        leading_empty_chunk = b' \n{"b": true, "s": "foo"}'
+        explicit_schema = pa.schema([
+            ('b', pa.bool_()),
+            ('s', pa.utf8())
+        ])
+        read_options = ReadOptions(block_size=24)
+        parse_options = ParseOptions(explicit_schema=explicit_schema)
+        expected_data = {
+            'b': [True], 's': ["foo"]
+        }
+
+        reader = self.open_bytes(
+            leading_empty_chunk,
+            read_options=read_options,
+            parse_options=parse_options)
+        self.check_reader(reader, explicit_schema, [expected_data])
+
+    def test_inference(self):
+        rows = (b'{"a": 0, "b": "foo" }\n'
+                b'{"a": 1, "c": true }\n{"a": 2, "d": 4.0}')
+        expected_schema = pa.schema([
+            ('a', pa.int64()),
+            ('b', pa.utf8())
+        ])
+        expected_data = {'a': [0], 'b': ["foo"]}
+
+        read_options = ReadOptions(block_size=32)
+        parse_options = ParseOptions(unexpected_field_behavior="infer")
+        reader = self.open_bytes(
+            rows,
+            read_options=read_options,
+            parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == expected_data
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: unexpected field"):
+            reader.read_next_batch()
+
+        expected_schema = pa.schema([
+            ('a', pa.int64()),
+            ('b', pa.utf8()),
+            ('c', pa.bool_()),
+        ])
+        expected_data = {'a': [0, 1], 'b': ["foo", None], 'c': [None, True]}
+        read_options = ReadOptions(block_size=64)
+        reader = self.open_bytes(rows, read_options=read_options,
+                                 parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == expected_data
+        with pytest.raises(pa.ArrowInvalid,
+                           match="JSON parse error: unexpected field"):
+            reader.read_next_batch()
+
+        expected_schema = pa.schema([
+            ('a', pa.int64()),
+            ('b', pa.utf8()),
+            ('c', pa.bool_()),
+            ('d', pa.float64()),
+        ])
+        expected_data = {'a': [0, 1, 2], 'b': ["foo", None, None],
+                         'c': [None, True, None], 'd': [None, None, 4.0]}
+        read_options = ReadOptions(block_size=96)
+        reader = self.open_bytes(rows, read_options=read_options,
+                                 parse_options=parse_options)
+        assert reader.schema == expected_schema
+        assert reader.read_next_batch().to_pydict() == expected_data
+
+
 class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase):
 
     def read_json(self, *args, **kwargs):
@@ -357,3 +608,17 @@ def read_json(self, *args, **kwargs):
         table = read_json(*args, **kwargs)
         table.validate(full=True)
         return table
+
+
+class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase):
+
+    @property
+    def use_threads(self):
+        return False
+
+
+@pytest.mark.threading
+class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase):
+    @property
+    def use_threads(self):
+        return True
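For reference, a minimal usage sketch of the streaming API this patch adds. It is illustrative only and not part of the diff; the file name "data.jsonl" and the block size are hypothetical, and the reader behavior shown (schema fixed after the first block, iteration over RecordBatches, read_all) follows the tests above.

    # Illustrative only; assumes a local newline-delimited JSON file
    # named "data.jsonl" (hypothetical).
    from pyarrow import json

    read_options = json.ReadOptions(block_size=1 << 20)  # arbitrary example
    reader = json.open_json("data.jsonl", read_options=read_options)

    print(reader.schema)  # inferred from the first block, then fixed

    for batch in reader:       # yields pyarrow.RecordBatch objects
        print(batch.num_rows)  # process each batch incrementally

    # Alternatively, reader.read_all() collects the remaining batches
    # into a single pyarrow.Table, similar to read_json().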