Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upsert docs #1665

Merged
merged 2 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,71 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
```

### Upsert

PyIceberg supports upsert operations, meaning that it is able to merge an Arrow table into an Iceberg table. Rows are matched using the [identifier fields](https://iceberg.apache.org/spec/#identifier-field-ids); if a matching row already exists in the table, it will be updated, and if no match is found, the new row will be inserted.

Consider the following table, with some data:

```python
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

import pyarrow as pa

schema = Schema(
NestedField(1, "city", StringType(), required=True),
NestedField(2, "inhabitants", IntegerType(), required=True),
# Mark City as the identifier field, also known as the primary-key
identifier_field_ids=[1]
)

tbl = catalog.create_table("default.cities", schema=schema)

arrow_schema = pa.schema(
[
pa.field("city", pa.string(), nullable=False),
pa.field("inhabitants", pa.int32(), nullable=False),
]
)

# Write some data
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "inhabitants": 921402},
{"city": "San Francisco", "inhabitants": 808988},
{"city": "Drachten", "inhabitants": 45019},
{"city": "Paris", "inhabitants": 2103000},
],
schema=arrow_schema
)
tbl.append(df)
```

Next, we'll upsert an Arrow table into the Iceberg table:

```python
df = pa.Table.from_pylist(
[
# Will be updated, the inhabitants has been updated
{"city": "Drachten", "inhabitants": 45505},

# New row, will be inserted
{"city": "Berlin", "inhabitants": 3432000},

# Ignored, already exists in the table
{"city": "Paris", "inhabitants": 2103000},
],
schema=arrow_schema
)
upd = tbl.upsert(df)

assert upd.rows_updated == 1
assert upd.rows_inserted == 1
```

PyIceberg will automatically detect which rows need to be updated, which need to be inserted, and which can simply be ignored.

## Inspecting tables

To explore the table metadata, tables can be inspected.
Expand Down
15 changes: 13 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ def name_mapping(self) -> Optional[NameMapping]:
def upsert(
self,
df: pa.Table,
join_cols: list[str],
join_cols: Optional[List[str]] = None,
when_matched_update_all: bool = True,
when_not_matched_insert_all: bool = True,
case_sensitive: bool = True,
Expand All @@ -1119,11 +1119,13 @@ def upsert(
Args:

df: The input dataframe to upsert with the table's data.
join_cols: The columns to join on. These are essentially analogous to primary keys
join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
case_sensitive: Bool indicating if the match should be case-sensitive

To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

Example Use Cases:
Case 1: Both Parameters = True (Full Upsert)
Existing row found → Update it
Expand All @@ -1148,6 +1150,15 @@ def upsert(
"""
from pyiceberg.table import upsert_util

if join_cols is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 we should also update the docstring

join_cols = []
for field_id in self.schema().identifier_field_ids:
col = self.schema().find_column_name(field_id)
if col is not None:
join_cols.append(col)
else:
raise ValueError(f"Field-ID could not be found: {join_cols}")

if not when_matched_update_all and not when_not_matched_insert_all:
raise ValueError("no upsert options selected...exiting")

Expand Down
52 changes: 52 additions & 0 deletions tests/table/test_upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
# under the License.
from pathlib import PosixPath

import pyarrow as pa
import pytest
from datafusion import SessionContext
from pyarrow import Table as pa_table

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.table import UpsertResult
from pyiceberg.types import IntegerType, NestedField, StringType
from tests.catalog.test_base import InMemoryCatalog, Table


Expand Down Expand Up @@ -314,3 +317,52 @@ def test_key_cols_misaligned(catalog: Catalog) -> None:

with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""):
table.upsert(df=df_src, join_cols=["order_id"])


def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
    """Upsert without ``join_cols`` falls back to the schema's identifier fields."""
    identifier = "default.test_upsert_with_identifier_fields"
    _drop_table(catalog, identifier)

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # "city" acts as the primary key via identifier-field-ids
        identifier_field_ids=[1],
    )
    tbl = catalog.create_table(identifier, schema=schema)

    pa_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )

    # Seed the table with some initial rows
    seed_rows = [
        {"city": "Amsterdam", "inhabitants": 921402},
        {"city": "San Francisco", "inhabitants": 808988},
        {"city": "Drachten", "inhabitants": 45019},
        {"city": "Paris", "inhabitants": 2103000},
    ]
    tbl.append(pa.Table.from_pylist(seed_rows, schema=pa_schema))

    # One changed row, one brand-new row, one row identical to what's stored
    incoming = pa.Table.from_pylist(
        [
            # Will be updated, the inhabitants has been updated
            {"city": "Drachten", "inhabitants": 45505},
            # New row, will be inserted
            {"city": "Berlin", "inhabitants": 3432000},
            # Ignored, already exists in the table
            {"city": "Paris", "inhabitants": 2103000},
        ],
        schema=pa_schema,
    )
    upd = tbl.upsert(incoming)

    assert upd.rows_updated == 1
    assert upd.rows_inserted == 1