
[FEAT] Read parquet tables with int96 coercion option #1231

Merged 15 commits on Aug 9, 2023
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -73,9 +73,10 @@ snafu = "0.7.4"
tokio = {version = "1.29.1", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}

[workspace.dependencies.arrow2]
branch = "clark/expand-casting-support"
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
# branch = "jay/int96"
rev = "84b2c0c0"
version = "0.17.1"

[workspace.dependencies.bincode]
23 changes: 20 additions & 3 deletions daft/logical/schema.py
@@ -1,11 +1,17 @@
from __future__ import annotations

import sys
from typing import TYPE_CHECKING, Iterator

from daft.daft import PyField as _PyField
from daft.daft import PySchema as _PySchema
from daft.daft import read_parquet_schema as _read_parquet_schema
from daft.datatype import DataType
from daft.datatype import DataType, TimeUnit

if sys.version_info < (3, 8):
pass
else:
pass

if TYPE_CHECKING:
from daft.io import IOConfig
@@ -119,5 +125,16 @@ def __setstate__(self, state: bytes) -> None:
self._schema.__setstate__(state)

@classmethod
def from_parquet(cls, path: str, io_config: IOConfig | None = None) -> Schema:
return Schema._from_pyschema(_read_parquet_schema(uri=path, io_config=io_config))
def from_parquet(
cls,
path: str,
io_config: IOConfig | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
) -> Schema:
return Schema._from_pyschema(
_read_parquet_schema(
uri=path,
io_config=io_config,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
)
)
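
A minimal usage sketch of the new keyword on `Schema.from_parquet` (the file path is a placeholder, and `TimeUnit.ms()` is assumed to exist alongside the `TimeUnit.ns()` default used above):

```python
from daft.datatype import TimeUnit
from daft.logical.schema import Schema

# Read a Parquet schema while coercing legacy int96 timestamps to
# milliseconds instead of the default nanoseconds.
schema = Schema.from_parquet(
    "events.parquet",  # placeholder path
    coerce_int96_timestamp_unit=TimeUnit.ms(),
)
print(schema)
```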
18 changes: 18 additions & 0 deletions daft/runners/partitioning.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import sys
import weakref
from abc import abstractmethod
from dataclasses import dataclass
@@ -8,9 +9,15 @@

import pyarrow as pa

from daft.datatype import TimeUnit
from daft.logical.schema import Schema
from daft.table import Table

if sys.version_info < (3, 8):
pass
else:
pass

if TYPE_CHECKING:
import pandas as pd

@@ -43,6 +50,17 @@ class TableParseCSVOptions:
header_index: int | None = 0


@dataclass(frozen=True)
class TableParseParquetOptions:
"""Options for parsing Parquet files

Args:
coerce_int96_timestamp_unit: TimeUnit to use when parsing Int96 fields
"""

coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns()


@dataclass(frozen=True)
class PartialPartitionMetadata:
num_rows: None | int
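A short sketch of the new `TableParseParquetOptions` dataclass in use (assuming `TimeUnit.us()` is available in `daft.datatype`):

```python
from daft.datatype import TimeUnit
from daft.runners.partitioning import TableParseParquetOptions

# The default preserves existing behavior: int96 values are read as nanoseconds.
default_opts = TableParseParquetOptions()

# Callers can opt into a coarser unit, e.g. microseconds, to avoid the
# overflow that nanosecond timestamps hit for dates outside 1677-2262.
us_opts = TableParseParquetOptions(coerce_int96_timestamp_unit=TimeUnit.us())
```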
26 changes: 23 additions & 3 deletions daft/table/table.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import sys
from typing import TYPE_CHECKING, Any

import pyarrow as pa
@@ -10,11 +11,16 @@
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
from daft.daft import read_parquet_statistics as _read_parquet_statistics
from daft.datatype import DataType
from daft.datatype import DataType, TimeUnit
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.schema import Schema
from daft.series import Series

if sys.version_info < (3, 8):
pass
else:
pass

_NUMPY_AVAILABLE = True
try:
import numpy as np
@@ -353,9 +359,17 @@ def read_parquet(
start_offset: int | None = None,
num_rows: int | None = None,
io_config: IOConfig | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
) -> Table:
return Table._from_pytable(
_read_parquet(uri=path, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config)
_read_parquet(
uri=path,
columns=columns,
start_offset=start_offset,
num_rows=num_rows,
io_config=io_config,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
)
)

@classmethod
@@ -366,9 +380,15 @@ def read_parquet_bulk(
start_offset: int | None = None,
num_rows: int | None = None,
io_config: IOConfig | None = None,
coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
) -> list[Table]:
pytables = _read_parquet_bulk(
uris=paths, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config
uris=paths,
columns=columns,
start_offset=start_offset,
num_rows=num_rows,
io_config=io_config,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
)
return [Table._from_pytable(t) for t in pytables]

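Taken together, a hedged example of the new keyword on both `Table` read paths (paths are placeholders; `TimeUnit.ms()` is assumed):

```python
from daft.datatype import TimeUnit
from daft.table import Table

# Single-file read, coercing int96 timestamps to milliseconds.
tbl = Table.read_parquet("a.parquet", coerce_int96_timestamp_unit=TimeUnit.ms())

# Bulk read applies the same coercion to every file in the list.
tbls = Table.read_parquet_bulk(
    ["a.parquet", "b.parquet"],
    coerce_int96_timestamp_unit=TimeUnit.ms(),
)
```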
13 changes: 10 additions & 3 deletions daft/table/table_io.py
@@ -17,7 +17,11 @@
from daft.expressions import ExpressionsProjection
from daft.filesystem import _resolve_paths_and_filesystem
from daft.logical.schema import Schema
from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions
from daft.runners.partitioning import (
TableParseCSVOptions,
TableParseParquetOptions,
TableReadOptions,
)
from daft.table import Table

if TYPE_CHECKING:
@@ -97,6 +101,7 @@ def read_parquet(
schema: Schema,
fs: fsspec.AbstractFileSystem | None = None,
read_options: TableReadOptions = TableReadOptions(),
parquet_options: TableParseParquetOptions = TableParseParquetOptions(),
io_config: IOConfig | None = None,
use_native_downloader: bool = False,
) -> Table:
@@ -118,6 +123,7 @@
columns=read_options.column_names,
num_rows=read_options.num_rows,
io_config=io_config,
coerce_int96_timestamp_unit=parquet_options.coerce_int96_timestamp_unit,
)
return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)

@@ -132,11 +138,11 @@

# If no rows required, we manually construct an empty table with the right schema
if read_options.num_rows == 0:
pqf = papq.ParquetFile(f)
pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=str(parquet_options.coerce_int96_timestamp_unit))
arrow_schema = pqf.metadata.schema.to_arrow_schema()
table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema)
elif read_options.num_rows is not None:
pqf = papq.ParquetFile(f)
pqf = papq.ParquetFile(f, coerce_int96_timestamp_unit=str(parquet_options.coerce_int96_timestamp_unit))
# Only read the required row groups.
rows_needed = read_options.num_rows
for i in range(pqf.metadata.num_row_groups):
@@ -152,6 +158,7 @@
table = papq.read_table(
f,
columns=read_options.column_names,
coerce_int96_timestamp_unit=str(parquet_options.coerce_int96_timestamp_unit),
)

return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema)
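In the non-native fallback above, PyArrow takes the unit as a string, which is why the options value is wrapped in `str(...)`. A standalone sketch of the same coercion through PyArrow directly (placeholder path):

```python
import pyarrow.parquet as papq

# PyArrow's own keyword accepts "s", "ms", "us", or "ns"; this mirrors what
# the table_io.py fallback passes via str(coerce_int96_timestamp_unit).
table = papq.read_table("events.parquet", coerce_int96_timestamp_unit="ms")
print(table.schema)
```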
9 changes: 4 additions & 5 deletions src/daft-core/src/array/ops/cast.rs
@@ -320,9 +320,8 @@ pub(super) fn decimal128_to_str(val: i128, _precision: u8, scale: i8) -> String
}

pub(super) fn timestamp_to_str_naive(val: i64, unit: &TimeUnit) -> String {
let chrono_ts = {
arrow2::temporal_conversions::timestamp_to_naive_datetime(val, unit.to_arrow().unwrap())
};
let chrono_ts =
{ arrow2::temporal_conversions::timestamp_to_naive_datetime(val, unit.to_arrow()) };
let format_str = match unit {
TimeUnit::Seconds => "%Y-%m-%dT%H:%M:%S",
TimeUnit::Milliseconds => "%Y-%m-%dT%H:%M:%S%.3f",
@@ -343,7 +342,7 @@ pub(super) fn timestamp_to_str_offset(
TimeUnit::Microseconds => chrono::SecondsFormat::Micros,
TimeUnit::Nanoseconds => chrono::SecondsFormat::Nanos,
};
arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow().unwrap(), offset)
arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow(), offset)
.to_rfc3339_opts(seconds_format, false)
}

@@ -354,7 +353,7 @@ pub(super) fn timestamp_to_str_tz(val: i64, unit: &TimeUnit, tz: &chrono_tz::Tz)
TimeUnit::Microseconds => chrono::SecondsFormat::Micros,
TimeUnit::Nanoseconds => chrono::SecondsFormat::Nanos,
};
arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow().unwrap(), tz)
arrow2::temporal_conversions::timestamp_to_datetime(val, unit.to_arrow(), tz)
.to_rfc3339_opts(seconds_format, false)
}

6 changes: 3 additions & 3 deletions src/daft-core/src/datatypes/dtype.rs
@@ -134,11 +134,11 @@ impl DataType {
DataType::Float64 => Ok(ArrowType::Float64),
DataType::Decimal128(precision, scale) => Ok(ArrowType::Decimal(*precision, *scale)),
DataType::Timestamp(unit, timezone) => {
Ok(ArrowType::Timestamp(unit.to_arrow()?, timezone.clone()))
Ok(ArrowType::Timestamp(unit.to_arrow(), timezone.clone()))
}
DataType::Date => Ok(ArrowType::Date32),
DataType::Time(unit) => Ok(ArrowType::Time64(unit.to_arrow()?)),
DataType::Duration(unit) => Ok(ArrowType::Duration(unit.to_arrow()?)),
DataType::Time(unit) => Ok(ArrowType::Time64(unit.to_arrow())),
DataType::Duration(unit) => Ok(ArrowType::Duration(unit.to_arrow())),
DataType::Binary => Ok(ArrowType::LargeBinary),
DataType::Utf8 => Ok(ArrowType::LargeUtf8),
DataType::FixedSizeList(field, size) => {
12 changes: 5 additions & 7 deletions src/daft-core/src/datatypes/time_unit.rs
@@ -1,7 +1,5 @@
use arrow2::datatypes::TimeUnit as ArrowTimeUnit;

use common_error::DaftResult;

use serde::{Deserialize, Serialize};

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
@@ -14,12 +12,12 @@ pub enum TimeUnit {

impl TimeUnit {
#![allow(clippy::wrong_self_convention)]
pub fn to_arrow(&self) -> DaftResult<ArrowTimeUnit> {
pub fn to_arrow(&self) -> ArrowTimeUnit {
match self {
TimeUnit::Nanoseconds => Ok(ArrowTimeUnit::Nanosecond),
TimeUnit::Microseconds => Ok(ArrowTimeUnit::Microsecond),
TimeUnit::Milliseconds => Ok(ArrowTimeUnit::Millisecond),
TimeUnit::Seconds => Ok(ArrowTimeUnit::Second),
TimeUnit::Nanoseconds => ArrowTimeUnit::Nanosecond,
TimeUnit::Microseconds => ArrowTimeUnit::Microsecond,
TimeUnit::Milliseconds => ArrowTimeUnit::Millisecond,
TimeUnit::Seconds => ArrowTimeUnit::Second,
}
}
}
10 changes: 10 additions & 0 deletions src/daft-core/src/schema.rs
@@ -85,6 +85,16 @@
}
}

pub fn to_arrow(&self) -> DaftResult<arrow2::datatypes::Schema> {
let arrow_fields: DaftResult<Vec<arrow2::datatypes::Field>> =
self.fields.iter().map(|(_, f)| f.to_arrow()).collect();
let arrow_fields = arrow_fields?;
Ok(arrow2::datatypes::Schema {
fields: arrow_fields,
metadata: Default::default(),
})
}

pub fn repr_html(&self) -> String {
// Produces a <table> HTML element.

8 changes: 5 additions & 3 deletions src/daft-core/src/series/array_impl/data_array.rs
@@ -137,9 +137,11 @@ fn logical_to_arrow<'a>(
.as_any()
.downcast_ref::<arrow2::array::PrimitiveArray<i64>>()
.unwrap();
let casted: Box<dyn arrow2::array::Array> = Box::new(downcasted.clone().to(
arrow2::datatypes::DataType::Duration(unit.to_arrow().unwrap()),
));
let casted: Box<dyn arrow2::array::Array> = Box::new(
downcasted
.clone()
.to(arrow2::datatypes::DataType::Duration(unit.to_arrow())),
);
Cow::Owned(casted)
}
