Arrow: Infer the types when reading #1669

Open · wants to merge 4 commits into base: main
Changes from 2 commits
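
At a high level, the read path stops forcing Arrow large types and instead keeps the types found in the data files (see the updated expectations in tests/integration/test_reads.py below). A minimal sketch of the resulting behaviour, mirroring the columns in test_table_scan_keep_types; the catalog configuration and table name are assumptions for illustration:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Hypothetical catalog; the table/columns follow the integration test below.
catalog = load_catalog("default")
tbl = catalog.load_table("default.test_table_scan_default_to_large_types")

result = tbl.scan().to_arrow()

# Previously string/binary/list columns were always widened to the Arrow
# "large" variants on read; with inference the file-level types are kept.
assert result.schema.field("string").type == pa.string()          # not pa.large_string()
assert result.schema.field("binary").type == pa.binary()          # not pa.large_binary()
assert result.schema.field("list").type == pa.list_(pa.string())  # not pa.large_list(...)
```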
31 changes: 8 additions & 23 deletions pyiceberg/io/pyarrow.py
@@ -99,7 +99,6 @@
HDFS_KERB_TICKET,
HDFS_PORT,
HDFS_USER,
PYARROW_USE_LARGE_TYPES_ON_READ,
S3_ACCESS_KEY_ID,
S3_CONNECT_TIMEOUT,
S3_ENDPOINT,
@@ -1348,7 +1347,6 @@ def _task_to_record_batches(
positional_deletes: Optional[List[ChunkedArray]],
case_sensitive: bool,
name_mapping: Optional[NameMapping] = None,
use_large_types: bool = True,
partition_spec: Optional[PartitionSpec] = None,
) -> Iterator[pa.RecordBatch]:
_, _, path = _parse_location(task.file.file_path)
@@ -1382,9 +1380,7 @@
# https://github.com/apache/arrow/issues/41884
# https://github.com/apache/arrow/issues/43183
# Would be good to remove this later on
schema=_pyarrow_schema_ensure_large_types(physical_schema)
if use_large_types
else (_pyarrow_schema_ensure_small_types(physical_schema)),
schema=physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
@@ -1419,7 +1415,6 @@ def _task_to_record_batches(
file_project_schema,
current_batch,
downcast_ns_timestamp_to_us=True,
use_large_types=use_large_types,
)

# Inject projected column values if available
@@ -1504,14 +1499,6 @@ def __init__(
self._case_sensitive = case_sensitive
self._limit = limit

@property
def _use_large_types(self) -> bool:
"""Whether to represent data as large arrow types.

Defaults to True.
"""
return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

@property
def _projected_field_ids(self) -> Set[int]:
"""Set of field IDs that should be projected from the data files."""
@@ -1602,10 +1589,14 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
ValueError: When a field type in the file cannot be projected to the schema type
"""
deletes_per_file = _read_all_delete_files(self._io, tasks)
# Always use large types, since we cannot infer it in a streaming fashion,
# without fetching all the schemas first, which defeats the purpose of streaming
return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

def _record_batches_from_scan_tasks_and_deletes(
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
self,
tasks: Iterable[FileScanTask],
deletes_per_file: Dict[str, List[ChunkedArray]],
) -> Iterator[pa.RecordBatch]:
total_row_count = 0
for task in tasks:
@@ -1620,7 +1611,6 @@ def _record_batches_from_scan_tasks_and_deletes(
deletes_per_file.get(task.file.file_path),
self._case_sensitive,
self._table_metadata.name_mapping(),
self._use_large_types,
self._table_metadata.spec(),
)
for batch in batches:
@@ -1639,13 +1629,12 @@ def _to_requested_schema(
batch: pa.RecordBatch,
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
use_large_types: bool = True,
) -> pa.RecordBatch:
# We could reuse some of these visitors
struct_array = visit_with_partner(
requested_schema,
batch,
ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
ArrowAccessor(file_schema),
)
return pa.RecordBatch.from_struct_array(struct_array)
@@ -1655,19 +1644,17 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
_file_schema: Schema
_include_field_ids: bool
_downcast_ns_timestamp_to_us: bool
_use_large_types: bool
_use_large_types: Optional[bool]

def __init__(
self,
file_schema: Schema,
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
use_large_types: bool = True,
@sungwy (Collaborator) commented on Feb 18, 2025:

I've always dreaded the process of updating our code base in these internal classes and functions because we do not yet have a properly defined list of public classes 😞

Would this change require a deprecation notice first?
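
One possible answer to the deprecation question, sketched only and not part of this diff: keep the keyword for one release, treat None as "infer from the file", and warn when a caller still passes a value. The sketch uses the standard-library warnings module rather than any project-specific deprecation helper, and the class name is illustrative:

```python
import warnings
from typing import Any, Optional


class ArrowProjectionVisitorSketch:
    """Illustrative only: keep the old keyword for one release and warn when it is used."""

    def __init__(
        self,
        file_schema: Any,
        downcast_ns_timestamp_to_us: bool = False,
        include_field_ids: bool = False,
        use_large_types: Optional[bool] = None,  # deprecated; None now means "use the file's types"
    ) -> None:
        if use_large_types is not None:
            warnings.warn(
                "use_large_types is deprecated and will be removed in a future release; "
                "Arrow types are now taken from the file schema",
                DeprecationWarning,
                stacklevel=2,
            )
        self._file_schema = file_schema
        self._include_field_ids = include_field_ids
        self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
        self._use_large_types = use_large_types
```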

) -> None:
self._file_schema = file_schema
self._include_field_ids = include_field_ids
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._use_large_types = use_large_types

def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
file_field = self._file_schema.find_field(field.field_id)
@@ -1677,8 +1664,6 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
target_schema = schema_to_pyarrow(
promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
)
if not self._use_large_types:
target_schema = _pyarrow_schema_ensure_small_types(target_schema)
return values.cast(target_schema)
elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
if field.field_type == TimestampType():
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
@@ -1750,7 +1750,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
return pa.RecordBatchReader.from_batches(
target_schema,
batches,
)
).cast(target_schema)
Contributor (Author) commented:

This will still return large types if you stream the batches because we don't want to fetch all the schemas upfront.
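
A small PyArrow illustration of the mechanism used here, with made-up data: casting at the RecordBatchReader level converts each batch lazily as it is consumed, so the reader can report the requested schema without collecting every file schema up front. RecordBatchReader.cast needs a reasonably recent PyArrow:

```python
import pyarrow as pa

# Batches as they might come off a file: a large_string column.
file_schema = pa.schema([pa.field("foo", pa.large_string())])
batches = [pa.record_batch({"foo": pa.array(["a", "b"], type=pa.large_string())})]

# The requested (table) schema uses the regular string type.
target_schema = pa.schema([pa.field("foo", pa.string())])

# Cast at the reader level: batches are converted lazily as they are read.
reader = pa.RecordBatchReader.from_batches(file_schema, batches).cast(target_schema)

assert reader.schema == target_schema
assert reader.read_next_batch().schema == target_schema
```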


def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
"""Read a Pandas DataFrame eagerly from this Iceberg table.
21 changes: 8 additions & 13 deletions tests/integration/test_add_files.py
@@ -30,7 +30,7 @@
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
@@ -535,11 +535,6 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca
pa.field("foo", pa.string(), nullable=False),
]
)
arrow_schema_large = pa.schema(
[
pa.field("foo", pa.large_string(), nullable=False),
]
)

tbl = _create_table(session_catalog, identifier, format_version, schema=iceberg_schema)

@@ -561,27 +556,27 @@
tbl.add_files([file_path])

table_schema = tbl.scan().to_arrow().schema
assert table_schema == arrow_schema_large
assert table_schema == arrow_schema

file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet"
_write_parquet(
tbl.io,
file_path_large,
arrow_schema_large,
arrow_schema,
pa.Table.from_pylist(
[
{
"foo": "normal",
}
],
schema=arrow_schema_large,
schema=arrow_schema,
),
)

tbl.add_files([file_path_large])

table_schema = tbl.scan().to_arrow().schema
assert table_schema == arrow_schema_large
assert table_schema == arrow_schema


@pytest.mark.integration
@@ -695,8 +690,8 @@ def test_add_files_with_valid_upcast(
pa.schema(
(
pa.field("long", pa.int64(), nullable=True),
pa.field("list", pa.large_list(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
pa.field("list", pa.list_(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True),
pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16
)
@@ -746,7 +741,7 @@ def test_add_files_subset_of_schema(spark: SparkSession, session_catalog: Catalo
"qux": date(2024, 3, 7),
}
],
schema=_pyarrow_schema_ensure_large_types(ARROW_SCHEMA),
schema=ARROW_SCHEMA,
)

lhs = spark.table(f"{identifier}").toPandas()
10 changes: 5 additions & 5 deletions tests/integration/test_reads.py
@@ -806,7 +806,7 @@ def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:

@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
def test_table_scan_keep_types(catalog: Catalog) -> None:
identifier = "default.test_table_scan_default_to_large_types"
arrow_table = pa.Table.from_arrays(
[
@@ -837,10 +837,10 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None:

expected_schema = pa.schema(
[
pa.field("string", pa.large_string()),
pa.field("string", pa.string()),
pa.field("string-to-binary", pa.large_binary()),
pa.field("binary", pa.large_binary()),
pa.field("list", pa.large_list(pa.large_string())),
pa.field("binary", pa.binary()),
pa.field("list", pa.list_(pa.string())),
]
)
assert result_table.schema.equals(expected_schema)
@@ -881,7 +881,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
expected_schema = pa.schema(
[
pa.field("string", pa.string()),
pa.field("string-to-binary", pa.binary()),
pa.field("string-to-binary", pa.large_binary()),
pa.field("binary", pa.binary()),
pa.field("list", pa.list_(pa.string())),
]
28 changes: 18 additions & 10 deletions tests/integration/test_writes/test_writes.py
@@ -25,6 +25,7 @@
from urllib.parse import urlparse

import pandas as pd
import pandas.testing
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
@@ -401,7 +402,14 @@ def test_python_writes_dictionary_encoded_column_with_spark_reads(
tbl.append(arrow_table)
spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
pyiceberg_df = tbl.scan().to_pandas()
assert spark_df.equals(pyiceberg_df)

# We're just interested in the content, PyIceberg actually makes a nice Categorical out of it:
# E AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="name") are different
# E
# E Attribute "dtype" are different
# E [left]: object
# E [right]: CategoricalDtype(categories=['AB', 'CD', 'EF'], ordered=False, categories_dtype=object)
pandas.testing.assert_frame_equal(spark_df, pyiceberg_df, check_dtype=False, check_categorical=False)


@pytest.mark.integration
@@ -422,7 +430,7 @@ def test_python_writes_with_small_and_large_types_spark_reads(
}
pa_schema = pa.schema(
[
pa.field("foo", pa.large_string()),
pa.field("foo", pa.string()),
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
pa.field(
@@ -432,7 +440,7 @@
pa.field("street", pa.string()),
pa.field("city", pa.string()),
pa.field("zip", pa.int32()),
pa.field("bar", pa.large_string()),
pa.field("bar", pa.string()),
]
),
),
@@ -448,17 +456,17 @@
arrow_table_on_read = tbl.scan().to_arrow()
assert arrow_table_on_read.schema == pa.schema(
[
pa.field("foo", pa.large_string()),
pa.field("foo", pa.string()),
pa.field("id", pa.int32()),
pa.field("name", pa.large_string()),
pa.field("name", pa.string()),
pa.field(
"address",
pa.struct(
[
pa.field("street", pa.large_string()),
pa.field("city", pa.large_string()),
pa.field("street", pa.string()),
pa.field("city", pa.string()),
pa.field("zip", pa.int32()),
pa.field("bar", pa.large_string()),
pa.field("bar", pa.string()),
]
),
),
@@ -1164,8 +1172,8 @@ def test_table_write_schema_with_valid_upcast(
pa.schema(
(
pa.field("long", pa.int64(), nullable=True),
pa.field("list", pa.large_list(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
pa.field("list", pa.list_(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double
pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16
)
18 changes: 9 additions & 9 deletions tests/io/test_pyarrow.py
@@ -1065,10 +1065,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None:

assert (
repr(result_table.schema)
== """properties: map<large_string, large_string>
child 0, entries: struct<key: large_string not null, value: large_string not null> not null
child 0, key: large_string not null
child 1, value: large_string not null"""
== """properties: map<string, string>
child 0, entries: struct<key: string not null, value: string not null> not null
child 0, key: string not null
child 1, value: string not null"""
)


@@ -1184,7 +1184,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa
assert (
str(table.scan().to_arrow())
== """pyarrow.Table
other_field: large_string
other_field: string
partition_id: int64
----
other_field: [["foo"]]
@@ -1246,7 +1246,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC
assert (
str(table.scan().to_arrow())
== """pyarrow.Table
field_1: large_string
field_1: string
field_2: int64
field_3: int64
----
@@ -1471,9 +1471,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s
assert actual.as_py() == expected
assert (
repr(result_table.schema)
== """locations: map<large_string, struct<latitude: double not null, longitude: double not null, altitude: double>>
child 0, entries: struct<key: large_string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
child 0, key: large_string not null
== """locations: map<string, struct<latitude: double not null, longitude: double not null, altitude: double>>
child 0, entries: struct<key: string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
child 0, key: string not null
child 1, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null
child 0, latitude: double not null
child 1, longitude: double not null