From 3b99ed308449e677f93f961217fbe0b2cccade97 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Thu, 3 Aug 2023 17:57:45 -0700 Subject: [PATCH 01/15] Add failing tests --- daft/runners/partitioning.py | 18 ++++++++++++++ daft/table/table_io.py | 12 +++++++--- tests/table/table_io/test_parquet.py | 36 ++++++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 06c086e29d..f4543dcb99 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys import weakref from abc import abstractmethod from dataclasses import dataclass @@ -11,6 +12,11 @@ from daft.logical.schema import Schema from daft.table import Table +if sys.version_info < (3, 8): + from typing_extensions import Literal +else: + from typing import Literal + if TYPE_CHECKING: import pandas as pd @@ -30,6 +36,18 @@ class TableReadOptions: column_names: list[str] | None = None +@dataclass(frozen=True) +class TableParseParquetOptions: + """Options for parsing Parquet + + Args: + int96_timestamps_precision: Precision to use when reading int96 timestamps, defaults to "ns" which + means that Parquet INT96 timestamps outside the range of years 1678-2262 will overflow. + """ + + int96_timestamps_precision: Literal["ns"] | Literal["us"] | Literal["ms"] = "ns" + + @dataclass(frozen=True) class TableParseCSVOptions: """Options for parsing CSVs diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 2ad10ac9a8..c7e61f166d 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -17,7 +17,11 @@ from daft.expressions import ExpressionsProjection from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema -from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions +from daft.runners.partitioning import ( + TableParseCSVOptions, + TableParseParquetOptions, + TableReadOptions, +) from daft.table import Table if TYPE_CHECKING: @@ -97,6 +101,7 @@ def read_parquet( schema: Schema, fs: fsspec.AbstractFileSystem | None = None, read_options: TableReadOptions = TableReadOptions(), + parquet_options: TableParseParquetOptions = TableParseParquetOptions(), io_config: IOConfig | None = None, use_native_downloader: bool = False, ) -> Table: @@ -132,11 +137,11 @@ def read_parquet( # If no rows required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile(f) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_precision) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile(f) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_precision) # Only read the required row groups. 
rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -152,6 +157,7 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, + coerce_int96_timestamp_unit=parquet_options.int96_timestamps_precision, ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index e810f1e403..4985065b59 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -12,7 +12,7 @@ import daft from daft.datatype import DataType, TimeUnit from daft.logical.schema import Schema -from daft.runners.partitioning import TableReadOptions +from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions from daft.table import Table, schema_inference, table_io @@ -158,7 +158,7 @@ def test_parquet_read_data_select_columns(use_native_downloader): @pytest.mark.parametrize("use_native_downloader", [True, False]) @pytest.mark.parametrize("use_deprecated_int96_timestamps", [True, False]) -def test_parquet_read_timestamps(use_native_downloader, use_deprecated_int96_timestamps): +def test_parquet_read_int96_timestamps(use_native_downloader, use_deprecated_int96_timestamps): data = { "timestamp_ms": pa.array([1, 2, 3], pa.timestamp("ms")), "timestamp_us": pa.array([1, 2, 3], pa.timestamp("us")), @@ -188,3 +188,35 @@ def test_parquet_read_timestamps(use_native_downloader, use_deprecated_int96_tim use_native_downloader=use_native_downloader, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" + + +@pytest.mark.parametrize("use_native_downloader", [True, False]) +@pytest.mark.parametrize("coerce_to", ["ms", "us"]) +def test_parquet_read_int96_timestamps_overflow(use_native_downloader, coerce_to): + # NOTE: datetime.datetime(3000, 1, 1) and datetime.datetime(1000, 1, 1) cannot be represented by our timestamp64(nanosecond) + # type. However they can be written to Parquet's INT96 type. Here we test that a round-trip is possible if provided with + # the appropriate flags. 
+ data = { + "timestamp": pa.array( + [datetime.datetime(1000, 1, 1), datetime.datetime(2000, 1, 1), datetime.datetime(3000, 1, 1)], + pa.timestamp(coerce_to), + ), + } + schema = [ + ("timestamp", DataType.timestamp(getattr(TimeUnit, coerce_to)())), + ] + + with _parquet_write_helper( + pa.Table.from_pydict(data), + papq_write_table_kwargs={"use_deprecated_int96_timestamps": True}, + ) as f: + schema = Schema._from_field_name_and_types(schema) + expected = Table.from_pydict(data) + table = table_io.read_parquet( + f, + schema, + read_options=TableReadOptions(column_names=schema.column_names()), + parquet_options=TableParseParquetOptions(int96_timestamps_precision=coerce_to), + use_native_downloader=use_native_downloader, + ) + assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" From 0cae450e6e19411e15e702314249d813d65f344d Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 4 Aug 2023 11:39:29 -0700 Subject: [PATCH 02/15] Refactor our Parquet read code to thread the int96_timestamps_coerce_to_unit kwarg through to arrow2 reads --- daft/runners/partitioning.py | 4 +-- daft/table/table.py | 16 +++++++++- daft/table/table_io.py | 7 ++-- src/daft-parquet/src/file.rs | 48 ++++++++++++++++++++-------- src/daft-parquet/src/python.rs | 29 ++++++++++++++++- src/daft-parquet/src/read.rs | 20 +++++++++--- tests/table/table_io/test_parquet.py | 2 +- 7 files changed, 100 insertions(+), 26 deletions(-) diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index f4543dcb99..dd9e5426c4 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -41,11 +41,11 @@ class TableParseParquetOptions: """Options for parsing Parquet Args: - int96_timestamps_precision: Precision to use when reading int96 timestamps, defaults to "ns" which + int96_timestamps_coerce_to_unit: Precision to use when reading int96 timestamps, defaults to "ns" which means that Parquet INT96 timestamps outside the range of years 1678-2262 will overflow. 
""" - int96_timestamps_precision: Literal["ns"] | Literal["us"] | Literal["ms"] = "ns" + int96_timestamps_coerce_to_unit: Literal["ns"] | Literal["us"] | Literal["ms"] = "ns" @dataclass(frozen=True) diff --git a/daft/table/table.py b/daft/table/table.py index 63dd4171ea..c70765f760 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from typing import TYPE_CHECKING, Any import pyarrow as pa @@ -15,6 +16,11 @@ from daft.logical.schema import Schema from daft.series import Series +if sys.version_info < (3, 8): + from typing_extensions import Literal +else: + from typing import Literal + _NUMPY_AVAILABLE = True try: import numpy as np @@ -352,10 +358,18 @@ def read_parquet( columns: list[str] | None = None, start_offset: int | None = None, num_rows: int | None = None, + int96_timestamps_coerce_to_unit: Literal["ns"] | Literal["us"] | Literal["ms"] = "ns", io_config: IOConfig | None = None, ) -> Table: return Table._from_pytable( - _read_parquet(uri=path, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config) + _read_parquet( + uri=path, + columns=columns, + start_offset=start_offset, + num_rows=num_rows, + int96_timestamps_coerce_to_unit=int96_timestamps_coerce_to_unit, + io_config=io_config, + ) ) @classmethod diff --git a/daft/table/table_io.py b/daft/table/table_io.py index c7e61f166d..8dc0493c4e 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -122,6 +122,7 @@ def read_parquet( str(file), columns=read_options.column_names, num_rows=read_options.num_rows, + int96_timestamps_coerce_to_unit=parquet_options.int96_timestamps_coerce_to_unit, io_config=io_config, ) return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) @@ -137,11 +138,11 @@ def read_parquet( # If no rows required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_precision) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_coerce_to_unit) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_precision) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_coerce_to_unit) # Only read the required row groups. 
rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -157,7 +158,7 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, - coerce_int96_timestamp_unit=parquet_options.int96_timestamps_precision, + coerce_int96_timestamp_unit=parquet_options.int96_timestamps_coerce_to_unit, ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 7e303214b7..64429bb1e9 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -2,7 +2,7 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::infer_schema; use common_error::DaftResult; -use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{datatypes::TimeUnit, utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_io::IOClient; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; @@ -20,13 +20,14 @@ use crate::{ UnableToParseSchemaFromMetadataSnafu, }; use arrow2::io::parquet::read::column_iter_to_arrays; + pub(crate) struct ParquetReaderBuilder { uri: String, metadata: parquet2::metadata::FileMetaData, - arrow_schema: arrow2::datatypes::Schema, selected_columns: Option>, row_start_offset: usize, num_rows: usize, + int96_timestamps_coerce_to_unit: TimeUnit, } use parquet2::read::decompress; @@ -94,16 +95,13 @@ impl ParquetReaderBuilder { let metadata = read_parquet_metadata(uri, size, io_client).await?; let num_rows = metadata.num_rows; - let schema = - infer_schema(&metadata) - .context(UnableToParseSchemaFromMetadataSnafu:: { path: uri.into() })?; Ok(ParquetReaderBuilder { uri: uri.into(), metadata, - arrow_schema: schema, selected_columns: None, row_start_offset: 0, num_rows, + int96_timestamps_coerce_to_unit: TimeUnit::Nanoseconds, }) } @@ -111,16 +109,16 @@ impl ParquetReaderBuilder { &self.metadata } - pub fn arrow_schema(&self) -> &arrow2::datatypes::Schema { - &self.arrow_schema + pub fn parquet_schema(&self) -> &parquet2::metadata::SchemaDescriptor { + self.metadata().schema() } pub fn prune_columns(mut self, columns: &[&str]) -> super::Result { let avail_names = self - .arrow_schema - .fields + .parquet_schema() + .fields() .iter() - .map(|f| f.name.as_str()) + .map(|f| f.name()) .collect::>(); let mut names_to_keep = HashSet::new(); for col_name in columns { @@ -150,7 +148,15 @@ impl ParquetReaderBuilder { Ok(self) } - pub fn build(mut self) -> super::Result { + pub fn set_int96_timestamps_coerce_to_unit( + mut self, + int96_timestamps_coerce_to_unit: &TimeUnit, + ) -> Self { + self.int96_timestamps_coerce_to_unit = int96_timestamps_coerce_to_unit.to_owned(); + self + } + + pub fn build(self) -> super::Result { let mut row_ranges = vec![]; let mut curr_row_index = 0; @@ -174,13 +180,22 @@ impl ParquetReaderBuilder { curr_row_index += rg.num_rows(); } + // TODO(jay): Add arrow2 functionality to perform schema inference by taking into account the + // self.int96_timestamps_coerce_to_unit option, where Parquet int96 fields should be coerced + // into Arrow datetimes with the specified TimeUnit. 
+        let mut arrow_schema = infer_schema(&self.metadata).context(
+            UnableToParseSchemaFromMetadataSnafu::<String> {
+                path: self.uri.clone(),
+            },
+        )?;
+
         if let Some(names_to_keep) = self.selected_columns {
-            self.arrow_schema
+            arrow_schema
                 .fields
                 .retain(|f| names_to_keep.contains(f.name.as_str()));
         }
 
-        ParquetFileReader::new(self.uri, self.metadata, self.arrow_schema, row_ranges)
+        ParquetFileReader::new(self.uri, self.metadata, arrow_schema, row_ranges)
     }
 }
 
@@ -213,6 +228,10 @@ impl ParquetFileReader {
         })
     }
 
+    pub fn arrow_schema(&self) -> &arrow2::datatypes::Schema {
+        &self.arrow_schema
+    }
+
     fn naive_read_plan(&self) -> super::Result {
         let arrow_fields = &self.arrow_schema.fields;
 
@@ -330,6 +349,7 @@ impl ParquetFileReader {
         let (send, recv) = tokio::sync::oneshot::channel();
 
         rayon::spawn(move || {
+            // TODO(jay): Fix arrow2 to handle the pytype=Int96/dtype=Timestamp(ms/us) cases
             let arr_iter = column_iter_to_arrays(
                 decompressed_iters,
                 ptypes.iter().collect(),
diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs
index 363ed2b4b9..b97a0f1dbb 100644
--- a/src/daft-parquet/src/python.rs
+++ b/src/daft-parquet/src/python.rs
@@ -3,11 +3,26 @@ use pyo3::prelude::*;
 pub mod pylib {
     use std::sync::Arc;
 
+    use common_error::{DaftError, DaftResult};
+    use daft_core::datatypes::TimeUnit;
     use daft_core::python::{schema::PySchema, PySeries};
     use daft_io::{get_io_client, python::IOConfig};
     use daft_table::python::PyTable;
     use pyo3::{pyfunction, PyResult, Python};
 
+    fn time_unit_from_str(s: &str) -> DaftResult<TimeUnit> {
+        match s {
+            "ns" => Ok(TimeUnit::Nanoseconds),
+            "us" => Ok(TimeUnit::Microseconds),
+            "ms" => Ok(TimeUnit::Milliseconds),
+            "s" => Ok(TimeUnit::Seconds),
+            _ => Err(DaftError::ValueError(format!(
+                "Unrecognized TimeUnit {}",
+                s
+            ))),
+        }
+    }
+
     #[pyfunction]
     pub fn read_parquet(
         py: Python,
@@ -15,15 +30,19 @@ pub mod pylib {
         columns: Option<Vec<&str>>,
         start_offset: Option<usize>,
         num_rows: Option<usize>,
+        int96_timestamps_coerce_to_unit: Option<&str>,
         io_config: Option<IOConfig>,
     ) -> PyResult<PyTable> {
         py.allow_threads(|| {
+            let int96_timestamps_coerce_to_unit = int96_timestamps_coerce_to_unit
+                .map_or(Ok(TimeUnit::Nanoseconds), time_unit_from_str)?;
             let io_client = get_io_client(io_config.unwrap_or_default().config.into())?;
             Ok(crate::read::read_parquet(
                 uri,
                 columns.as_deref(),
                 start_offset,
                 num_rows,
+                &int96_timestamps_coerce_to_unit,
                 io_client,
             )?
             .into())
@@ -59,10 +78,18 @@ pub mod pylib {
         py: Python,
         uri: &str,
         io_config: Option<IOConfig>,
+        int96_timestamps_coerce_to_unit: Option<&str>,
     ) -> PyResult<PySchema> {
         py.allow_threads(|| {
+            let int96_timestamps_coerce_to_unit = int96_timestamps_coerce_to_unit
+                .map_or(Ok(TimeUnit::Nanoseconds), time_unit_from_str)?;
             let io_client = get_io_client(io_config.unwrap_or_default().config.into())?;
-            Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client)?).into())
+            Ok(Arc::new(crate::read::read_parquet_schema(
+                uri,
+                io_client,
+                &int96_timestamps_coerce_to_unit,
+            )?)
+ .into()) }) } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 8487340200..c67d84dbf5 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use common_error::DaftResult; +use daft_core::datatypes::TimeUnit; use daft_core::{ datatypes::{Int32Array, UInt64Array, Utf8Array}, schema::Schema, @@ -19,6 +20,7 @@ async fn read_parquet_single( columns: Option<&[&str]>, start_offset: Option, num_rows: Option, + int96_timestamps_coerce_to_unit: &TimeUnit, io_client: Arc, ) -> DaftResult { let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?; @@ -30,8 +32,10 @@ async fn read_parquet_single( }; let builder = builder.limit(start_offset, num_rows)?; + let builder = builder.set_int96_timestamps_coerce_to_unit(int96_timestamps_coerce_to_unit); + let metadata_num_rows = builder.metadata().num_rows; - let metadata_num_columns = builder.arrow_schema().fields.len(); + let metadata_num_columns = builder.parquet_schema().fields().len(); let parquet_reader = builder.build()?; let ranges = parquet_reader.prebuffer_ranges(io_client)?; @@ -123,12 +127,19 @@ pub fn read_parquet_bulk( tables.into_iter().collect::>>() } -pub fn read_parquet_schema(uri: &str, io_client: Arc) -> DaftResult { +pub fn read_parquet_schema( + uri: &str, + io_client: Arc, + int96_timestamps_coerce_to_unit: &TimeUnit, +) -> DaftResult { let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); let builder = runtime_handle .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?; - Schema::try_from(builder.arrow_schema()) + + let builder = builder.set_int96_timestamps_coerce_to_unit(int96_timestamps_coerce_to_unit); + + Schema::try_from(builder.build()?.arrow_schema()) } pub fn read_parquet_statistics(uris: &Series, io_client: Arc) -> DaftResult
{ @@ -207,6 +218,7 @@ mod tests { use std::sync::Arc; use common_error::DaftResult; + use daft_core::datatypes::TimeUnit; use daft_io::{config::IOConfig, IOClient}; use super::read_parquet; @@ -219,7 +231,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_parquet(file, None, None, None, io_client)?; + let table = read_parquet(file, None, None, None, &TimeUnit::Nanoseconds, io_client)?; assert_eq!(table.len(), 100); Ok(()) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 4985065b59..4c06cfcf3b 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -216,7 +216,7 @@ def test_parquet_read_int96_timestamps_overflow(use_native_downloader, coerce_to f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - parquet_options=TableParseParquetOptions(int96_timestamps_precision=coerce_to), + parquet_options=TableParseParquetOptions(int96_timestamps_coerce_to_unit=coerce_to), use_native_downloader=use_native_downloader, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" From 2b0c0c55244cfb2ba2d4090394c120110c9f9400 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 7 Aug 2023 16:16:10 -0700 Subject: [PATCH 03/15] Group options up --- daft/logical/schema.py | 26 +++++++- daft/runners/partitioning.py | 9 +-- daft/table/table.py | 15 +++-- daft/table/table_io.py | 11 ++-- src/daft-core/src/schema.rs | 12 +++- src/daft-parquet/src/file.rs | 42 +++++++++---- src/daft-parquet/src/python.rs | 91 ++++++++++++++++++++-------- src/daft-parquet/src/read.rs | 31 ++++++++-- tests/dataframe/test_creation.py | 8 +-- tests/table/table_io/test_parquet.py | 59 ++++++++++++++---- 10 files changed, 227 insertions(+), 77 deletions(-) diff --git a/daft/logical/schema.py b/daft/logical/schema.py index f8552577d0..e73be71832 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -1,11 +1,18 @@ from __future__ import annotations +import sys from typing import TYPE_CHECKING, Iterator from daft.daft import PyField as _PyField +from daft.daft import PyParquetSchemaOptions as _PyParquetSchemaOptions from daft.daft import PySchema as _PySchema from daft.daft import read_parquet_schema as _read_parquet_schema -from daft.datatype import DataType +from daft.datatype import DataType, TimeUnit + +if sys.version_info < (3, 8): + pass +else: + pass if TYPE_CHECKING: from daft.io import IOConfig @@ -119,5 +126,18 @@ def __setstate__(self, state: bytes) -> None: self._schema.__setstate__(state) @classmethod - def from_parquet(cls, path: str, io_config: IOConfig | None = None) -> Schema: - return Schema._from_pyschema(_read_parquet_schema(uri=path, io_config=io_config)) + def from_parquet( + cls, + path: str, + io_config: IOConfig | None = None, + schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns(), + ) -> Schema: + return Schema._from_pyschema( + _read_parquet_schema( + uri=path, + io_config=io_config, + schema_options=_PyParquetSchemaOptions( + schema=None, inference_option_int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit + ), + ) + ) diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index dd9e5426c4..297f1fdd9f 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -9,13 +9,14 @@ import pyarrow as pa +from daft.datatype import TimeUnit from daft.logical.schema import Schema from daft.table import Table if sys.version_info < (3, 8): - from 
typing_extensions import Literal + pass else: - from typing import Literal + pass if TYPE_CHECKING: import pandas as pd @@ -41,11 +42,11 @@ class TableParseParquetOptions: """Options for parsing Parquet Args: - int96_timestamps_coerce_to_unit: Precision to use when reading int96 timestamps, defaults to "ns" which + schema_infer_int96_timestamps_time_unit: Precision to use when reading int96 timestamps, defaults to "ns" which means that Parquet INT96 timestamps outside the range of years 1678-2262 will overflow. """ - int96_timestamps_coerce_to_unit: Literal["ns"] | Literal["us"] | Literal["ms"] = "ns" + schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns() @dataclass(frozen=True) diff --git a/daft/table/table.py b/daft/table/table.py index c70765f760..bd5308912f 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -7,19 +7,20 @@ from loguru import logger from daft.arrow_utils import ensure_table +from daft.daft import PyParquetSchemaOptions as _PyParquetSchemaOptions from daft.daft import PyTable as _PyTable from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk from daft.daft import read_parquet_statistics as _read_parquet_statistics -from daft.datatype import DataType +from daft.datatype import DataType, TimeUnit from daft.expressions import Expression, ExpressionsProjection from daft.logical.schema import Schema from daft.series import Series if sys.version_info < (3, 8): - from typing_extensions import Literal + pass else: - from typing import Literal + pass _NUMPY_AVAILABLE = True try: @@ -358,8 +359,9 @@ def read_parquet( columns: list[str] | None = None, start_offset: int | None = None, num_rows: int | None = None, - int96_timestamps_coerce_to_unit: Literal["ns"] | Literal["us"] | Literal["ms"] = "ns", io_config: IOConfig | None = None, + schema: Schema | None = None, + schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns(), ) -> Table: return Table._from_pytable( _read_parquet( @@ -367,8 +369,11 @@ def read_parquet( columns=columns, start_offset=start_offset, num_rows=num_rows, - int96_timestamps_coerce_to_unit=int96_timestamps_coerce_to_unit, io_config=io_config, + schema_options=_PyParquetSchemaOptions( + schema=schema._schema if schema is not None else None, + inference_option_int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit, + ), ) ) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 8dc0493c4e..385f0a8d88 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -122,10 +122,11 @@ def read_parquet( str(file), columns=read_options.column_names, num_rows=read_options.num_rows, - int96_timestamps_coerce_to_unit=parquet_options.int96_timestamps_coerce_to_unit, io_config=io_config, + schema=schema, + schema_infer_int96_timestamps_time_unit=parquet_options.schema_infer_int96_timestamps_time_unit, ) - return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) + return tbl f: IO if not isinstance(file, (str, pathlib.Path)): @@ -138,11 +139,11 @@ def read_parquet( # If no rows required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_coerce_to_unit) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.schema_infer_int96_timestamps_time_unit) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], 
schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.int96_timestamps_coerce_to_unit) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.schema_infer_int96_timestamps_time_unit) # Only read the required row groups. rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -158,7 +159,7 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, - coerce_int96_timestamp_unit=parquet_options.int96_timestamps_coerce_to_unit, + coerce_int96_timestamp_unit=parquet_options.schema_infer_int96_timestamps_time_unit, ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/src/daft-core/src/schema.rs b/src/daft-core/src/schema.rs index 8ddf11cf1e..f867ba25ef 100644 --- a/src/daft-core/src/schema.rs +++ b/src/daft-core/src/schema.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashSet, + collections::{BTreeMap, HashSet}, fmt::{Display, Formatter, Result}, sync::Arc, }; @@ -85,6 +85,16 @@ impl Schema { } } + pub fn to_arrow(&self) -> DaftResult { + let arrow_fields: DaftResult> = + self.fields.iter().map(|(_, f)| f.to_arrow()).collect(); + let arrow_fields = arrow_fields?; + Ok(arrow2::datatypes::Schema { + fields: arrow_fields, + metadata: BTreeMap::new(), + }) + } + pub fn repr_html(&self) -> String { // Produces a
HTML element. diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 64429bb1e9..cd4de5b356 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -27,7 +27,8 @@ pub(crate) struct ParquetReaderBuilder { selected_columns: Option>, row_start_offset: usize, num_rows: usize, - int96_timestamps_coerce_to_unit: TimeUnit, + user_provided_arrow_schema: Option, + schema_infer_int96_timestamps_time_unit: TimeUnit, } use parquet2::read::decompress; @@ -101,7 +102,8 @@ impl ParquetReaderBuilder { selected_columns: None, row_start_offset: 0, num_rows, - int96_timestamps_coerce_to_unit: TimeUnit::Nanoseconds, + user_provided_arrow_schema: None, + schema_infer_int96_timestamps_time_unit: TimeUnit::Nanoseconds, }) } @@ -148,11 +150,20 @@ impl ParquetReaderBuilder { Ok(self) } - pub fn set_int96_timestamps_coerce_to_unit( + pub fn set_schema_infer_int96_timestamps_time_unit( mut self, - int96_timestamps_coerce_to_unit: &TimeUnit, + schema_infer_int96_timestamps_time_unit: &TimeUnit, ) -> Self { - self.int96_timestamps_coerce_to_unit = int96_timestamps_coerce_to_unit.to_owned(); + self.schema_infer_int96_timestamps_time_unit = + schema_infer_int96_timestamps_time_unit.to_owned(); + self + } + + pub fn set_user_provided_arrow_schema( + mut self, + schema: Option, + ) -> Self { + self.user_provided_arrow_schema = schema; self } @@ -180,14 +191,19 @@ impl ParquetReaderBuilder { curr_row_index += rg.num_rows(); } - // TODO(jay): Add arrow2 functionality to perform schema inference by taking into account the - // self.int96_timestamps_coerce_to_unit option, where Parquet int96 fields should be coerced - // into Arrow datetimes with the specified TimeUnit. - let mut arrow_schema = infer_schema(&self.metadata).context( - UnableToParseSchemaFromMetadataSnafu:: { - path: self.uri.clone(), - }, - )?; + let mut arrow_schema = match self.user_provided_arrow_schema { + Some(s) => s, + // TODO(jay): Add arrow2 functionality to perform schema inference by taking into account the + // self.schema_infer_int96_timestamps_time_unit option, where Parquet int96 fields should be coerced + // into Arrow datetimes with the specified TimeUnit. + None => { + infer_schema(&self.metadata).context(UnableToParseSchemaFromMetadataSnafu::< + String, + > { + path: self.uri.clone(), + })? 
+ } + }; if let Some(names_to_keep) = self.selected_columns { arrow_schema diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index b97a0f1dbb..25a582fcec 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -5,21 +5,57 @@ pub mod pylib { use common_error::{DaftError, DaftResult}; use daft_core::datatypes::TimeUnit; + use daft_core::python::datatype::PyTimeUnit; use daft_core::python::{schema::PySchema, PySeries}; use daft_io::{get_io_client, python::IOConfig}; use daft_table::python::PyTable; - use pyo3::{pyfunction, PyResult, Python}; + use pyo3::{pyclass, pyfunction, pymethods, PyResult, Python}; - fn time_unit_from_str(s: &str) -> DaftResult { - match s { - "ns" => Ok(TimeUnit::Nanoseconds), - "us" => Ok(TimeUnit::Microseconds), - "ms" => Ok(TimeUnit::Milliseconds), - "s" => Ok(TimeUnit::Seconds), - _ => Err(DaftError::ValueError(format!( - "Unrecognized TimeUnit {}", - s - ))), + use crate::read::{ParquetSchemaInferenceOptions, ParquetSchemaOptions}; + + const DEFAULT_SCHEMA_OPTIONS: ParquetSchemaOptions = + ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions { + int96_timestamps_time_unit: TimeUnit::Nanoseconds, + }); + + /// Python wrapper for ParquetSchemaOptions + /// + /// Represents the options for parsing Schemas from Parquet files. If `schema` is provided, then + /// all the `inference_option_*` will be ignored when parsing Schemas. + #[pyclass] + pub struct PyParquetSchemaOptions { + schema: Option, + inference_option_int96_timestamps_time_unit: Option, + } + + #[pymethods] + impl PyParquetSchemaOptions { + #[new] + fn new( + schema: Option, + inference_option_int96_timestamps_time_unit: Option, + ) -> Self { + PyParquetSchemaOptions { + schema, + inference_option_int96_timestamps_time_unit, + } + } + } + + impl TryFrom<&PyParquetSchemaOptions> for ParquetSchemaOptions { + type Error = DaftError; + fn try_from(value: &PyParquetSchemaOptions) -> DaftResult { + match &value.schema { + Some(s) => Ok(ParquetSchemaOptions::UserProvidedSchema(s.schema.clone())), + None => Ok(ParquetSchemaOptions::InferenceOptions( + ParquetSchemaInferenceOptions { + int96_timestamps_time_unit: value + .inference_option_int96_timestamps_time_unit + .as_ref() + .map_or(TimeUnit::Nanoseconds, |tu| tu.timeunit), + }, + )), + } } } @@ -30,19 +66,21 @@ pub mod pylib { columns: Option>, start_offset: Option, num_rows: Option, - int96_timestamps_coerce_to_unit: Option<&str>, io_config: Option, + schema_options: Option<&PyParquetSchemaOptions>, ) -> PyResult { py.allow_threads(|| { - let int96_timestamps_coerce_to_unit = int96_timestamps_coerce_to_unit - .map_or(Ok(TimeUnit::Nanoseconds), time_unit_from_str)?; let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; + let schema_options = match schema_options { + None => DEFAULT_SCHEMA_OPTIONS, + Some(opts) => opts.try_into()?, + }; Ok(crate::read::read_parquet( uri, columns.as_deref(), start_offset, num_rows, - &int96_timestamps_coerce_to_unit, + schema_options, io_client, )? 
.into()) @@ -78,18 +116,20 @@ pub mod pylib { py: Python, uri: &str, io_config: Option, - int96_timestamps_coerce_to_unit: Option<&str>, + schema_options: Option<&PyParquetSchemaOptions>, ) -> PyResult { py.allow_threads(|| { - let int96_timestamps_coerce_to_unit = int96_timestamps_coerce_to_unit - .map_or(Ok(TimeUnit::Nanoseconds), time_unit_from_str)?; - let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; - Ok(Arc::new(crate::read::read_parquet_schema( - uri, - io_client, - &int96_timestamps_coerce_to_unit, - )?) - .into()) + let schema_options = match schema_options { + None => DEFAULT_SCHEMA_OPTIONS, + Some(opts) => opts.try_into()?, + }; + match schema_options { + ParquetSchemaOptions::UserProvidedSchema(s) => Ok(PySchema { schema: s }), + ParquetSchemaOptions::InferenceOptions(opts) => { + let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; + Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client, &opts)?).into()) + } + } }) } @@ -110,5 +150,6 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?; + parent.add_class::()?; Ok(()) } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index c67d84dbf5..83e7e9381e 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -15,12 +15,22 @@ use snafu::ResultExt; use crate::{file::ParquetReaderBuilder, JoinSnafu}; + +pub struct ParquetSchemaInferenceOptions { + pub int96_timestamps_time_unit: TimeUnit, +} + +pub enum ParquetSchemaOptions { + UserProvidedSchema(Arc), + InferenceOptions(ParquetSchemaInferenceOptions), +} + async fn read_parquet_single( uri: &str, columns: Option<&[&str]>, start_offset: Option, num_rows: Option, - int96_timestamps_coerce_to_unit: &TimeUnit, + schema_options: ParquetSchemaOptions, io_client: Arc, ) -> DaftResult
{ let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?; @@ -32,7 +42,15 @@ async fn read_parquet_single( }; let builder = builder.limit(start_offset, num_rows)?; - let builder = builder.set_int96_timestamps_coerce_to_unit(int96_timestamps_coerce_to_unit); + // Schema inference options + let builder = match schema_options { + ParquetSchemaOptions::UserProvidedSchema(s) => { + builder.set_user_provided_arrow_schema(Some(s.to_arrow()?)) + } + ParquetSchemaOptions::InferenceOptions(opts) => { + builder.set_schema_infer_int96_timestamps_time_unit(&opts.int96_timestamps_time_unit) + } + }; let metadata_num_rows = builder.metadata().num_rows; let metadata_num_columns = builder.parquet_schema().fields().len(); @@ -130,14 +148,16 @@ pub fn read_parquet_bulk( pub fn read_parquet_schema( uri: &str, io_client: Arc, - int96_timestamps_coerce_to_unit: &TimeUnit, + schema_inference_options: &ParquetSchemaInferenceOptions, ) -> DaftResult { let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); let builder = runtime_handle .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?; - let builder = builder.set_int96_timestamps_coerce_to_unit(int96_timestamps_coerce_to_unit); + let builder = builder.set_schema_infer_int96_timestamps_time_unit( + &schema_inference_options.int96_timestamps_time_unit, + ); Schema::try_from(builder.build()?.arrow_schema()) } @@ -218,7 +238,6 @@ mod tests { use std::sync::Arc; use common_error::DaftResult; - use daft_core::datatypes::TimeUnit; use daft_io::{config::IOConfig, IOClient}; use super::read_parquet; @@ -231,7 +250,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_parquet(file, None, None, None, &TimeUnit::Nanoseconds, io_client)?; + let table = read_parquet(file, None, None, None, io_client)?; assert_eq!(table.len(), 100); Ok(()) diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py index b30432b319..021192be32 100644 --- a/tests/dataframe/test_creation.py +++ b/tests/dataframe/test_creation.py @@ -748,10 +748,10 @@ def test_create_dataframe_parquet_specify_schema(valid_data: list[dict[str, floa df = daft.read_parquet( f.name, schema_hints={ - "sepal_length": DataType.float32(), - "sepal_width": DataType.float32(), - "petal_length": DataType.float32(), - "petal_width": DataType.float32(), + "sepal_length": DataType.float64(), + "sepal_width": DataType.float64(), + "petal_length": DataType.float64(), + "petal_width": DataType.float64(), "variety": DataType.string(), }, use_native_downloader=use_native_downloader, diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 4c06cfcf3b..0e6dab6924 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -156,9 +156,13 @@ def test_parquet_read_data_select_columns(use_native_downloader): assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" -@pytest.mark.parametrize("use_native_downloader", [True, False]) +### +# Test Parquet Int96 timestamps +### + + @pytest.mark.parametrize("use_deprecated_int96_timestamps", [True, False]) -def test_parquet_read_int96_timestamps(use_native_downloader, use_deprecated_int96_timestamps): +def test_parquet_read_int96_timestamps(use_deprecated_int96_timestamps): data = { "timestamp_ms": pa.array([1, 2, 3], pa.timestamp("ms")), "timestamp_us": pa.array([1, 2, 3], pa.timestamp("us")), @@ -177,6 +181,7 @@ def 
test_parquet_read_int96_timestamps(use_native_downloader, use_deprecated_int papq_write_table_kwargs={ "use_deprecated_int96_timestamps": use_deprecated_int96_timestamps, "coerce_timestamps": "us" if not use_deprecated_int96_timestamps else None, + "store_schema": False, }, ) as f: schema = Schema._from_field_name_and_types(schema) @@ -185,30 +190,29 @@ def test_parquet_read_int96_timestamps(use_native_downloader, use_deprecated_int f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - use_native_downloader=use_native_downloader, + use_native_downloader=True, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" -@pytest.mark.parametrize("use_native_downloader", [True, False]) -@pytest.mark.parametrize("coerce_to", ["ms", "us"]) -def test_parquet_read_int96_timestamps_overflow(use_native_downloader, coerce_to): +@pytest.mark.parametrize("coerce_to", [TimeUnit.ms(), TimeUnit.us()]) +def test_parquet_read_int96_timestamps_overflow(coerce_to): # NOTE: datetime.datetime(3000, 1, 1) and datetime.datetime(1000, 1, 1) cannot be represented by our timestamp64(nanosecond) # type. However they can be written to Parquet's INT96 type. Here we test that a round-trip is possible if provided with # the appropriate flags. data = { "timestamp": pa.array( [datetime.datetime(1000, 1, 1), datetime.datetime(2000, 1, 1), datetime.datetime(3000, 1, 1)], - pa.timestamp(coerce_to), + pa.timestamp(str(coerce_to)), ), } schema = [ - ("timestamp", DataType.timestamp(getattr(TimeUnit, coerce_to)())), + ("timestamp", DataType.timestamp(coerce_to)), ] with _parquet_write_helper( pa.Table.from_pydict(data), - papq_write_table_kwargs={"use_deprecated_int96_timestamps": True}, + papq_write_table_kwargs={"use_deprecated_int96_timestamps": True, "store_schema": False}, ) as f: schema = Schema._from_field_name_and_types(schema) expected = Table.from_pydict(data) @@ -216,7 +220,40 @@ def test_parquet_read_int96_timestamps_overflow(use_native_downloader, coerce_to f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - parquet_options=TableParseParquetOptions(int96_timestamps_coerce_to_unit=coerce_to), - use_native_downloader=use_native_downloader, + parquet_options=TableParseParquetOptions(schema_infer_int96_timestamps_time_unit=coerce_to), + use_native_downloader=True, + ) + assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" + + +def test_parquet_infer_schema_int96_timestamps(): + data = { + "timestamp_ms": pa.array([1, 2, 3], pa.timestamp("ms")), + "timestamp_us": pa.array([1, 2, 3], pa.timestamp("us")), + "timestamp_ns": pa.array([1, 2, 3], pa.timestamp("ns")), + } + schema = [ + ("timestamp_ms", DataType.timestamp(TimeUnit.ms())), + ("timestamp_us", DataType.timestamp(TimeUnit.us())), + ("timestamp_ns", DataType.timestamp(TimeUnit.ns())), + ] + + with _parquet_write_helper( + pa.Table.from_pydict(data), + papq_write_table_kwargs={ + "use_deprecated_int96_timestamps": True, + "store_schema": False, + }, + ) as f: + expected = Table.from_pydict(data).eval_expression_list( + [daft.col(c).cast(DataType.timestamp(TimeUnit.ms())) for c, _ in schema] + ) + schema = Schema._from_field_name_and_types(schema) + table = table_io.read_parquet( + f, + None, + read_options=TableReadOptions(column_names=schema.column_names()), + parquet_options=TableParseParquetOptions(schema_infer_int96_timestamps_time_unit=TimeUnit.ms()), + use_native_downloader=True, ) assert table.to_arrow() == expected.to_arrow(), 
f"Expected:\n{expected}\n\nReceived:\n{table}" From eb48fc7638fe6a5325f9e9abd98fc845905f79b4 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 7 Aug 2023 16:41:21 -0700 Subject: [PATCH 04/15] Clean up API to avoid Python-Rust glue bloat --- daft/logical/schema.py | 8 ++-- daft/table/table.py | 10 ++-- daft/table/table_io.py | 10 ++-- src/daft-parquet/src/python.rs | 72 ++++++++++++---------------- tests/table/table_io/test_parquet.py | 3 ++ 5 files changed, 52 insertions(+), 51 deletions(-) diff --git a/daft/logical/schema.py b/daft/logical/schema.py index e73be71832..546bd4b43f 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -4,7 +4,9 @@ from typing import TYPE_CHECKING, Iterator from daft.daft import PyField as _PyField -from daft.daft import PyParquetSchemaOptions as _PyParquetSchemaOptions +from daft.daft import ( + PyParquetSchemaInferenceOptions as _PyParquetSchemaInferenceOptions, +) from daft.daft import PySchema as _PySchema from daft.daft import read_parquet_schema as _read_parquet_schema from daft.datatype import DataType, TimeUnit @@ -136,8 +138,8 @@ def from_parquet( _read_parquet_schema( uri=path, io_config=io_config, - schema_options=_PyParquetSchemaOptions( - schema=None, inference_option_int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit + schema_inference_options=_PyParquetSchemaInferenceOptions( + int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit, ), ) ) diff --git a/daft/table/table.py b/daft/table/table.py index bd5308912f..3f5a410c54 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -7,7 +7,9 @@ from loguru import logger from daft.arrow_utils import ensure_table -from daft.daft import PyParquetSchemaOptions as _PyParquetSchemaOptions +from daft.daft import ( + PyParquetSchemaInferenceOptions as _PyParquetSchemaInferenceOptions, +) from daft.daft import PyTable as _PyTable from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk @@ -370,9 +372,9 @@ def read_parquet( start_offset=start_offset, num_rows=num_rows, io_config=io_config, - schema_options=_PyParquetSchemaOptions( - schema=schema._schema if schema is not None else None, - inference_option_int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit, + schema=schema._schema if schema is not None else None, + schema_inference_options=_PyParquetSchemaInferenceOptions( + int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit, ), ) ) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 385f0a8d88..d06175c62f 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -139,11 +139,15 @@ def read_parquet( # If no rows required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.schema_infer_int96_timestamps_time_unit) + pqf = papq.ParquetFile( + f, coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit) + ) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=parquet_options.schema_infer_int96_timestamps_time_unit) + pqf = papq.ParquetFile( + f, coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit) + ) # Only read the required row groups. 
rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -159,7 +163,7 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, - coerce_int96_timestamp_unit=parquet_options.schema_infer_int96_timestamps_time_unit, + coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit), ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 25a582fcec..3b603c9c99 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -3,7 +3,6 @@ use pyo3::prelude::*; pub mod pylib { use std::sync::Arc; - use common_error::{DaftError, DaftResult}; use daft_core::datatypes::TimeUnit; use daft_core::python::datatype::PyTimeUnit; use daft_core::python::{schema::PySchema, PySeries}; @@ -18,47 +17,26 @@ pub mod pylib { int96_timestamps_time_unit: TimeUnit::Nanoseconds, }); - /// Python wrapper for ParquetSchemaOptions + /// Python wrapper for ParquetSchemaInferenceOptions /// - /// Represents the options for parsing Schemas from Parquet files. If `schema` is provided, then - /// all the `inference_option_*` will be ignored when parsing Schemas. + /// Represents the options for inferring Schemas from Parquet files #[pyclass] - pub struct PyParquetSchemaOptions { - schema: Option, - inference_option_int96_timestamps_time_unit: Option, + #[derive(Clone)] + pub struct PyParquetSchemaInferenceOptions { + int96_timestamps_time_unit: PyTimeUnit, } #[pymethods] - impl PyParquetSchemaOptions { + impl PyParquetSchemaInferenceOptions { #[new] - fn new( - schema: Option, - inference_option_int96_timestamps_time_unit: Option, - ) -> Self { - PyParquetSchemaOptions { - schema, - inference_option_int96_timestamps_time_unit, - } - } - } - - impl TryFrom<&PyParquetSchemaOptions> for ParquetSchemaOptions { - type Error = DaftError; - fn try_from(value: &PyParquetSchemaOptions) -> DaftResult { - match &value.schema { - Some(s) => Ok(ParquetSchemaOptions::UserProvidedSchema(s.schema.clone())), - None => Ok(ParquetSchemaOptions::InferenceOptions( - ParquetSchemaInferenceOptions { - int96_timestamps_time_unit: value - .inference_option_int96_timestamps_time_unit - .as_ref() - .map_or(TimeUnit::Nanoseconds, |tu| tu.timeunit), - }, - )), + fn new(int96_timestamps_time_unit: PyTimeUnit) -> Self { + PyParquetSchemaInferenceOptions { + int96_timestamps_time_unit, } } } + #[allow(clippy::too_many_arguments)] #[pyfunction] pub fn read_parquet( py: Python, @@ -67,13 +45,19 @@ pub mod pylib { start_offset: Option, num_rows: Option, io_config: Option, - schema_options: Option<&PyParquetSchemaOptions>, + schema: Option, + schema_inference_options: Option, ) -> PyResult { py.allow_threads(|| { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; - let schema_options = match schema_options { - None => DEFAULT_SCHEMA_OPTIONS, - Some(opts) => opts.try_into()?, + let schema_options = match (schema, schema_inference_options) { + (None, None) => DEFAULT_SCHEMA_OPTIONS, + (Some(schema), _) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), + (_, Some(opts)) => { + ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions { + int96_timestamps_time_unit: opts.int96_timestamps_time_unit.timeunit, + }) + } }; Ok(crate::read::read_parquet( uri, @@ -116,12 +100,18 @@ pub mod pylib { py: Python, uri: &str, io_config: Option, - schema_options: Option<&PyParquetSchemaOptions>, + schema: 
Option, + schema_inference_options: Option, ) -> PyResult { py.allow_threads(|| { - let schema_options = match schema_options { - None => DEFAULT_SCHEMA_OPTIONS, - Some(opts) => opts.try_into()?, + let schema_options = match (schema, schema_inference_options) { + (None, None) => DEFAULT_SCHEMA_OPTIONS, + (Some(schema), _) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), + (_, Some(opts)) => { + ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions { + int96_timestamps_time_unit: opts.int96_timestamps_time_unit.timeunit, + }) + } }; match schema_options { ParquetSchemaOptions::UserProvidedSchema(s) => Ok(PySchema { schema: s }), @@ -150,6 +140,6 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?; - parent.add_class::()?; + parent.add_class::()?; Ok(()) } diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 0e6dab6924..646847f1a8 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -226,6 +226,9 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to): assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" +@pytest.mark.skip( + reason="Need to implement `infer_schema` functionality in arrow2, or possibly implement our own custom patch on top" +) def test_parquet_infer_schema_int96_timestamps(): data = { "timestamp_ms": pa.array([1, 2, 3], pa.timestamp("ms")), From 3956ec75b966ff283018d73a08241fff2d2ada7d Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 7 Aug 2023 17:44:02 -0700 Subject: [PATCH 05/15] Remove unused schema inference logic and options --- Cargo.lock | 2 +- daft/logical/schema.py | 9 +---- daft/runners/partitioning.py | 13 ------ daft/table/table.py | 9 +---- daft/table/table_io.py | 17 ++------ src/daft-parquet/src/file.rs | 13 +----- src/daft-parquet/src/python.rs | 59 +++++----------------------- src/daft-parquet/src/read.rs | 21 ++++------ tests/table/table_io/test_parquet.py | 39 +----------------- 9 files changed, 25 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8eb0d88695..51cffb817b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.17.1" -source = "git+https://github.com/Eventual-Inc/arrow2?branch=clark/expand-casting-support#46bc134e386f597eed00a0396ea2de224d12943c" +source = "git+https://github.com/Eventual-Inc/arrow2?branch=clark/expand-casting-support#37600dfb67447f2b9f1d4336c1a49af15d59ce2b" dependencies = [ "ahash", "arrow-format", diff --git a/daft/logical/schema.py b/daft/logical/schema.py index 546bd4b43f..77672b6d2a 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -4,12 +4,9 @@ from typing import TYPE_CHECKING, Iterator from daft.daft import PyField as _PyField -from daft.daft import ( - PyParquetSchemaInferenceOptions as _PyParquetSchemaInferenceOptions, -) from daft.daft import PySchema as _PySchema from daft.daft import read_parquet_schema as _read_parquet_schema -from daft.datatype import DataType, TimeUnit +from daft.datatype import DataType if sys.version_info < (3, 8): pass @@ -132,14 +129,10 @@ def from_parquet( cls, path: str, io_config: IOConfig | None = None, - schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns(), ) -> Schema: return 
Schema._from_pyschema( _read_parquet_schema( uri=path, io_config=io_config, - schema_inference_options=_PyParquetSchemaInferenceOptions( - int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit, - ), ) ) diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 297f1fdd9f..d230d5fa3e 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -9,7 +9,6 @@ import pyarrow as pa -from daft.datatype import TimeUnit from daft.logical.schema import Schema from daft.table import Table @@ -37,18 +36,6 @@ class TableReadOptions: column_names: list[str] | None = None -@dataclass(frozen=True) -class TableParseParquetOptions: - """Options for parsing Parquet - - Args: - schema_infer_int96_timestamps_time_unit: Precision to use when reading int96 timestamps, defaults to "ns" which - means that Parquet INT96 timestamps outside the range of years 1678-2262 will overflow. - """ - - schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns() - - @dataclass(frozen=True) class TableParseCSVOptions: """Options for parsing CSVs diff --git a/daft/table/table.py b/daft/table/table.py index 3f5a410c54..7ceac569b0 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -7,14 +7,11 @@ from loguru import logger from daft.arrow_utils import ensure_table -from daft.daft import ( - PyParquetSchemaInferenceOptions as _PyParquetSchemaInferenceOptions, -) from daft.daft import PyTable as _PyTable from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk from daft.daft import read_parquet_statistics as _read_parquet_statistics -from daft.datatype import DataType, TimeUnit +from daft.datatype import DataType from daft.expressions import Expression, ExpressionsProjection from daft.logical.schema import Schema from daft.series import Series @@ -363,7 +360,6 @@ def read_parquet( num_rows: int | None = None, io_config: IOConfig | None = None, schema: Schema | None = None, - schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns(), ) -> Table: return Table._from_pytable( _read_parquet( @@ -373,9 +369,6 @@ def read_parquet( num_rows=num_rows, io_config=io_config, schema=schema._schema if schema is not None else None, - schema_inference_options=_PyParquetSchemaInferenceOptions( - int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit, - ), ) ) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index d06175c62f..125c0b7f61 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -17,11 +17,7 @@ from daft.expressions import ExpressionsProjection from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema -from daft.runners.partitioning import ( - TableParseCSVOptions, - TableParseParquetOptions, - TableReadOptions, -) +from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions from daft.table import Table if TYPE_CHECKING: @@ -101,7 +97,6 @@ def read_parquet( schema: Schema, fs: fsspec.AbstractFileSystem | None = None, read_options: TableReadOptions = TableReadOptions(), - parquet_options: TableParseParquetOptions = TableParseParquetOptions(), io_config: IOConfig | None = None, use_native_downloader: bool = False, ) -> Table: @@ -124,7 +119,6 @@ def read_parquet( num_rows=read_options.num_rows, io_config=io_config, schema=schema, - schema_infer_int96_timestamps_time_unit=parquet_options.schema_infer_int96_timestamps_time_unit, ) return tbl @@ -139,15 +133,11 @@ def read_parquet( # If no rows 
required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile( - f, coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit) - ) + pqf = papq.ParquetFile(f) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile( - f, coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit) - ) + pqf = papq.ParquetFile(f) # Only read the required row groups. rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -163,7 +153,6 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, - coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit), ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index cd4de5b356..45cdd55e8b 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -2,7 +2,7 @@ use std::{collections::HashSet, sync::Arc}; use arrow2::io::parquet::read::infer_schema; use common_error::DaftResult; -use daft_core::{datatypes::TimeUnit, utils::arrow::cast_array_for_daft_if_needed, Series}; +use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_io::IOClient; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; @@ -28,7 +28,6 @@ pub(crate) struct ParquetReaderBuilder { row_start_offset: usize, num_rows: usize, user_provided_arrow_schema: Option, - schema_infer_int96_timestamps_time_unit: TimeUnit, } use parquet2::read::decompress; @@ -103,7 +102,6 @@ impl ParquetReaderBuilder { row_start_offset: 0, num_rows, user_provided_arrow_schema: None, - schema_infer_int96_timestamps_time_unit: TimeUnit::Nanoseconds, }) } @@ -150,15 +148,6 @@ impl ParquetReaderBuilder { Ok(self) } - pub fn set_schema_infer_int96_timestamps_time_unit( - mut self, - schema_infer_int96_timestamps_time_unit: &TimeUnit, - ) -> Self { - self.schema_infer_int96_timestamps_time_unit = - schema_infer_int96_timestamps_time_unit.to_owned(); - self - } - pub fn set_user_provided_arrow_schema( mut self, schema: Option, diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 3b603c9c99..39024f6769 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -3,38 +3,12 @@ use pyo3::prelude::*; pub mod pylib { use std::sync::Arc; - use daft_core::datatypes::TimeUnit; - use daft_core::python::datatype::PyTimeUnit; use daft_core::python::{schema::PySchema, PySeries}; use daft_io::{get_io_client, python::IOConfig}; use daft_table::python::PyTable; - use pyo3::{pyclass, pyfunction, pymethods, PyResult, Python}; + use pyo3::{pyfunction, PyResult, Python}; - use crate::read::{ParquetSchemaInferenceOptions, ParquetSchemaOptions}; - - const DEFAULT_SCHEMA_OPTIONS: ParquetSchemaOptions = - ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions { - int96_timestamps_time_unit: TimeUnit::Nanoseconds, - }); - - /// Python wrapper for ParquetSchemaInferenceOptions - /// - /// Represents the options for inferring Schemas from Parquet files - #[pyclass] - #[derive(Clone)] - pub struct PyParquetSchemaInferenceOptions { - int96_timestamps_time_unit: PyTimeUnit, - } - - #[pymethods] - impl PyParquetSchemaInferenceOptions { - #[new] - fn 
new(int96_timestamps_time_unit: PyTimeUnit) -> Self { - PyParquetSchemaInferenceOptions { - int96_timestamps_time_unit, - } - } - } + use crate::read::ParquetSchemaOptions; #[allow(clippy::too_many_arguments)] #[pyfunction] @@ -46,18 +20,12 @@ pub mod pylib { num_rows: Option, io_config: Option, schema: Option, - schema_inference_options: Option, ) -> PyResult { py.allow_threads(|| { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; - let schema_options = match (schema, schema_inference_options) { - (None, None) => DEFAULT_SCHEMA_OPTIONS, - (Some(schema), _) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), - (_, Some(opts)) => { - ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions { - int96_timestamps_time_unit: opts.int96_timestamps_time_unit.timeunit, - }) - } + let schema_options = match schema { + None => ParquetSchemaOptions::InferenceOptions, + Some(schema) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), }; Ok(crate::read::read_parquet( uri, @@ -101,23 +69,17 @@ pub mod pylib { uri: &str, io_config: Option, schema: Option, - schema_inference_options: Option, ) -> PyResult { py.allow_threads(|| { - let schema_options = match (schema, schema_inference_options) { - (None, None) => DEFAULT_SCHEMA_OPTIONS, - (Some(schema), _) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), - (_, Some(opts)) => { - ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions { - int96_timestamps_time_unit: opts.int96_timestamps_time_unit.timeunit, - }) - } + let schema_options = match schema { + None => ParquetSchemaOptions::InferenceOptions, + Some(schema) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), }; match schema_options { ParquetSchemaOptions::UserProvidedSchema(s) => Ok(PySchema { schema: s }), - ParquetSchemaOptions::InferenceOptions(opts) => { + ParquetSchemaOptions::InferenceOptions => { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; - Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client, &opts)?).into()) + Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client)?).into()) } } }) @@ -140,6 +102,5 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?; parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?; - parent.add_class::()?; Ok(()) } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 83e7e9381e..9618ea8420 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use common_error::DaftResult; -use daft_core::datatypes::TimeUnit; use daft_core::{ datatypes::{Int32Array, UInt64Array, Utf8Array}, schema::Schema, @@ -15,14 +14,9 @@ use snafu::ResultExt; use crate::{file::ParquetReaderBuilder, JoinSnafu}; - -pub struct ParquetSchemaInferenceOptions { - pub int96_timestamps_time_unit: TimeUnit, -} - pub enum ParquetSchemaOptions { UserProvidedSchema(Arc), - InferenceOptions(ParquetSchemaInferenceOptions), + InferenceOptions, } async fn read_parquet_single( @@ -47,9 +41,8 @@ async fn read_parquet_single( ParquetSchemaOptions::UserProvidedSchema(s) => { builder.set_user_provided_arrow_schema(Some(s.to_arrow()?)) } - ParquetSchemaOptions::InferenceOptions(opts) => { - builder.set_schema_infer_int96_timestamps_time_unit(&opts.int96_timestamps_time_unit) - } + // TODO: add builder options to customize schema 
inference + ParquetSchemaOptions::InferenceOptions => builder, }; let metadata_num_rows = builder.metadata().num_rows; @@ -100,6 +93,7 @@ async fn read_parquet_single( Ok(table) } +<<<<<<< HEAD pub fn read_parquet( uri: &str, columns: Option<&[&str]>, @@ -150,15 +144,14 @@ pub fn read_parquet_schema( io_client: Arc, schema_inference_options: &ParquetSchemaInferenceOptions, ) -> DaftResult { +======= +pub fn read_parquet_schema(uri: &str, io_client: Arc) -> DaftResult { +>>>>>>> 9f13bd76 (Remove unused schema inference logic and options) let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); let builder = runtime_handle .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?; - let builder = builder.set_schema_infer_int96_timestamps_time_unit( - &schema_inference_options.int96_timestamps_time_unit, - ); - Schema::try_from(builder.build()?.arrow_schema()) } diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 646847f1a8..35d8c9aa6b 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -12,7 +12,7 @@ import daft from daft.datatype import DataType, TimeUnit from daft.logical.schema import Schema -from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions +from daft.runners.partitioning import TableReadOptions from daft.table import Table, schema_inference, table_io @@ -220,43 +220,6 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to): f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - parquet_options=TableParseParquetOptions(schema_infer_int96_timestamps_time_unit=coerce_to), - use_native_downloader=True, - ) - assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" - - -@pytest.mark.skip( - reason="Need to implement `infer_schema` functionality in arrow2, or possibly implement our own custom patch on top" -) -def test_parquet_infer_schema_int96_timestamps(): - data = { - "timestamp_ms": pa.array([1, 2, 3], pa.timestamp("ms")), - "timestamp_us": pa.array([1, 2, 3], pa.timestamp("us")), - "timestamp_ns": pa.array([1, 2, 3], pa.timestamp("ns")), - } - schema = [ - ("timestamp_ms", DataType.timestamp(TimeUnit.ms())), - ("timestamp_us", DataType.timestamp(TimeUnit.us())), - ("timestamp_ns", DataType.timestamp(TimeUnit.ns())), - ] - - with _parquet_write_helper( - pa.Table.from_pydict(data), - papq_write_table_kwargs={ - "use_deprecated_int96_timestamps": True, - "store_schema": False, - }, - ) as f: - expected = Table.from_pydict(data).eval_expression_list( - [daft.col(c).cast(DataType.timestamp(TimeUnit.ms())) for c, _ in schema] - ) - schema = Schema._from_field_name_and_types(schema) - table = table_io.read_parquet( - f, - None, - read_options=TableReadOptions(column_names=schema.column_names()), - parquet_options=TableParseParquetOptions(schema_infer_int96_timestamps_time_unit=TimeUnit.ms()), use_native_downloader=True, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" From 65852b6a696787e927c33bf3fe5ff7457f5252f7 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 7 Aug 2023 17:47:50 -0700 Subject: [PATCH 06/15] Lints --- src/daft-core/src/schema.rs | 4 ++-- src/daft-parquet/src/file.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/daft-core/src/schema.rs b/src/daft-core/src/schema.rs index f867ba25ef..1fdcbfa6fb 100644 --- a/src/daft-core/src/schema.rs +++ b/src/daft-core/src/schema.rs @@ 
-1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashSet}, + collections::HashSet, fmt::{Display, Formatter, Result}, sync::Arc, }; @@ -91,7 +91,7 @@ impl Schema { let arrow_fields = arrow_fields?; Ok(arrow2::datatypes::Schema { fields: arrow_fields, - metadata: BTreeMap::new(), + metadata: Default::default(), }) } diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 45cdd55e8b..a2d07f5c17 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -354,7 +354,6 @@ impl ParquetFileReader { let (send, recv) = tokio::sync::oneshot::channel(); rayon::spawn(move || { - // TODO(jay): Fix arrow2 to handle the pytype=Int96/dtype=Timestamp(ms/us) cases let arr_iter = column_iter_to_arrays( decompressed_iters, ptypes.iter().collect(), From e0b0e117a4e3c975d95e757c8fd99ef31fd40c4f Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 11:12:06 -0700 Subject: [PATCH 07/15] Only pass flag instead of overloading schema for parsing --- daft/logical/schema.py | 4 +- daft/runners/partitioning.py | 12 +++++ daft/table/table.py | 14 ++++-- daft/table/table_io.py | 20 ++++++-- src/daft-parquet/src/file.rs | 31 +++++------- src/daft-parquet/src/python.rs | 43 +++++++++-------- src/daft-parquet/src/read.rs | 71 +++++++++++++++++++--------- tests/table/table_io/test_parquet.py | 13 +++-- 8 files changed, 131 insertions(+), 77 deletions(-) diff --git a/daft/logical/schema.py b/daft/logical/schema.py index 77672b6d2a..1cea59f9b7 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -6,7 +6,7 @@ from daft.daft import PyField as _PyField from daft.daft import PySchema as _PySchema from daft.daft import read_parquet_schema as _read_parquet_schema -from daft.datatype import DataType +from daft.datatype import DataType, TimeUnit if sys.version_info < (3, 8): pass @@ -129,10 +129,12 @@ def from_parquet( cls, path: str, io_config: IOConfig | None = None, + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns(), ) -> Schema: return Schema._from_pyschema( _read_parquet_schema( uri=path, io_config=io_config, + infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit, ) ) diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index d230d5fa3e..8e3e794e61 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -9,6 +9,7 @@ import pyarrow as pa +from daft.datatype import TimeUnit from daft.logical.schema import Schema from daft.table import Table @@ -49,6 +50,17 @@ class TableParseCSVOptions: header_index: int | None = 0 +@dataclass(frozen=True) +class TableParseParquetOptions: + """Options for parsing Parquet files + + Args: + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit to use when parsing Int96 fields + """ + + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns() + + @dataclass(frozen=True) class PartialPartitionMetadata: num_rows: None | int diff --git a/daft/table/table.py b/daft/table/table.py index 7ceac569b0..ac34a4f207 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -11,7 +11,7 @@ from daft.daft import read_parquet as _read_parquet from daft.daft import read_parquet_bulk as _read_parquet_bulk from daft.daft import read_parquet_statistics as _read_parquet_statistics -from daft.datatype import DataType +from daft.datatype import DataType, TimeUnit from daft.expressions import Expression, ExpressionsProjection from daft.logical.schema import Schema from daft.series import Series @@ -359,7 +359,7 @@ def read_parquet( 
start_offset: int | None = None, num_rows: int | None = None, io_config: IOConfig | None = None, - schema: Schema | None = None, + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns(), ) -> Table: return Table._from_pytable( _read_parquet( @@ -368,7 +368,7 @@ def read_parquet( start_offset=start_offset, num_rows=num_rows, io_config=io_config, - schema=schema._schema if schema is not None else None, + infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit._timeunit, ) ) @@ -380,9 +380,15 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, io_config: IOConfig | None = None, + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns(), ) -> list[Table]: pytables = _read_parquet_bulk( - uris=paths, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config + uris=paths, + columns=columns, + start_offset=start_offset, + num_rows=num_rows, + io_config=io_config, + infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit, ) return [Table._from_pytable(t) for t in pytables] diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 125c0b7f61..b3c53bbec4 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -17,7 +17,11 @@ from daft.expressions import ExpressionsProjection from daft.filesystem import _resolve_paths_and_filesystem from daft.logical.schema import Schema -from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions +from daft.runners.partitioning import ( + TableParseCSVOptions, + TableParseParquetOptions, + TableReadOptions, +) from daft.table import Table if TYPE_CHECKING: @@ -97,6 +101,7 @@ def read_parquet( schema: Schema, fs: fsspec.AbstractFileSystem | None = None, read_options: TableReadOptions = TableReadOptions(), + parquet_options: TableParseParquetOptions = TableParseParquetOptions(), io_config: IOConfig | None = None, use_native_downloader: bool = False, ) -> Table: @@ -118,9 +123,9 @@ def read_parquet( columns=read_options.column_names, num_rows=read_options.num_rows, io_config=io_config, - schema=schema, + infer_schema_int96_timestamps_coerce_timeunit=parquet_options.infer_schema_int96_timestamps_coerce_timeunit, ) - return tbl + return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) f: IO if not isinstance(file, (str, pathlib.Path)): @@ -133,11 +138,15 @@ def read_parquet( # If no rows required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile(f) + pqf = papq.ParquetFile( + f, coerce_int96_timestamp_unit=str(parquet_options.infer_schema_int96_timestamps_coerce_timeunit) + ) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile(f) + pqf = papq.ParquetFile( + f, coerce_int96_timestamp_unit=str(parquet_options.infer_schema_int96_timestamps_coerce_timeunit) + ) # Only read the required row groups. 
rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -153,6 +162,7 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, + coerce_int96_timestamp_unit=str(parquet_options.infer_schema_int96_timestamps_coerce_timeunit), ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index a2d07f5c17..33f5919ec0 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -15,6 +15,7 @@ use snafu::ResultExt; use crate::{ metadata::read_parquet_metadata, + read::ParquetSchemaInferenceOptions, read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass}, JoinSnafu, OneShotRecvSnafu, UnableToCreateParquetPageStreamSnafu, UnableToOpenFileSnafu, UnableToParseSchemaFromMetadataSnafu, @@ -27,7 +28,7 @@ pub(crate) struct ParquetReaderBuilder { selected_columns: Option>, row_start_offset: usize, num_rows: usize, - user_provided_arrow_schema: Option, + schema_inference_options: ParquetSchemaInferenceOptions, } use parquet2::read::decompress; @@ -101,7 +102,7 @@ impl ParquetReaderBuilder { selected_columns: None, row_start_offset: 0, num_rows, - user_provided_arrow_schema: None, + schema_inference_options: Default::default(), }) } @@ -148,11 +149,8 @@ impl ParquetReaderBuilder { Ok(self) } - pub fn set_user_provided_arrow_schema( - mut self, - schema: Option, - ) -> Self { - self.user_provided_arrow_schema = schema; + pub fn set_infer_schema_options(mut self, opts: &ParquetSchemaInferenceOptions) -> Self { + self.schema_inference_options = opts.clone(); self } @@ -180,19 +178,12 @@ impl ParquetReaderBuilder { curr_row_index += rg.num_rows(); } - let mut arrow_schema = match self.user_provided_arrow_schema { - Some(s) => s, - // TODO(jay): Add arrow2 functionality to perform schema inference by taking into account the - // self.schema_infer_int96_timestamps_time_unit option, where Parquet int96 fields should be coerced - // into Arrow datetimes with the specified TimeUnit. - None => { - infer_schema(&self.metadata).context(UnableToParseSchemaFromMetadataSnafu::< - String, - > { - path: self.uri.clone(), - })? 
- } - }; + // TODO(jay): Add our own inference wrapper here + let mut arrow_schema = infer_schema(&self.metadata).context( + UnableToParseSchemaFromMetadataSnafu:: { + path: self.uri.clone(), + }, + )?; if let Some(names_to_keep) = self.selected_columns { arrow_schema diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 39024f6769..3827233d98 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -3,12 +3,12 @@ use pyo3::prelude::*; pub mod pylib { use std::sync::Arc; - use daft_core::python::{schema::PySchema, PySeries}; + use daft_core::python::{datatype::PyTimeUnit, schema::PySchema, PySeries}; use daft_io::{get_io_client, python::IOConfig}; use daft_table::python::PyTable; use pyo3::{pyfunction, PyResult, Python}; - use crate::read::ParquetSchemaOptions; + use crate::read::ParquetSchemaInferenceOptions; #[allow(clippy::too_many_arguments)] #[pyfunction] @@ -19,21 +19,20 @@ pub mod pylib { start_offset: Option, num_rows: Option, io_config: Option, - schema: Option, + infer_schema_int96_timestamps_coerce_timeunit: Option, ) -> PyResult { py.allow_threads(|| { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; - let schema_options = match schema { - None => ParquetSchemaOptions::InferenceOptions, - Some(schema) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), - }; + let schema_infer_options = ParquetSchemaInferenceOptions::new( + infer_schema_int96_timestamps_coerce_timeunit.map(|tu| tu.timeunit), + ); Ok(crate::read::read_parquet( uri, columns.as_deref(), start_offset, num_rows, - schema_options, io_client, + &schema_infer_options, )? .into()) }) @@ -47,15 +46,20 @@ pub mod pylib { start_offset: Option, num_rows: Option, io_config: Option, + infer_schema_int96_timestamps_coerce_timeunit: Option, ) -> PyResult> { py.allow_threads(|| { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; + let schema_infer_options = ParquetSchemaInferenceOptions::new( + infer_schema_int96_timestamps_coerce_timeunit.map(|tu| tu.timeunit), + ); Ok(crate::read::read_parquet_bulk( uris.as_ref(), columns.as_deref(), start_offset, num_rows, io_client, + &schema_infer_options, )? .into_iter() .map(|v| v.into()) @@ -68,20 +72,19 @@ pub mod pylib { py: Python, uri: &str, io_config: Option, - schema: Option, + infer_schema_int96_timestamps_coerce_timeunit: Option, ) -> PyResult { py.allow_threads(|| { - let schema_options = match schema { - None => ParquetSchemaOptions::InferenceOptions, - Some(schema) => ParquetSchemaOptions::UserProvidedSchema(schema.schema), - }; - match schema_options { - ParquetSchemaOptions::UserProvidedSchema(s) => Ok(PySchema { schema: s }), - ParquetSchemaOptions::InferenceOptions => { - let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; - Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client)?).into()) - } - } + let schema_infer_options = ParquetSchemaInferenceOptions::new( + infer_schema_int96_timestamps_coerce_timeunit.map(|tu| tu.timeunit), + ); + let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; + Ok(Arc::new(crate::read::read_parquet_schema( + uri, + io_client, + &schema_infer_options, + )?) 
+ .into()) }) } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 9618ea8420..67c3518c0d 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use common_error::DaftResult; use daft_core::{ - datatypes::{Int32Array, UInt64Array, Utf8Array}, + datatypes::{Int32Array, TimeUnit, UInt64Array, Utf8Array}, schema::Schema, DataType, IntoSeries, Series, }; @@ -14,9 +14,29 @@ use snafu::ResultExt; use crate::{file::ParquetReaderBuilder, JoinSnafu}; -pub enum ParquetSchemaOptions { - UserProvidedSchema(Arc), - InferenceOptions, +#[derive(Clone)] +pub struct ParquetSchemaInferenceOptions { + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit, +} + +impl ParquetSchemaInferenceOptions { + pub fn new(infer_schema_int96_timestamps_coerce_timeunit: Option) -> Self { + let default: ParquetSchemaInferenceOptions = Default::default(); + let infer_schema_int96_timestamps_coerce_timeunit = + infer_schema_int96_timestamps_coerce_timeunit + .unwrap_or(default.infer_schema_int96_timestamps_coerce_timeunit); + ParquetSchemaInferenceOptions { + infer_schema_int96_timestamps_coerce_timeunit, + } + } +} + +impl Default for ParquetSchemaInferenceOptions { + fn default() -> Self { + ParquetSchemaInferenceOptions { + infer_schema_int96_timestamps_coerce_timeunit: TimeUnit::Nanoseconds, + } + } } async fn read_parquet_single( @@ -24,10 +44,11 @@ async fn read_parquet_single( columns: Option<&[&str]>, start_offset: Option, num_rows: Option, - schema_options: ParquetSchemaOptions, io_client: Arc, + schema_infer_options: &ParquetSchemaInferenceOptions, ) -> DaftResult
{ let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?; + let builder = builder.set_infer_schema_options(schema_infer_options); let builder = if let Some(columns) = columns { builder.prune_columns(columns)? @@ -36,15 +57,6 @@ async fn read_parquet_single( }; let builder = builder.limit(start_offset, num_rows)?; - // Schema inference options - let builder = match schema_options { - ParquetSchemaOptions::UserProvidedSchema(s) => { - builder.set_user_provided_arrow_schema(Some(s.to_arrow()?)) - } - // TODO: add builder options to customize schema inference - ParquetSchemaOptions::InferenceOptions => builder, - }; - let metadata_num_rows = builder.metadata().num_rows; let metadata_num_columns = builder.parquet_schema().fields().len(); @@ -93,18 +105,26 @@ async fn read_parquet_single( Ok(table) } -<<<<<<< HEAD pub fn read_parquet( uri: &str, columns: Option<&[&str]>, start_offset: Option, num_rows: Option, io_client: Arc, + schema_infer_options: &ParquetSchemaInferenceOptions, ) -> DaftResult
{ let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); runtime_handle.block_on(async { - read_parquet_single(uri, columns, start_offset, num_rows, io_client).await + read_parquet_single( + uri, + columns, + start_offset, + num_rows, + io_client, + schema_infer_options, + ) + .await }) } @@ -114,6 +134,7 @@ pub fn read_parquet_bulk( start_offset: Option, num_rows: Option, io_client: Arc, + schema_infer_options: &ParquetSchemaInferenceOptions, ) -> DaftResult> { let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); @@ -125,12 +146,20 @@ pub fn read_parquet_bulk( let uri = uri.to_string(); let owned_columns = owned_columns.clone(); let io_client = io_client.clone(); + let schema_infer_options = schema_infer_options.clone(); tokio::task::spawn(async move { let columns = owned_columns .as_ref() .map(|s| s.iter().map(AsRef::as_ref).collect::>()); - read_parquet_single(&uri, columns.as_deref(), start_offset, num_rows, io_client) - .await + read_parquet_single( + &uri, + columns.as_deref(), + start_offset, + num_rows, + io_client, + &schema_infer_options, + ) + .await }) })) .await @@ -144,13 +173,11 @@ pub fn read_parquet_schema( io_client: Arc, schema_inference_options: &ParquetSchemaInferenceOptions, ) -> DaftResult { -======= -pub fn read_parquet_schema(uri: &str, io_client: Arc) -> DaftResult { ->>>>>>> 9f13bd76 (Remove unused schema inference logic and options) let runtime_handle = get_runtime(true)?; let _rt_guard = runtime_handle.enter(); let builder = runtime_handle .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?; + let builder = builder.set_infer_schema_options(schema_inference_options); Schema::try_from(builder.build()?.arrow_schema()) } @@ -243,7 +270,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_parquet(file, None, None, None, io_client)?; + let table = read_parquet(file, None, None, None, io_client, Default::default())?; assert_eq!(table.len(), 100); Ok(()) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 35d8c9aa6b..12d8c88df1 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -12,7 +12,7 @@ import daft from daft.datatype import DataType, TimeUnit from daft.logical.schema import Schema -from daft.runners.partitioning import TableReadOptions +from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions from daft.table import Table, schema_inference, table_io @@ -161,8 +161,9 @@ def test_parquet_read_data_select_columns(use_native_downloader): ### +@pytest.mark.parametrize("use_native_downloader", [True, False]) @pytest.mark.parametrize("use_deprecated_int96_timestamps", [True, False]) -def test_parquet_read_int96_timestamps(use_deprecated_int96_timestamps): +def test_parquet_read_int96_timestamps(use_deprecated_int96_timestamps, use_native_downloader): data = { "timestamp_ms": pa.array([1, 2, 3], pa.timestamp("ms")), "timestamp_us": pa.array([1, 2, 3], pa.timestamp("us")), @@ -190,13 +191,14 @@ def test_parquet_read_int96_timestamps(use_deprecated_int96_timestamps): f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - use_native_downloader=True, + use_native_downloader=use_native_downloader, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" +@pytest.mark.parametrize("use_native_downloader", [True, False]) @pytest.mark.parametrize("coerce_to", [TimeUnit.ms(), 
TimeUnit.us()]) -def test_parquet_read_int96_timestamps_overflow(coerce_to): +def test_parquet_read_int96_timestamps_overflow(coerce_to, use_native_downloader): # NOTE: datetime.datetime(3000, 1, 1) and datetime.datetime(1000, 1, 1) cannot be represented by our timestamp64(nanosecond) # type. However they can be written to Parquet's INT96 type. Here we test that a round-trip is possible if provided with # the appropriate flags. @@ -220,6 +222,7 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to): f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - use_native_downloader=True, + parquet_options=TableParseParquetOptions(infer_schema_int96_timestamps_coerce_timeunit=coerce_to), + use_native_downloader=use_native_downloader, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" From 0d45352cba274ac27e76de64dae1e926ec5138de Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 11:36:39 -0700 Subject: [PATCH 08/15] Working solution but need to recursively find int96 fields --- src/daft-parquet/src/file.rs | 52 ++++++++++++++++++++++++++++++++---- src/daft-parquet/src/read.rs | 2 +- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 33f5919ec0..0b54b18ed1 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -1,14 +1,16 @@ use std::{collections::HashSet, sync::Arc}; -use arrow2::io::parquet::read::infer_schema; +use arrow2::io::parquet::read::infer_schema as arrow2_infer_schema; use common_error::DaftResult; use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_io::IOClient; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; use parquet2::{ + metadata::FileMetaData, page::{CompressedPage, Page}, read::get_page_stream_from_column_start, + schema::types::{ParquetType, PhysicalType, PrimitiveType}, FallibleStreamingIterator, }; use snafu::ResultExt; @@ -32,6 +34,46 @@ pub(crate) struct ParquetReaderBuilder { } use parquet2::read::decompress; +fn infer_schema( + uri: &str, + metadata: &FileMetaData, + infer_schema_options: &ParquetSchemaInferenceOptions, +) -> super::Result { + let mut arrow2_schema = + arrow2_infer_schema(metadata).context(UnableToParseSchemaFromMetadataSnafu:: { + path: uri.to_string(), + })?; + let parquet_schema = metadata.schema(); + let int96_fields = parquet_schema + .fields() + .iter() + .filter(|f| matches!(f, ParquetType::PrimitiveType(PrimitiveType {physical_type: pt, ..}) if pt == &PhysicalType::Int96)) + .map(|f| f.name()) + .collect::>(); + arrow2_schema.fields = arrow2_schema + .fields + .iter() + .map(|f| { + if int96_fields.contains(f.name.as_str()) { + arrow2::datatypes::Field::new( + f.name.clone(), + arrow2::datatypes::DataType::Timestamp( + infer_schema_options + .infer_schema_int96_timestamps_coerce_timeunit + .to_arrow() + .unwrap(), + None, + ), + f.is_nullable, + ) + } else { + f.clone() + } + }) + .collect(); + Ok(arrow2_schema) +} + fn streaming_decompression>>( input: S, ) -> impl futures::Stream> { @@ -179,10 +221,10 @@ impl ParquetReaderBuilder { } // TODO(jay): Add our own inference wrapper here - let mut arrow_schema = infer_schema(&self.metadata).context( - UnableToParseSchemaFromMetadataSnafu:: { - path: self.uri.clone(), - }, + let mut arrow_schema = infer_schema( + self.uri.as_str(), + &self.metadata, + &self.schema_inference_options, )?; if let Some(names_to_keep) = self.selected_columns { diff --git 
a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 67c3518c0d..0941e3ab4c 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -16,7 +16,7 @@ use crate::{file::ParquetReaderBuilder, JoinSnafu}; #[derive(Clone)] pub struct ParquetSchemaInferenceOptions { - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit, + pub infer_schema_int96_timestamps_coerce_timeunit: TimeUnit, } impl ParquetSchemaInferenceOptions { From 28777ab36ccb2b2d3f556da11716dffb6cc917ee Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 11:41:00 -0700 Subject: [PATCH 09/15] Add test case for schema inference --- daft/logical/schema.py | 2 +- src/daft-parquet/src/file.rs | 2 ++ tests/table/table_io/test_parquet.py | 22 ++++++++++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/daft/logical/schema.py b/daft/logical/schema.py index 1cea59f9b7..9908d0f4d0 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -135,6 +135,6 @@ def from_parquet( _read_parquet_schema( uri=path, io_config=io_config, - infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit, + infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit._timeunit, ) ) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 0b54b18ed1..b54df7dbcf 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -44,6 +44,8 @@ fn infer_schema( path: uri.to_string(), })?; let parquet_schema = metadata.schema(); + + // TODO(jay): Need to handle the recursive nested cases as well -- this currently only handles top-level int96 fields let int96_fields = parquet_schema .fields() .iter() diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 12d8c88df1..45f9ee36aa 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -226,3 +226,25 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to, use_native_downloader use_native_downloader=use_native_downloader, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" + + +@pytest.mark.parametrize("use_native_downloader", [True, False]) +@pytest.mark.parametrize("coerce_to", [TimeUnit.ms(), TimeUnit.us()]) +def test_parquet_read_int96_timestamps_schema_inference(coerce_to, use_native_downloader): + data = { + "timestamp": pa.array( + [datetime.datetime(1000, 1, 1), datetime.datetime(2000, 1, 1), datetime.datetime(3000, 1, 1)], + pa.timestamp(str(coerce_to)), + ), + } + schema = [ + ("timestamp", DataType.timestamp(coerce_to)), + ] + expected = Schema._from_field_name_and_types(schema) + + with _parquet_write_helper( + pa.Table.from_pydict(data), + papq_write_table_kwargs={"use_deprecated_int96_timestamps": True, "store_schema": False}, + ) as f: + schema = Schema.from_parquet(f, infer_schema_int96_timestamps_coerce_timeunit=coerce_to) + assert schema == expected, f"Expected:\n{expected}\n\nReceived:\n{schema}" From fdaca49649f5db4f6f2311b35cd1248eedb94995 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 12:02:40 -0700 Subject: [PATCH 10/15] Fix for pyarrow 6 which does not have store_schema kwarg --- tests/table/table_io/test_parquet.py | 31 +++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 45f9ee36aa..71a546733c 100644 --- a/tests/table/table_io/test_parquet.py +++ 
b/tests/table/table_io/test_parquet.py @@ -15,6 +15,8 @@ from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions from daft.table import Table, schema_inference, table_io +PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0) + def test_read_input(tmpdir): tmpdir = pathlib.Path(tmpdir) @@ -177,13 +179,16 @@ def test_parquet_read_int96_timestamps(use_deprecated_int96_timestamps, use_nati data["timestamp_ns"] = pa.array([1, 2, 3], pa.timestamp("ns")) schema.append(("timestamp_ns", DataType.timestamp(TimeUnit.ns()))) + papq_write_table_kwargs = { + "use_deprecated_int96_timestamps": use_deprecated_int96_timestamps, + "coerce_timestamps": "us" if not use_deprecated_int96_timestamps else None, + } + if PYARROW_GE_7_0_0: + papq_write_table_kwargs["store_schema"] = False + with _parquet_write_helper( pa.Table.from_pydict(data), - papq_write_table_kwargs={ - "use_deprecated_int96_timestamps": use_deprecated_int96_timestamps, - "coerce_timestamps": "us" if not use_deprecated_int96_timestamps else None, - "store_schema": False, - }, + papq_write_table_kwargs=papq_write_table_kwargs, ) as f: schema = Schema._from_field_name_and_types(schema) expected = Table.from_pydict(data) @@ -212,9 +217,15 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to, use_native_downloader ("timestamp", DataType.timestamp(coerce_to)), ] + papq_write_table_kwargs = { + "use_deprecated_int96_timestamps": True, + } + if PYARROW_GE_7_0_0: + papq_write_table_kwargs["store_schema"] = False + with _parquet_write_helper( pa.Table.from_pydict(data), - papq_write_table_kwargs={"use_deprecated_int96_timestamps": True, "store_schema": False}, + papq_write_table_kwargs=papq_write_table_kwargs, ) as f: schema = Schema._from_field_name_and_types(schema) expected = Table.from_pydict(data) @@ -242,9 +253,15 @@ def test_parquet_read_int96_timestamps_schema_inference(coerce_to, use_native_do ] expected = Schema._from_field_name_and_types(schema) + papq_write_table_kwargs = { + "use_deprecated_int96_timestamps": True, + } + if PYARROW_GE_7_0_0: + papq_write_table_kwargs["store_schema"] = False + with _parquet_write_helper( pa.Table.from_pydict(data), - papq_write_table_kwargs={"use_deprecated_int96_timestamps": True, "store_schema": False}, + papq_write_table_kwargs=papq_write_table_kwargs, ) as f: schema = Schema.from_parquet(f, infer_schema_int96_timestamps_coerce_timeunit=coerce_to) assert schema == expected, f"Expected:\n{expected}\n\nReceived:\n{schema}" From c0d90d8cc8bbd580a1b82cb44702ce49e46dda8d Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 13:12:55 -0700 Subject: [PATCH 11/15] Add fix for read_parquet_bulk --- daft/table/table.py | 2 +- tests/table/table_io/test_parquet.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/daft/table/table.py b/daft/table/table.py index ac34a4f207..022f3f82d3 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -388,7 +388,7 @@ def read_parquet_bulk( start_offset=start_offset, num_rows=num_rows, io_config=io_config, - infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit, + infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit._timeunit, ) return [Table._from_pytable(t) for t in pytables] diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index 71a546733c..adc8109721 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -239,9 
+239,8 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to, use_native_downloader assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" -@pytest.mark.parametrize("use_native_downloader", [True, False]) @pytest.mark.parametrize("coerce_to", [TimeUnit.ms(), TimeUnit.us()]) -def test_parquet_read_int96_timestamps_schema_inference(coerce_to, use_native_downloader): +def test_parquet_read_int96_timestamps_schema_inference(coerce_to): data = { "timestamp": pa.array( [datetime.datetime(1000, 1, 1), datetime.datetime(2000, 1, 1), datetime.datetime(3000, 1, 1)], From c701557d0f80569e8ff866fb00d56ceff9ffac73 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 17:20:43 -0700 Subject: [PATCH 12/15] Use new infer_schema_with_options API from arrow2 fork --- Cargo.lock | 2 +- src/daft-core/src/array/ops/cast.rs | 9 ++- src/daft-core/src/datatypes/dtype.rs | 6 +- src/daft-core/src/datatypes/time_unit.rs | 12 ++-- .../src/series/array_impl/data_array.rs | 8 ++- src/daft-parquet/src/file.rs | 62 ++++--------------- 6 files changed, 30 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51cffb817b..981840029c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.17.1" -source = "git+https://github.com/Eventual-Inc/arrow2?branch=clark/expand-casting-support#37600dfb67447f2b9f1d4336c1a49af15d59ce2b" +source = "git+https://github.com/Eventual-Inc/arrow2?branch=clark/expand-casting-support#7ffc54215a0dd7ea69e34c6b1f76b2790fbc29bc" dependencies = [ "ahash", "arrow-format", diff --git a/src/daft-core/src/array/ops/cast.rs b/src/daft-core/src/array/ops/cast.rs index a5d954aa8d..91f70ee69c 100644 --- a/src/daft-core/src/array/ops/cast.rs +++ b/src/daft-core/src/array/ops/cast.rs @@ -320,9 +320,8 @@ pub(super) fn decimal128_to_str(val: i128, _precision: u8, scale: i8) -> String } pub(super) fn timestamp_to_str_naive(val: i64, unit: &TimeUnit) -> String { - let chrono_ts = { - arrow2::temporal_conversions::timestamp_to_naive_datetime(val, unit.to_arrow().unwrap()) - }; + let chrono_ts = + { arrow2::temporal_conversions::timestamp_to_naive_datetime(val, unit.to_arrow()) }; let format_str = match unit { TimeUnit::Seconds => "%Y-%m-%dT%H:%M:%S", TimeUnit::Milliseconds => "%Y-%m-%dT%H:%M:%S%.3f", @@ -343,7 +342,7 @@ pub(super) fn timestamp_to_str_offset( TimeUnit::Microseconds => chrono::SecondsFormat::Micros, TimeUnit::Nanoseconds => chrono::SecondsFormat::Nanos, }; - arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow().unwrap(), offset) + arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow(), offset) .to_rfc3339_opts(seconds_format, false) } @@ -354,7 +353,7 @@ pub(super) fn timestamp_to_str_tz(val: i64, unit: &TimeUnit, tz: &chrono_tz::Tz) TimeUnit::Microseconds => chrono::SecondsFormat::Micros, TimeUnit::Nanoseconds => chrono::SecondsFormat::Nanos, }; - arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow().unwrap(), tz) + arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow(), tz) .to_rfc3339_opts(seconds_format, false) } diff --git a/src/daft-core/src/datatypes/dtype.rs b/src/daft-core/src/datatypes/dtype.rs index 767ad7d672..de2a980d92 100644 --- a/src/daft-core/src/datatypes/dtype.rs +++ b/src/daft-core/src/datatypes/dtype.rs @@ -134,11 +134,11 @@ impl DataType { DataType::Float64 => Ok(ArrowType::Float64), DataType::Decimal128(precision, scale) => Ok(ArrowType::Decimal(*precision, *scale)), 
DataType::Timestamp(unit, timezone) => { - Ok(ArrowType::Timestamp(unit.to_arrow()?, timezone.clone())) + Ok(ArrowType::Timestamp(unit.to_arrow(), timezone.clone())) } DataType::Date => Ok(ArrowType::Date32), - DataType::Time(unit) => Ok(ArrowType::Time64(unit.to_arrow()?)), - DataType::Duration(unit) => Ok(ArrowType::Duration(unit.to_arrow()?)), + DataType::Time(unit) => Ok(ArrowType::Time64(unit.to_arrow())), + DataType::Duration(unit) => Ok(ArrowType::Duration(unit.to_arrow())), DataType::Binary => Ok(ArrowType::LargeBinary), DataType::Utf8 => Ok(ArrowType::LargeUtf8), DataType::FixedSizeList(field, size) => { diff --git a/src/daft-core/src/datatypes/time_unit.rs b/src/daft-core/src/datatypes/time_unit.rs index 24b6c087ee..e34ad91275 100644 --- a/src/daft-core/src/datatypes/time_unit.rs +++ b/src/daft-core/src/datatypes/time_unit.rs @@ -1,7 +1,5 @@ use arrow2::datatypes::TimeUnit as ArrowTimeUnit; -use common_error::DaftResult; - use serde::{Deserialize, Serialize}; #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)] @@ -14,12 +12,12 @@ pub enum TimeUnit { impl TimeUnit { #![allow(clippy::wrong_self_convention)] - pub fn to_arrow(&self) -> DaftResult { + pub fn to_arrow(&self) -> ArrowTimeUnit { match self { - TimeUnit::Nanoseconds => Ok(ArrowTimeUnit::Nanosecond), - TimeUnit::Microseconds => Ok(ArrowTimeUnit::Microsecond), - TimeUnit::Milliseconds => Ok(ArrowTimeUnit::Millisecond), - TimeUnit::Seconds => Ok(ArrowTimeUnit::Second), + TimeUnit::Nanoseconds => ArrowTimeUnit::Nanosecond, + TimeUnit::Microseconds => ArrowTimeUnit::Microsecond, + TimeUnit::Milliseconds => ArrowTimeUnit::Millisecond, + TimeUnit::Seconds => ArrowTimeUnit::Second, } } } diff --git a/src/daft-core/src/series/array_impl/data_array.rs b/src/daft-core/src/series/array_impl/data_array.rs index 8243a6bc18..a8706d3d5e 100644 --- a/src/daft-core/src/series/array_impl/data_array.rs +++ b/src/daft-core/src/series/array_impl/data_array.rs @@ -137,9 +137,11 @@ fn logical_to_arrow<'a>( .as_any() .downcast_ref::>() .unwrap(); - let casted: Box = Box::new(downcasted.clone().to( - arrow2::datatypes::DataType::Duration(unit.to_arrow().unwrap()), - )); + let casted: Box = Box::new( + downcasted + .clone() + .to(arrow2::datatypes::DataType::Duration(unit.to_arrow())), + ); Cow::Owned(casted) } diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index b54df7dbcf..8af810ee45 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -1,16 +1,14 @@ use std::{collections::HashSet, sync::Arc}; -use arrow2::io::parquet::read::infer_schema as arrow2_infer_schema; +use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series}; use daft_io::IOClient; use daft_table::Table; use futures::{future::try_join_all, StreamExt}; use parquet2::{ - metadata::FileMetaData, page::{CompressedPage, Page}, read::get_page_stream_from_column_start, - schema::types::{ParquetType, PhysicalType, PrimitiveType}, FallibleStreamingIterator, }; use snafu::ResultExt; @@ -34,48 +32,6 @@ pub(crate) struct ParquetReaderBuilder { } use parquet2::read::decompress; -fn infer_schema( - uri: &str, - metadata: &FileMetaData, - infer_schema_options: &ParquetSchemaInferenceOptions, -) -> super::Result { - let mut arrow2_schema = - arrow2_infer_schema(metadata).context(UnableToParseSchemaFromMetadataSnafu:: { - path: uri.to_string(), - })?; - let parquet_schema = metadata.schema(); - - // 
TODO(jay): Need to handle the recursive nested cases as well -- this currently only handles top-level int96 fields - let int96_fields = parquet_schema - .fields() - .iter() - .filter(|f| matches!(f, ParquetType::PrimitiveType(PrimitiveType {physical_type: pt, ..}) if pt == &PhysicalType::Int96)) - .map(|f| f.name()) - .collect::>(); - arrow2_schema.fields = arrow2_schema - .fields - .iter() - .map(|f| { - if int96_fields.contains(f.name.as_str()) { - arrow2::datatypes::Field::new( - f.name.clone(), - arrow2::datatypes::DataType::Timestamp( - infer_schema_options - .infer_schema_int96_timestamps_coerce_timeunit - .to_arrow() - .unwrap(), - None, - ), - f.is_nullable, - ) - } else { - f.clone() - } - }) - .collect(); - Ok(arrow2_schema) -} - fn streaming_decompression>>( input: S, ) -> impl futures::Stream> { @@ -222,12 +178,18 @@ impl ParquetReaderBuilder { curr_row_index += rg.num_rows(); } - // TODO(jay): Add our own inference wrapper here - let mut arrow_schema = infer_schema( - self.uri.as_str(), + let mut arrow_schema = infer_schema_with_options( &self.metadata, - &self.schema_inference_options, - )?; + &Some(arrow2::io::parquet::read::schema::SchemaInferenceOptions { + int96_coerce_to_timeunit: self + .schema_inference_options + .infer_schema_int96_timestamps_coerce_timeunit + .to_arrow(), + }), + ) + .context(UnableToParseSchemaFromMetadataSnafu:: { + path: self.uri.clone(), + })?; if let Some(names_to_keep) = self.selected_columns { arrow_schema From e641eae91891136e3fba9ec38a3550de8fd36c8e Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 17:32:36 -0700 Subject: [PATCH 13/15] Rename flag to coerce_int96_timestamp_unit --- daft/logical/schema.py | 4 ++-- daft/runners/partitioning.py | 4 ++-- daft/table/table.py | 8 ++++---- daft/table/table_io.py | 12 ++++-------- src/daft-parquet/src/file.rs | 2 +- src/daft-parquet/src/python.rs | 12 ++++++------ src/daft-parquet/src/read.rs | 13 ++++++------- tests/table/table_io/test_parquet.py | 4 ++-- 8 files changed, 27 insertions(+), 32 deletions(-) diff --git a/daft/logical/schema.py b/daft/logical/schema.py index 9908d0f4d0..b9051e627d 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -129,12 +129,12 @@ def from_parquet( cls, path: str, io_config: IOConfig | None = None, - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns(), + coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), ) -> Schema: return Schema._from_pyschema( _read_parquet_schema( uri=path, io_config=io_config, - infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit._timeunit, + coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, ) ) diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 8e3e794e61..44212a0d32 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -55,10 +55,10 @@ class TableParseParquetOptions: """Options for parsing Parquet files Args: - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit to use when parsing Int96 fields + coerce_int96_timestamp_unit: TimeUnit to use when parsing Int96 fields """ - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns() + coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns() @dataclass(frozen=True) diff --git a/daft/table/table.py b/daft/table/table.py index 022f3f82d3..78eec94404 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -359,7 +359,7 @@ def read_parquet( start_offset: int | None = None, num_rows: int | None = None, io_config: IOConfig 
| None = None, - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns(), + coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), ) -> Table: return Table._from_pytable( _read_parquet( @@ -368,7 +368,7 @@ def read_parquet( start_offset=start_offset, num_rows=num_rows, io_config=io_config, - infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit._timeunit, + coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, ) ) @@ -380,7 +380,7 @@ def read_parquet_bulk( start_offset: int | None = None, num_rows: int | None = None, io_config: IOConfig | None = None, - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit = TimeUnit.ns(), + coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(), ) -> list[Table]: pytables = _read_parquet_bulk( uris=paths, @@ -388,7 +388,7 @@ def read_parquet_bulk( start_offset=start_offset, num_rows=num_rows, io_config=io_config, - infer_schema_int96_timestamps_coerce_timeunit=infer_schema_int96_timestamps_coerce_timeunit._timeunit, + coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit, ) return [Table._from_pytable(t) for t in pytables] diff --git a/daft/table/table_io.py b/daft/table/table_io.py index b3c53bbec4..b68b8ba23f 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -123,7 +123,7 @@ def read_parquet( columns=read_options.column_names, num_rows=read_options.num_rows, io_config=io_config, - infer_schema_int96_timestamps_coerce_timeunit=parquet_options.infer_schema_int96_timestamps_coerce_timeunit, + coerce_int96_timestamp_unit=parquet_options.coerce_int96_timestamp_unit, ) return _cast_table_to_schema(tbl, read_options=read_options, schema=schema) @@ -138,15 +138,11 @@ def read_parquet( # If no rows required, we manually construct an empty table with the right schema if read_options.num_rows == 0: - pqf = papq.ParquetFile( - f, coerce_int96_timestamp_unit=str(parquet_options.infer_schema_int96_timestamps_coerce_timeunit) - ) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=str(parquet_options.coerce_int96_timestamp_unit)) arrow_schema = pqf.metadata.schema.to_arrow_schema() table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema) elif read_options.num_rows is not None: - pqf = papq.ParquetFile( - f, coerce_int96_timestamp_unit=str(parquet_options.infer_schema_int96_timestamps_coerce_timeunit) - ) + pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=str(parquet_options.coerce_int96_timestamp_unit)) # Only read the required row groups. 
rows_needed = read_options.num_rows for i in range(pqf.metadata.num_row_groups): @@ -162,7 +158,7 @@ def read_parquet( table = papq.read_table( f, columns=read_options.column_names, - coerce_int96_timestamp_unit=str(parquet_options.infer_schema_int96_timestamps_coerce_timeunit), + coerce_int96_timestamp_unit=str(parquet_options.coerce_int96_timestamp_unit), ) return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 8af810ee45..ad22db45ec 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -183,7 +183,7 @@ impl ParquetReaderBuilder { &Some(arrow2::io::parquet::read::schema::SchemaInferenceOptions { int96_coerce_to_timeunit: self .schema_inference_options - .infer_schema_int96_timestamps_coerce_timeunit + .coerce_int96_timestamp_unit .to_arrow(), }), ) diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 3827233d98..d77959a4cb 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -19,12 +19,12 @@ pub mod pylib { start_offset: Option, num_rows: Option, io_config: Option, - infer_schema_int96_timestamps_coerce_timeunit: Option, + coerce_int96_timestamp_unit: Option, ) -> PyResult { py.allow_threads(|| { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; let schema_infer_options = ParquetSchemaInferenceOptions::new( - infer_schema_int96_timestamps_coerce_timeunit.map(|tu| tu.timeunit), + coerce_int96_timestamp_unit.map(|tu| tu.timeunit), ); Ok(crate::read::read_parquet( uri, @@ -46,12 +46,12 @@ pub mod pylib { start_offset: Option, num_rows: Option, io_config: Option, - infer_schema_int96_timestamps_coerce_timeunit: Option, + coerce_int96_timestamp_unit: Option, ) -> PyResult> { py.allow_threads(|| { let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; let schema_infer_options = ParquetSchemaInferenceOptions::new( - infer_schema_int96_timestamps_coerce_timeunit.map(|tu| tu.timeunit), + coerce_int96_timestamp_unit.map(|tu| tu.timeunit), ); Ok(crate::read::read_parquet_bulk( uris.as_ref(), @@ -72,11 +72,11 @@ pub mod pylib { py: Python, uri: &str, io_config: Option, - infer_schema_int96_timestamps_coerce_timeunit: Option, + coerce_int96_timestamp_unit: Option, ) -> PyResult { py.allow_threads(|| { let schema_infer_options = ParquetSchemaInferenceOptions::new( - infer_schema_int96_timestamps_coerce_timeunit.map(|tu| tu.timeunit), + coerce_int96_timestamp_unit.map(|tu| tu.timeunit), ); let io_client = get_io_client(io_config.unwrap_or_default().config.into())?; Ok(Arc::new(crate::read::read_parquet_schema( diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 0941e3ab4c..02fa26d1fa 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -16,17 +16,16 @@ use crate::{file::ParquetReaderBuilder, JoinSnafu}; #[derive(Clone)] pub struct ParquetSchemaInferenceOptions { - pub infer_schema_int96_timestamps_coerce_timeunit: TimeUnit, + pub coerce_int96_timestamp_unit: TimeUnit, } impl ParquetSchemaInferenceOptions { - pub fn new(infer_schema_int96_timestamps_coerce_timeunit: Option) -> Self { + pub fn new(coerce_int96_timestamp_unit: Option) -> Self { let default: ParquetSchemaInferenceOptions = Default::default(); - let infer_schema_int96_timestamps_coerce_timeunit = - infer_schema_int96_timestamps_coerce_timeunit - .unwrap_or(default.infer_schema_int96_timestamps_coerce_timeunit); + let 
coerce_int96_timestamp_unit = + coerce_int96_timestamp_unit.unwrap_or(default.coerce_int96_timestamp_unit); ParquetSchemaInferenceOptions { - infer_schema_int96_timestamps_coerce_timeunit, + coerce_int96_timestamp_unit, } } } @@ -34,7 +33,7 @@ impl ParquetSchemaInferenceOptions { impl Default for ParquetSchemaInferenceOptions { fn default() -> Self { ParquetSchemaInferenceOptions { - infer_schema_int96_timestamps_coerce_timeunit: TimeUnit::Nanoseconds, + coerce_int96_timestamp_unit: TimeUnit::Nanoseconds, } } } diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py index adc8109721..e43da3e247 100644 --- a/tests/table/table_io/test_parquet.py +++ b/tests/table/table_io/test_parquet.py @@ -233,7 +233,7 @@ def test_parquet_read_int96_timestamps_overflow(coerce_to, use_native_downloader f, schema, read_options=TableReadOptions(column_names=schema.column_names()), - parquet_options=TableParseParquetOptions(infer_schema_int96_timestamps_coerce_timeunit=coerce_to), + parquet_options=TableParseParquetOptions(coerce_int96_timestamp_unit=coerce_to), use_native_downloader=use_native_downloader, ) assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}" @@ -262,5 +262,5 @@ def test_parquet_read_int96_timestamps_schema_inference(coerce_to): pa.Table.from_pydict(data), papq_write_table_kwargs=papq_write_table_kwargs, ) as f: - schema = Schema.from_parquet(f, infer_schema_int96_timestamps_coerce_timeunit=coerce_to) + schema = Schema.from_parquet(f, coerce_int96_timestamp_unit=coerce_to) assert schema == expected, f"Expected:\n{expected}\n\nReceived:\n{schema}" From 5437cd4a82f48b55a3820a9b77d044115b617ef1 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 17:41:01 -0700 Subject: [PATCH 14/15] Fix rust unit test --- src/daft-parquet/src/read.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 02fa26d1fa..c10a6cba2c 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -269,7 +269,7 @@ mod tests { let io_client = Arc::new(IOClient::new(io_config.into())?); - let table = read_parquet(file, None, None, None, io_client, Default::default())?; + let table = read_parquet(file, None, None, None, io_client, &Default::default())?; assert_eq!(table.len(), 100); Ok(()) From ec8cf24422887cac7ebb2b50cfe1d7637b2ffba5 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 20:49:01 -0700 Subject: [PATCH 15/15] Update Cargo toml to non-orphaned rev --- Cargo.lock | 2 +- Cargo.toml | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 981840029c..7c8fad56ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.17.1" -source = "git+https://github.com/Eventual-Inc/arrow2?branch=clark/expand-casting-support#7ffc54215a0dd7ea69e34c6b1f76b2790fbc29bc" +source = "git+https://github.com/Eventual-Inc/arrow2?rev=84b2c0c0#84b2c0c0ec8f7a4e160bf3eaf1bcea474f3c58c4" dependencies = [ "ahash", "arrow-format", diff --git a/Cargo.toml b/Cargo.toml index 5a5dd08b67..940b8e944a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,9 +73,10 @@ snafu = "0.7.4" tokio = {version = "1.29.1", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]} [workspace.dependencies.arrow2] -branch = "clark/expand-casting-support" git = "https://github.com/Eventual-Inc/arrow2" package = "arrow2" +# branch = "jay/int96" 
+rev = "84b2c0c0"
 version = "0.17.1"
 
 [workspace.dependencies.bincode]
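
For context, a minimal end-to-end sketch of how the coerce_int96_timestamp_unit option introduced by this series can be exercised from Python. This is illustrative only and not part of the patches: the local file path and the pyarrow write step are assumptions, while Schema.from_parquet, table_io.read_parquet, TableParseParquetOptions and TimeUnit are used as they stand after patch 13 ("Rename flag to coerce_int96_timestamp_unit").

    import pyarrow as pa
    import pyarrow.parquet as papq

    from daft.datatype import TimeUnit
    from daft.logical.schema import Schema
    from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions
    from daft.table import table_io

    path = "/tmp/int96_example.parquet"  # hypothetical local path, for illustration only

    # Write a small file using the deprecated INT96 timestamp representation.
    papq.write_table(
        pa.Table.from_pydict({"ts": pa.array([1, 2, 3], pa.timestamp("ms"))}),
        path,
        use_deprecated_int96_timestamps=True,
    )

    # Infer the schema, coercing INT96 columns to millisecond precision instead of the
    # default nanoseconds (the option exists so timestamps outside the years 1678-2262
    # do not overflow when read as nanoseconds).
    schema = Schema.from_parquet(path, coerce_int96_timestamp_unit=TimeUnit.ms())

    # Read the table with the same coercion applied while parsing.
    table = table_io.read_parquet(
        path,
        schema,
        read_options=TableReadOptions(column_names=schema.column_names()),
        parquet_options=TableParseParquetOptions(coerce_int96_timestamp_unit=TimeUnit.ms()),
        use_native_downloader=True,
    )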