|
21 | 21 |
|
22 | 22 | from pyarrow.includes.common cimport * |
23 | 23 | from pyarrow.includes.libarrow cimport * |
24 | | -from pyarrow.lib cimport (_Weakrefable, MemoryPool, |
| 24 | + |
| 25 | +from pyarrow.lib cimport (_Weakrefable, Schema, |
| 26 | + RecordBatchReader, MemoryPool, |
25 | 27 | maybe_unbox_memory_pool, |
26 | 28 | get_input_stream, pyarrow_wrap_table, |
27 | 29 | pyarrow_wrap_schema, pyarrow_unwrap_schema) |
@@ -266,6 +268,38 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out): |
266 | 268 | out[0] = parse_options.options |
267 | 269 |
|
268 | 270 |
|
cdef class JSONStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a JSON file.

    Should not be instantiated directly by user code.
    """
    cdef readonly:
        # Schema of the record batches produced by this reader; populated
        # by _open() from the underlying C++ reader.
        Schema schema

    def __init__(self):
        # Direct construction is forbidden: a usable instance needs the C++
        # reader wired up via _open(), which only open_json() does.
        raise TypeError(f"Do not call {self.__class__.__name__}'s "
                        "constructor directly, "
                        "use pyarrow.json.open_json() instead.")

    cdef _open(self, shared_ptr[CInputStream] stream,
               CJSONReadOptions c_read_options,
               CJSONParseOptions c_parse_options,
               MemoryPool memory_pool):
        """Bind this reader to *stream*, creating the C++ streaming reader.

        Called once, immediately after __new__, by open_json().
        """
        cdef:
            shared_ptr[CSchema] c_schema
            CIOContext io_context

        # The IO context carries the memory pool used for batch allocation
        # (defaults to the global pool when memory_pool is None).
        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

        with nogil:
            # Upcast to CRecordBatchReader so the RecordBatchReader base
            # class machinery (e.g. read_next_batch) can drive it.
            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
                CJSONStreamingReader.Make(stream, move(c_read_options),
                                          move(c_parse_options), io_context))
            # Schema must be fetched after Make(): the reader infers it
            # from the stream on construction.
            c_schema = self.reader.get().schema()

        self.schema = pyarrow_wrap_schema(c_schema)
| 302 | + |
269 | 303 | def read_json(input_file, read_options=None, parse_options=None, |
270 | 304 | MemoryPool memory_pool=None): |
271 | 305 | """ |
@@ -308,3 +342,45 @@ def read_json(input_file, read_options=None, parse_options=None, |
308 | 342 | table = GetResultValue(reader.get().Read()) |
309 | 343 |
|
310 | 344 | return pyarrow_wrap_table(table) |
| 345 | + |
| 346 | + |
def open_json(input_file, read_options=None, parse_options=None,
              MemoryPool memory_pool=None):
    """
    Open a streaming reader of JSON data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of JSON data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.json.ReadOptions, optional
        Options for the JSON reader (see pyarrow.json.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.json.ParseOptions, optional
        Options for the JSON parser
        (see pyarrow.json.ParseOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate RecordBatch memory from

    Returns
    -------
    :class:`pyarrow.json.JSONStreamingReader`
    """
    cdef:
        shared_ptr[CInputStream] c_stream
        CJSONReadOptions c_read_options
        CJSONParseOptions c_parse_options
        JSONStreamingReader streaming_reader

    # Resolve the Python-level options objects into their C counterparts,
    # then open the input stream (with transparent decompression for
    # recognized file extensions).
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_reader(input_file, &c_stream)

    # Bypass __init__ (which deliberately raises) and wire up the C++
    # reader through the internal _open() hook.
    streaming_reader = JSONStreamingReader.__new__(JSONStreamingReader)
    streaming_reader._open(c_stream, move(c_read_options),
                           move(c_parse_options), memory_pool)
    return streaming_reader
0 commit comments