From c63d4215497aecfc3cc4c9ed0f09ff9ccf0b5d71 Mon Sep 17 00:00:00 2001 From: Akshay Subramanian Date: Tue, 17 Jan 2023 17:31:23 -0500 Subject: [PATCH 1/7] GH-14932: Add python bindings for JSON streaming reader --- python/pyarrow/_json.pyx | 77 ++++++- python/pyarrow/includes/libarrow.pxd | 7 + python/pyarrow/json.py | 2 +- python/pyarrow/tests/test_json.py | 310 +++++++++++++++++++++++++-- 4 files changed, 378 insertions(+), 18 deletions(-) diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index d36dad67abbaa..9d4711b2eb0ac 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -21,7 +21,9 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * -from pyarrow.lib cimport (_Weakrefable, MemoryPool, + +from pyarrow.lib cimport (_Weakrefable, Schema, + RecordBatchReader, MemoryPool, maybe_unbox_memory_pool, get_input_stream, pyarrow_wrap_table, pyarrow_wrap_schema, pyarrow_unwrap_schema) @@ -265,6 +267,37 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out): else: out[0] = parse_options.options +cdef class JSONStreamingReader(RecordBatchReader): + """An object that reads record batches incrementally from a JSON file. + + Should not be instantiated directly by user code. + """ + cdef readonly: + Schema schema + + def __init__(self): + raise TypeError("Do not call {}'s constructor directly, " + "use pyarrow.json.open_json() instead." + .format(self.__class__.__name__)) + + cdef _open(self, shared_ptr[CInputStream] stream, + CJSONReadOptions c_read_options, + CJSONParseOptions c_parse_options, + MemoryPool memory_pool): + cdef: + shared_ptr[CSchema] c_schema + CIOContext io_context + + io_context = CIOContext(maybe_unbox_memory_pool(memory_pool)) + + with nogil: + self.reader = GetResultValue( + CJSONStreamingReader.Make(stream, move(c_read_options), + move(c_parse_options), io_context)) + c_schema = self.reader.get().schema() + + self.schema = pyarrow_wrap_schema(c_schema) + def read_json(input_file, read_options=None, parse_options=None, MemoryPool memory_pool=None): @@ -308,3 +341,45 @@ def read_json(input_file, read_options=None, parse_options=None, table = GetResultValue(reader.get().Read()) return pyarrow_wrap_table(table) + + +def open_json(input_file, read_options=None, parse_options=None, + MemoryPool memory_pool=None): + """ + Open a streaming reader of JSON data. + + Reading using this function is always single-threaded. + + Parameters + ---------- + input_file : string, path or file-like object + The location of JSON data. If a string or path, and if it ends + with a recognized compressed file extension (e.g. ".gz" or ".bz2"), + the data is automatically decompressed when reading. 
+ read_options : pyarrow.json.ReadOptions, optional + Options for the JSON reader (see pyarrow.json.ReadOptions constructor + for defaults) + parse_options : pyarrow.json.ParseOptions, optional + Options for the JSON parser + (see pyarrow.json.ParseOptions constructor for defaults) + memory_pool : MemoryPool, optional + Pool to allocate Table memory from + + Returns + ------- + :class:`pyarrow.json.JSONStreamingReader` + """ + cdef: + shared_ptr[CInputStream] stream + CJSONReadOptions c_read_options + CJSONParseOptions c_parse_options + JSONStreamingReader reader + + _get_reader(input_file, &stream) + _get_read_options(read_options, &c_read_options) + _get_parse_options(parse_options, &c_parse_options) + + reader = JSONStreamingReader.__new__(JSONStreamingReader) + reader._open(stream, move(c_read_options), move(c_parse_options), + memory_pool) + return reader diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index b2edeb0b4192f..4e94801038a68 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2173,6 +2173,13 @@ cdef extern from "arrow/json/reader.h" namespace "arrow::json" nogil: CResult[shared_ptr[CTable]] Read() + cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"( + CRecordBatchReader): + @staticmethod + CResult[shared_ptr[CJSONStreamingReader]] Make( + shared_ptr[CInputStream], + CJSONReadOptions, CJSONParseOptions, CIOContext) + cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil: diff --git a/python/pyarrow/json.py b/python/pyarrow/json.py index a864f5d998a44..24e604613500c 100644 --- a/python/pyarrow/json.py +++ b/python/pyarrow/json.py @@ -16,4 +16,4 @@ # under the License. -from pyarrow._json import ReadOptions, ParseOptions, read_json # noqa +from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json # noqa diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index 978c92307a69e..73736f89a0b1b 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
+import abc from collections import OrderedDict from decimal import Decimal import io @@ -30,7 +31,7 @@ import pytest import pyarrow as pa -from pyarrow.json import read_json, ReadOptions, ParseOptions +from pyarrow.json import read_json, open_json, ReadOptions, ParseOptions def generate_col_names(): @@ -110,27 +111,20 @@ def test_parse_options(pickle_module): newlines_in_values=False, unexpected_field_behavior="ignore") - -class BaseTestJSONRead: - +class BaseTestJSON(abc.ABC): + @abc.abstractmethod def read_bytes(self, b, **kwargs): - return self.read_json(pa.py_buffer(b), **kwargs) + """ + :param b: bytes to be parsed + :param kwargs: arguments passed on to open the csv file + :return: b parsed as a single RecordBatch + """ + raise NotImplementedError def check_names(self, table, names): assert table.num_columns == len(names) assert [c.name for c in table.columns] == names - def test_file_object(self): - data = b'{"a": 1, "b": 2}\n' - expected_data = {'a': [1], 'b': [2]} - bio = io.BytesIO(data) - table = self.read_json(bio) - assert table.to_pydict() == expected_data - # Text files not allowed - sio = io.StringIO(data.decode()) - with pytest.raises(TypeError): - self.read_json(sio) - def test_block_sizes(self): rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}' read_options = ReadOptions() @@ -338,6 +332,280 @@ def test_stress_block_sizes(self): # Better error output assert table.to_pydict() == expected.to_pydict() +class BaseTestJSONRead(BaseTestJSON): + + def read_bytes(self, b, **kwargs): + return self.read_json(pa.py_buffer(b), **kwargs) + + def test_file_object(self): + data = b'{"a": 1, "b": 2}\n' + expected_data = {'a': [1], 'b': [2]} + bio = io.BytesIO(data) + table = self.read_json(bio) + assert table.to_pydict() == expected_data + # Text files not allowed + sio = io.StringIO(data.decode()) + with pytest.raises(TypeError): + self.read_json(sio) + + def test_reconcile_accross_blocks(self): + # ARROW-12065: reconciling inferred types across blocks + first_row = b'{ }\n' + read_options = ReadOptions(block_size=len(first_row)) + for next_rows, expected_pylist in [ + (b'{"a": 0}', [None, 0]), + (b'{"a": []}', [None, []]), + (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]), + (b'{"a": {}}', [None, {}]), + (b'{"a": {}}\n{"a": {"b": {"c": 1}}}', + [None, {"b": None}, {"b": {"c": 1}}]), + ]: + table = self.read_bytes(first_row + next_rows, + read_options=read_options) + expected = {"a": expected_pylist} + assert table.to_pydict() == expected + # Check that the issue was exercised + assert table.column("a").num_chunks > 1 + + +class BaseTestStreamingJSONRead(BaseTestJSON): + def open_json(self, json, *args, **kwargs): + """ + Reads the JSON file into memory using pyarrow's open_json + json The JSON bytes + args Positional arguments to be forwarded to pyarrow's open_json + kwargs Keyword arguments to be forwarded to pyarrow's open_json + """ + read_options = kwargs.setdefault('read_options', ReadOptions()) + read_options.use_threads = self.use_threads + return open_json(json, *args, **kwargs) + + def open_bytes(self, b, **kwargs): + return self.open_json(pa.py_buffer(b), **kwargs) + + def check_reader(self, reader, expected_schema, expected_data): + assert reader.schema == expected_schema + batches = list(reader) + assert len(batches) == len(expected_data) + for batch, expected_batch in zip(batches, expected_data): + batch.validate(full=True) + assert batch.schema == expected_schema + assert batch.to_pydict() == expected_batch + + def read_bytes(self, b, **kwargs): + return self.open_bytes(b, 
**kwargs).read_all() + + def test_file_object(self): + data = b'{"a": 1, "b": 2}\n' + expected_data = {'a': [1], 'b': [2]} + bio = io.BytesIO(data) + reader = self.open_json(bio) + expected_schema = pa.schema([('a', pa.int64()), + ('b', pa.int64())]) + self.check_reader(reader, expected_schema, [expected_data]) + + def test_bad_first_chunk(self): + bad_first_chunk = b'{"i": 0 }\n{"i": 1}' + read_options = ReadOptions() + read_options.block_size = 3 + with pytest.raises( + pa.ArrowInvalid, + match="straddling object straddles two block boundaries*" + ): + self.open_bytes(bad_first_chunk, read_options=read_options) + + def test_bad_middle_chunk(self): + bad_middle_chunk = b'{"i": 0}\n{"i": 1}\n{"i": 2}' + read_options = ReadOptions() + read_options.block_size = 10 + expected_schema = pa.schema([('i', pa.int64())]) + + reader = self.open_bytes(bad_middle_chunk, read_options=read_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == { + 'i': [0] + } + with pytest.raises( + pa.ArrowInvalid, + match="straddling object straddles two block boundaries*" + ): + reader.read_next_batch() + + with pytest.raises(StopIteration): + reader.read_next_batch() + + def test_bad_first_parse(self): + bad_first_block = b'{"n": }\n{"n": 10000}' + read_options = ReadOptions() + read_options.block_size = 16 + with pytest.raises(pa.ArrowInvalid, + match="JSON parse error: Invalid value.*"): + self.open_bytes(bad_first_block, read_options=read_options) + + def test_bad_middle_parse_after_empty(self): + bad_first_block = b'{ }{"n": }\n{"n": 10000}' + read_options = ReadOptions() + read_options.block_size = 16 + with pytest.raises(pa.ArrowInvalid, + match="JSON parse error: Invalid value.*"): + self.open_bytes(bad_first_block, read_options=read_options) + + def test_bad_middle_parse(self): + bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}' + read_options = ReadOptions() + read_options.block_size = 10 + expected_schema = pa.schema([('n', pa.int64())]) + + reader = self.open_bytes(bad_middle_chunk, read_options=read_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == { + 'n': [1000] + } + with pytest.raises( + pa.ArrowInvalid, + match="JSON parse error:\ + Missing a comma or '}' after an object member*" + ): + reader.read_next_batch() + + with pytest.raises(StopIteration): + reader.read_next_batch() + + def test_non_linewise_chunker_first_block(self): + bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}' + read_options = ReadOptions(block_size=10) + parse_options = ParseOptions(newlines_in_values=True) + expected_schema = pa.schema([('n', pa.int64())]) + + reader = self.open_bytes( + bad_middle_chunk, + read_options=read_options, + parse_options=parse_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == { + 'n': [0] + } + with pytest.raises(pa.ArrowInvalid, + match="JSON parse error *"): + reader.read_next_batch() + + with pytest.raises(StopIteration): + reader.read_next_batch() + + def test_non_linewise_chunker_bad_first_block(self): + bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}' + read_options = ReadOptions(block_size=10) + parse_options = ParseOptions(newlines_in_values=True) + expected_schema = pa.schema([('n', pa.int64())]) + + reader = self.open_bytes( + bad_middle_chunk, + read_options=read_options, + parse_options=parse_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == { + 'n': [0] + } + with pytest.raises(pa.ArrowInvalid, 
+ match="JSON parse error *"): + reader.read_next_batch() + + with pytest.raises(StopIteration): + reader.read_next_batch() + + def test_non_linewise_chunker_bad_middle_block(self): + bad_middle_chunk = b'{"n": 0}\n{"n": 1}\n{}"n":2}\n{"n": 3}' + read_options = ReadOptions(block_size=10) + parse_options = ParseOptions(newlines_in_values=True) + expected_schema = pa.schema([('n', pa.int64())]) + + reader = self.open_bytes( + bad_middle_chunk, + read_options=read_options, + parse_options=parse_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == { + 'n': [0] + } + assert reader.read_next_batch().to_pydict() == { + 'n': [1] + } + + with pytest.raises(pa.ArrowInvalid, + match="JSON parse error *"): + reader.read_next_batch() + + with pytest.raises(StopIteration): + reader.read_next_batch() + + def test_ignore_leading_empty_blocks(self): + leading_empty_chunk = b' \n{"b": true, "s": "foo"}' + explicit_schema = pa.schema([ + ('b', pa.bool_()), + ('s', pa.utf8()) + ]) + read_options = ReadOptions(block_size=24) + parse_options = ParseOptions(explicit_schema=explicit_schema) + expected_data = { + 'b': [True], 's': ["foo"] + } + + reader = self.open_bytes( + leading_empty_chunk, + read_options=read_options, + parse_options=parse_options) + self.check_reader(reader, explicit_schema, [expected_data]) + + def test_inference(self): + rows = b'{"a": 0, "b": "foo" }\n\ + {"a": 1, "c": true }\n{"a": 2, "d": 4.0}' + expected_schema = pa.schema([ + ('a', pa.int64()), + ('b', pa.utf8()) + ]) + expected_data = {'a': [0], 'b': ["foo"]} + + read_options = ReadOptions(block_size=32) + parse_options = ParseOptions(unexpected_field_behavior="infer") + reader = self.open_bytes( + rows, + read_options=read_options, + parse_options=parse_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == expected_data + with pytest.raises(pa.ArrowInvalid, + match="JSON parse error: unexpected field"): + reader.read_next_batch() + + expected_schema = pa.schema([ + ('a', pa.int64()), + ('b', pa.utf8()), + ('c', pa.bool_()), + ]) + expected_data = {'a': [0, 1], 'b': ["foo", None], 'c': [None, True]} + read_options = ReadOptions(block_size=64) + reader = self.open_bytes(rows, read_options=read_options, + parse_options=parse_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == expected_data + with pytest.raises(pa.ArrowInvalid, + match="JSON parse error: unexpected field"): + reader.read_next_batch() + + expected_schema = pa.schema([ + ('a', pa.int64()), + ('b', pa.utf8()), + ('c', pa.bool_()), + ('d', pa.float64()), + ]) + expected_data = {'a': [0, 1, 2], 'b': ["foo", None, None], + 'c': [None, True, None], 'd': [None, None, 4.0]} + read_options = ReadOptions(block_size=96) + reader = self.open_bytes(rows, read_options=read_options, + parse_options=parse_options) + assert reader.schema == expected_schema + assert reader.read_next_batch().to_pydict() == expected_data + class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase): @@ -357,3 +625,13 @@ def read_json(self, *args, **kwargs): table = read_json(*args, **kwargs) table.validate(full=True) return table + + +class TestSerialStreamingJSONRead( + BaseTestStreamingJSONRead, + unittest.TestCase +): + + @property + def use_threads(self): + return False From 7c0938c03e0453f2f8d71b0d6739ab5b52520ed7 Mon Sep 17 00:00:00 2001 From: pxc Date: Fri, 20 Dec 2024 13:35:15 +0800 Subject: [PATCH 2/7] Add tests with JSONStreamingReader --- 
python/pyarrow/tests/test_json.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index 73736f89a0b1b..8195af239b933 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -111,12 +111,13 @@ def test_parse_options(pickle_module): newlines_in_values=False, unexpected_field_behavior="ignore") + class BaseTestJSON(abc.ABC): @abc.abstractmethod def read_bytes(self, b, **kwargs): """ :param b: bytes to be parsed - :param kwargs: arguments passed on to open the csv file + :param kwargs: arguments passed on to open the json file :return: b parsed as a single RecordBatch """ raise NotImplementedError @@ -332,11 +333,12 @@ def test_stress_block_sizes(self): # Better error output assert table.to_pydict() == expected.to_pydict() + class BaseTestJSONRead(BaseTestJSON): def read_bytes(self, b, **kwargs): return self.read_json(pa.py_buffer(b), **kwargs) - + def test_file_object(self): data = b'{"a": 1, "b": 2}\n' expected_data = {'a': [1], 'b': [2]} @@ -347,7 +349,7 @@ def test_file_object(self): sio = io.StringIO(data.decode()) with pytest.raises(TypeError): self.read_json(sio) - + def test_reconcile_accross_blocks(self): # ARROW-12065: reconciling inferred types across blocks first_row = b'{ }\n' @@ -627,11 +629,15 @@ def read_json(self, *args, **kwargs): return table -class TestSerialStreamingJSONRead( - BaseTestStreamingJSONRead, - unittest.TestCase -): +class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase): @property def use_threads(self): return False + + +@pytest.mark.threading +class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase): + @property + def use_threads(self): + return True From cf576b790584f839475dc4ac7feaef1049ee651b Mon Sep 17 00:00:00 2001 From: pxc Date: Fri, 20 Dec 2024 06:49:02 +0000 Subject: [PATCH 3/7] Fix test_reconcile_across_blocks --- python/pyarrow/tests/test_json.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index 8195af239b933..db9a59d647071 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -608,6 +608,25 @@ def test_inference(self): assert reader.schema == expected_schema assert reader.read_next_batch().to_pydict() == expected_data + def test_reconcile_across_blocks(self): + # Modified from BaseTestJSON.test_reconcile_across_blocks + # Because in `open_json`, the inference is done on the first block. + # The inferred schema will be used to parse the rest of the data, + # which will fail if the schema is not compatible with the following + # blocks. 
+ first_row = b'{ }\n' + read_options = ReadOptions(block_size=len(first_row)) + for next_rows in [ + b'{"a": 0}', + b'{"a": []}', + b'{"a": []}\n{"a": [[1]]}', + b'{"a": {}}', + b'{"a": {}}\n{"a": {"b": {"c": 1}}}', + ]: + with pytest.raises(pa.ArrowInvalid): + _ = self.read_bytes(first_row + next_rows, + read_options=read_options) + class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase): From ebb7d4fbba0fd520a2066b456411d3d5f0bed131 Mon Sep 17 00:00:00 2001 From: pxc Date: Tue, 7 Jan 2025 07:11:48 +0000 Subject: [PATCH 4/7] Fix test_json --- python/pyarrow/tests/test_json.py | 40 +------------------------------ 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index db9a59d647071..e1a1cc0ce9d51 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -224,25 +224,6 @@ def test_empty_rows(self): assert table.num_columns == 0 assert table.num_rows == 2 - def test_reconcile_across_blocks(self): - # ARROW-12065: reconciling inferred types across blocks - first_row = b'{ }\n' - read_options = ReadOptions(block_size=len(first_row)) - for next_rows, expected_pylist in [ - (b'{"a": 0}', [None, 0]), - (b'{"a": []}', [None, []]), - (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]), - (b'{"a": {}}', [None, {}]), - (b'{"a": {}}\n{"a": {"b": {"c": 1}}}', - [None, {"b": None}, {"b": {"c": 1}}]), - ]: - table = self.read_bytes(first_row + next_rows, - read_options=read_options) - expected = {"a": expected_pylist} - assert table.to_pydict() == expected - # Check that the issue was exercised - assert table.column("a").num_chunks > 1 - def test_explicit_schema_decimal(self): rows = (b'{"a": 1}\n' b'{"a": 1.45}\n' @@ -350,7 +331,7 @@ def test_file_object(self): with pytest.raises(TypeError): self.read_json(sio) - def test_reconcile_accross_blocks(self): + def test_reconcile_across_blocks(self): # ARROW-12065: reconciling inferred types across blocks first_row = b'{ }\n' read_options = ReadOptions(block_size=len(first_row)) @@ -608,25 +589,6 @@ def test_inference(self): assert reader.schema == expected_schema assert reader.read_next_batch().to_pydict() == expected_data - def test_reconcile_across_blocks(self): - # Modified from BaseTestJSON.test_reconcile_across_blocks - # Because in `open_json`, the inference is done on the first block. - # The inferred schema will be used to parse the rest of the data, - # which will fail if the schema is not compatible with the following - # blocks. 
- first_row = b'{ }\n' - read_options = ReadOptions(block_size=len(first_row)) - for next_rows in [ - b'{"a": 0}', - b'{"a": []}', - b'{"a": []}\n{"a": [[1]]}', - b'{"a": {}}', - b'{"a": {}}\n{"a": {"b": {"c": 1}}}', - ]: - with pytest.raises(pa.ArrowInvalid): - _ = self.read_bytes(first_row + next_rows, - read_options=read_options) - class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase): From 5abd9123f2b25ca0e9161cc2ba2ec2482fc31426 Mon Sep 17 00:00:00 2001 From: pxc Date: Thu, 6 Feb 2025 09:51:46 +0800 Subject: [PATCH 5/7] Fix comments --- python/pyarrow/_csv.pyx | 2 +- python/pyarrow/_json.pyx | 9 +++++---- python/pyarrow/tests/test_csv.py | 2 +- python/pyarrow/tests/test_json.py | 11 ++++------- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 508488c0c3b3c..e53c6d1847566 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -1295,7 +1295,7 @@ def open_csv(input_file, read_options=None, parse_options=None, Options for converting CSV data (see pyarrow.csv.ConvertOptions constructor for defaults) memory_pool : MemoryPool, optional - Pool to allocate Table memory from + Pool to allocate RecordBatch memory from Returns ------- diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 9d4711b2eb0ac..c023baeec1c82 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -267,6 +267,7 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out): else: out[0] = parse_options.options + cdef class JSONStreamingReader(RecordBatchReader): """An object that reads record batches incrementally from a JSON file. @@ -276,9 +277,9 @@ cdef class JSONStreamingReader(RecordBatchReader): Schema schema def __init__(self): - raise TypeError("Do not call {}'s constructor directly, " - "use pyarrow.json.open_json() instead." 
- .format(self.__class__.__name__)) + raise TypeError(f"Do not call {self.__class__.__name__}'s " + "constructor directly, " + "use pyarrow.json.open_json() instead.") cdef _open(self, shared_ptr[CInputStream] stream, CJSONReadOptions c_read_options, @@ -363,7 +364,7 @@ def open_json(input_file, read_options=None, parse_options=None, Options for the JSON parser (see pyarrow.json.ParseOptions constructor for defaults) memory_pool : MemoryPool, optional - Pool to allocate Table memory from + Pool to allocate RecordBatch memory from Returns ------- diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 6a36b41daf302..239ae55f2f760 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -387,7 +387,7 @@ def read_bytes(self, b, **kwargs): """ :param b: bytes to be parsed :param kwargs: arguments passed on to open the csv file - :return: b parsed as a single RecordBatch + :return: b parsed as a single Table """ raise NotImplementedError diff --git a/python/pyarrow/tests/test_json.py b/python/pyarrow/tests/test_json.py index e1a1cc0ce9d51..c3f9fe333bd02 100644 --- a/python/pyarrow/tests/test_json.py +++ b/python/pyarrow/tests/test_json.py @@ -118,7 +118,7 @@ def read_bytes(self, b, **kwargs): """ :param b: bytes to be parsed :param kwargs: arguments passed on to open the json file - :return: b parsed as a single RecordBatch + :return: b parsed as a single Table """ raise NotImplementedError @@ -612,13 +612,10 @@ def read_json(self, *args, **kwargs): class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase): - @property - def use_threads(self): - return False + use_threads = False @pytest.mark.threading class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase): - @property - def use_threads(self): - return True + + use_threads = True From 8ca2578d98a3ed99372323da2017a2cbe27641c2 Mon Sep 17 00:00:00 2001 From: pxc Date: Thu, 6 Feb 2025 10:12:51 +0800 Subject: [PATCH 6/7] Update doc for open_json --- docs/source/python/api/formats.rst | 1 + docs/source/python/json.rst | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/docs/source/python/api/formats.rst b/docs/source/python/api/formats.rst index 86e2585ac2537..a4cf3bbcdd3ad 100644 --- a/docs/source/python/api/formats.rst +++ b/docs/source/python/api/formats.rst @@ -66,6 +66,7 @@ JSON Files ReadOptions ParseOptions + open_json read_json .. _api.parquet: diff --git a/docs/source/python/json.rst b/docs/source/python/json.rst index eff6135d895a7..55beb325caf1f 100644 --- a/docs/source/python/json.rst +++ b/docs/source/python/json.rst @@ -115,3 +115,20 @@ and pass it to :func:`read_json`. For example, you can pass an explicit Similarly, you can choose performance settings by passing a :class:`ReadOptions` instance to :func:`read_json`. + + +Incremental reading +------------------- + +For memory-constrained environments, it is also possible to read a JSON file +one batch at a time, using :func:`open_json`. + +There are a few caveats: + +1. For now, the incremental reader is always single-threaded (regardless of + :attr:`ReadOptions.use_threads`) + +2. Type inference is done on the first block and types are frozen afterwards; + to make sure the right data types are inferred, either set + :attr:`ReadOptions.block_size` to a large enough value, or use + :attr:`ParseOptions.explicit_schema` to set the desired data types explicitly. 
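
As context for the "Incremental reading" documentation added above, here is a minimal sketch of the streaming read loop it describes; it is not part of the patch, and the in-memory data and the artificially small block_size are chosen only to make several batches visible.

```python
import io
from pyarrow import json

# Newline-delimited JSON held in memory; a path or any readable binary
# file-like object works the same way with open_json.
data = b'{"a": 1, "b": "x"}\n{"a": 2, "b": "y"}\n{"a": 3, "b": "z"}\n'

# A deliberately small block_size forces multiple record batches so the
# incremental behaviour shows; real code would normally keep the default.
reader = json.open_json(io.BytesIO(data),
                        read_options=json.ReadOptions(block_size=32))

print(reader.schema)          # schema inferred from the first block
for batch in reader:          # yields pyarrow.RecordBatch objects
    print(batch.num_rows, batch.to_pydict())
```
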
From 1ff6e3f4f99b02ffff825718f7428edd794075a5 Mon Sep 17 00:00:00 2001 From: pxc Date: Thu, 6 Feb 2025 11:12:05 +0000 Subject: [PATCH 7/7] Fix doc of open_json --- docs/source/python/json.rst | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/docs/source/python/json.rst b/docs/source/python/json.rst index 55beb325caf1f..277b8e134947f 100644 --- a/docs/source/python/json.rst +++ b/docs/source/python/json.rst @@ -123,12 +123,7 @@ Incremental reading For memory-constrained environments, it is also possible to read a JSON file one batch at a time, using :func:`open_json`. -There are a few caveats: - -1. For now, the incremental reader is always single-threaded (regardless of - :attr:`ReadOptions.use_threads`) - -2. Type inference is done on the first block and types are frozen afterwards; - to make sure the right data types are inferred, either set - :attr:`ReadOptions.block_size` to a large enough value, or use - :attr:`ParseOptions.explicit_schema` to set the desired data types explicitly. +In this case, type inference is done on the first block and types are frozen afterwards. +To make sure the right data types are inferred, either set +:attr:`ReadOptions.block_size` to a large enough value, or use +:attr:`ParseOptions.explicit_schema` to set the desired data types explicitly.
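
To illustrate the explicit_schema route recommended in the revised doc text above, a small sketch follows; the field name and data are invented for the example, and the point is only that supplying the schema up front removes the dependence on what the first block happens to contain.

```python
import io
import pyarrow as pa
from pyarrow import json

# The first row carries no type information for "a", so relying on
# first-block inference alone could freeze an unusable type.
data = b'{ }\n{"a": [1, 2]}\n{"a": [3]}\n'

# Declaring the schema explicitly pins the types before reading starts.
schema = pa.schema([("a", pa.list_(pa.int64()))])
reader = json.open_json(
    io.BytesIO(data),
    parse_options=json.ParseOptions(explicit_schema=schema),
)

table = reader.read_all()               # a pyarrow.Table with the declared types
print(table.column("a").to_pylist())    # [None, [1, 2], [3]]
```
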