GH-14932: [Python] Add python bindings for JSON streaming reader #45084

Merged (7 commits, Feb 6, 2025)
1 change: 1 addition & 0 deletions docs/source/python/api/formats.rst
@@ -66,6 +66,7 @@ JSON Files

ReadOptions
ParseOptions
open_json
read_json

.. _api.parquet:
12 changes: 12 additions & 0 deletions docs/source/python/json.rst
@@ -115,3 +115,15 @@ and pass it to :func:`read_json`. For example, you can pass an explicit

Similarly, you can choose performance settings by passing a
:class:`ReadOptions` instance to :func:`read_json`.


Incremental reading
-------------------

For memory-constrained environments, it is also possible to read a JSON file
one batch at a time, using :func:`open_json`.

In this case, type inference is performed on the first block of data, and the
resulting types are frozen for all subsequent blocks.
To make sure the right data types are inferred, either set
:attr:`ReadOptions.block_size` to a large enough value, or use
:attr:`ParseOptions.explicit_schema` to set the desired data types explicitly.
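
A minimal sketch of the incremental pattern described above (the file name
`data.jsonl`, its fields, and the chosen types are hypothetical example data,
not part of this PR):

```python
import pyarrow as pa
import pyarrow.json as pj

# Pin the schema up front so type inference on the first block cannot
# mis-type later batches (these fields are made-up example data).
schema = pa.schema([("id", pa.int64()), ("score", pa.float64())])
parse_options = pj.ParseOptions(explicit_schema=schema)

# Alternatively, widen the inference window instead of pinning types:
# read_options = pj.ReadOptions(block_size=1 << 20)

with pj.open_json("data.jsonl", parse_options=parse_options) as reader:
    for batch in reader:
        # Each item is a pyarrow.RecordBatch; memory use stays bounded
        # by the block size rather than by the whole file.
        print(batch.num_rows)
```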
2 changes: 1 addition & 1 deletion python/pyarrow/_csv.pyx
@@ -1295,7 +1295,7 @@ def open_csv(input_file, read_options=None, parse_options=None,
Options for converting CSV data
(see pyarrow.csv.ConvertOptions constructor for defaults)
memory_pool : MemoryPool, optional
Pool to allocate Table memory from
Pool to allocate RecordBatch memory from

Returns
-------
78 changes: 77 additions & 1 deletion python/pyarrow/_json.pyx
@@ -21,7 +21,9 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, MemoryPool,

from pyarrow.lib cimport (_Weakrefable, Schema,
RecordBatchReader, MemoryPool,
maybe_unbox_memory_pool,
get_input_stream, pyarrow_wrap_table,
pyarrow_wrap_schema, pyarrow_unwrap_schema)
@@ -266,6 +268,38 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out):
out[0] = parse_options.options


cdef class JSONStreamingReader(RecordBatchReader):
"""An object that reads record batches incrementally from a JSON file.

Should not be instantiated directly by user code.
"""
cdef readonly:
Schema schema

def __init__(self):
raise TypeError(f"Do not call {self.__class__.__name__}'s "
"constructor directly, "
"use pyarrow.json.open_json() instead.")

cdef _open(self, shared_ptr[CInputStream] stream,
CJSONReadOptions c_read_options,
CJSONParseOptions c_parse_options,
MemoryPool memory_pool):
cdef:
shared_ptr[CSchema] c_schema
CIOContext io_context

io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

with nogil:
self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
CJSONStreamingReader.Make(stream, move(c_read_options),
move(c_parse_options), io_context))
c_schema = self.reader.get().schema()

self.schema = pyarrow_wrap_schema(c_schema)


def read_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
@@ -308,3 +342,45 @@ def read_json(input_file, read_options=None, parse_options=None,
table = GetResultValue(reader.get().Read())

return pyarrow_wrap_table(table)


def open_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
Open a streaming reader of JSON data.

Reading using this function is always single-threaded.

Parameters
----------
input_file : string, path or file-like object
The location of JSON data. If a string or path, and if it ends
with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
the data is automatically decompressed when reading.
read_options : pyarrow.json.ReadOptions, optional
Options for the JSON reader (see pyarrow.json.ReadOptions constructor
for defaults)
parse_options : pyarrow.json.ParseOptions, optional
Options for the JSON parser
(see pyarrow.json.ParseOptions constructor for defaults)
memory_pool : MemoryPool, optional
Pool to allocate RecordBatch memory from

Returns
-------
:class:`pyarrow.json.JSONStreamingReader`
"""
cdef:
shared_ptr[CInputStream] stream
CJSONReadOptions c_read_options
CJSONParseOptions c_parse_options
JSONStreamingReader reader

_get_reader(input_file, &stream)
_get_read_options(read_options, &c_read_options)
_get_parse_options(parse_options, &c_parse_options)

reader = JSONStreamingReader.__new__(JSONStreamingReader)
reader._open(stream, move(c_read_options), move(c_parse_options),
memory_pool)
return reader
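
A minimal end-to-end sketch of how the new binding is expected to behave, for
reviewing the wiring above (the inline sample data is made up, not taken from
the PR's test suite):

```python
import io
import pyarrow.json as pj

# Newline-delimited JSON, as the Arrow JSON reader expects.
data = b'{"a": 1, "b": "x"}\n{"a": 2, "b": "y"}\n'

reader = pj.open_json(io.BytesIO(data))
print(reader.schema)  # inferred from the first block, then frozen

# JSONStreamingReader subclasses RecordBatchReader, so the standard
# reader API applies: iteration, read_next_batch(), read_all(), etc.
table = reader.read_all()
assert table.num_rows == 2
```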
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -2173,6 +2173,13 @@ cdef extern from "arrow/json/reader.h" namespace "arrow::json" nogil:

CResult[shared_ptr[CTable]] Read()

cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"(
CRecordBatchReader):
@staticmethod
CResult[shared_ptr[CJSONStreamingReader]] Make(
shared_ptr[CInputStream],
CJSONReadOptions, CJSONParseOptions, CIOContext)


cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil:

2 changes: 1 addition & 1 deletion python/pyarrow/json.py
@@ -16,4 +16,4 @@
# under the License.


from pyarrow._json import ReadOptions, ParseOptions, read_json # noqa
from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json # noqa
2 changes: 1 addition & 1 deletion python/pyarrow/tests/test_csv.py
@@ -387,7 +387,7 @@ def read_bytes(self, b, **kwargs):
"""
:param b: bytes to be parsed
:param kwargs: arguments passed on to open the csv file
:return: b parsed as a single RecordBatch
:return: b parsed as a single Table
"""
raise NotImplementedError
