Add upsert docs (#1665)
And make the join-cols optional using the identifier fields.
Fokko authored Feb 16, 2025
1 parent dd58ac1 commit 300b840
Showing 3 changed files with 130 additions and 2 deletions.
65 changes: 65 additions & 0 deletions mkdocs/docs/api.md
@@ -474,6 +474,71 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
```

### Upsert

PyIceberg supports upsert operations, meaning that it can merge an Arrow table into an Iceberg table. Rows are matched based on the [identifier field](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids). If a matching row already exists in the table, it is updated; if no match is found, the new row is inserted.

Consider the following table, with some data:

```python
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

import pyarrow as pa

schema = Schema(
    NestedField(1, "city", StringType(), required=True),
    NestedField(2, "inhabitants", IntegerType(), required=True),
    # Mark 'city' as the identifier field, also known as the primary key
    identifier_field_ids=[1]
)

tbl = catalog.create_table("default.cities", schema=schema)

arrow_schema = pa.schema(
    [
        pa.field("city", pa.string(), nullable=False),
        pa.field("inhabitants", pa.int32(), nullable=False),
    ]
)

# Write some data
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "inhabitants": 921402},
        {"city": "San Francisco", "inhabitants": 808988},
        {"city": "Drachten", "inhabitants": 45019},
        {"city": "Paris", "inhabitants": 2103000},
    ],
    schema=arrow_schema,
)
tbl.append(df)
```

Next, we'll upsert an Arrow table into the Iceberg table:

```python
df = pa.Table.from_pylist(
    [
        # Will be updated: the number of inhabitants has changed
        {"city": "Drachten", "inhabitants": 45505},
        # New row, will be inserted
        {"city": "Berlin", "inhabitants": 3432000},
        # Ignored, already exists in the table
        {"city": "Paris", "inhabitants": 2103000},
    ],
    schema=arrow_schema,
)

upd = tbl.upsert(df)

assert upd.rows_updated == 1
assert upd.rows_inserted == 1
```

PyIceberg automatically detects which rows need to be updated, which need to be inserted, and which can simply be ignored because they are already present and unchanged.
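
As a quick sanity check (a minimal sketch, assuming the `tbl` created above and the upsert that was just applied), you can read the table back and confirm the merged state:

```python
# Read the merged result back as an Arrow table
result = tbl.scan().to_arrow()

# Build a {city: inhabitants} lookup for easy assertions
cities = dict(zip(result["city"].to_pylist(), result["inhabitants"].to_pylist()))

assert cities["Drachten"] == 45505  # updated
assert cities["Berlin"] == 3432000  # inserted
assert cities["Paris"] == 2103000   # unchanged, the upsert ignored it
assert len(cities) == 5             # the four original rows plus Berlin
```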

## Inspecting tables

To explore the table metadata, tables can be inspected.
15 changes: 13 additions & 2 deletions pyiceberg/table/__init__.py
@@ -1109,7 +1109,7 @@ def name_mapping(self) -> Optional[NameMapping]:
    def upsert(
        self,
        df: pa.Table,
        join_cols: list[str],
        join_cols: Optional[List[str]] = None,
        when_matched_update_all: bool = True,
        when_not_matched_insert_all: bool = True,
        case_sensitive: bool = True,
@@ -1119,11 +1119,13 @@
        Args:
            df: The input dataframe to upsert with the table's data.
            join_cols: The columns to join on. These are essentially analogous to primary keys
            join_cols: Columns to join on. If not provided, the table's identifier-field-ids are used.
            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
            case_sensitive: Bool indicating if the match should be case-sensitive

            To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

        Example Use Cases:
            Case 1: Both Parameters = True (Full Upsert)
                Existing row found → Update it
@@ -1148,6 +1150,15 @@
"""
from pyiceberg.table import upsert_util

if join_cols is None:
join_cols = []
for field_id in self.schema().identifier_field_ids:
col = self.schema().find_column_name(field_id)
if col is not None:
join_cols.append(col)
else:
raise ValueError(f"Field-ID could not be found: {join_cols}")

        if not when_matched_update_all and not when_not_matched_insert_all:
            raise ValueError("no upsert options selected...exiting")

52 changes: 52 additions & 0 deletions tests/table/test_upsert.py
@@ -16,13 +16,16 @@
# under the License.
from pathlib import PosixPath

import pyarrow as pa
import pytest
from datafusion import SessionContext
from pyarrow import Table as pa_table

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.table import UpsertResult
from pyiceberg.types import IntegerType, NestedField, StringType
from tests.catalog.test_base import InMemoryCatalog, Table


@@ -314,3 +317,52 @@ def test_key_cols_misaligned(catalog: Catalog) -> None:

    with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""):
        table.upsert(df=df_src, join_cols=["order_id"])


def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
    identifier = "default.test_upsert_with_identifier_fields"
    _drop_table(catalog, identifier)

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark 'city' as the identifier field, also known as the primary key
        identifier_field_ids=[1],
    )

    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )

    # Write some data
    df = pa.Table.from_pylist(
        [
            {"city": "Amsterdam", "inhabitants": 921402},
            {"city": "San Francisco", "inhabitants": 808988},
            {"city": "Drachten", "inhabitants": 45019},
            {"city": "Paris", "inhabitants": 2103000},
        ],
        schema=arrow_schema,
    )
    tbl.append(df)

    df = pa.Table.from_pylist(
        [
            # Will be updated: the number of inhabitants has changed
            {"city": "Drachten", "inhabitants": 45505},
            # New row, will be inserted
            {"city": "Berlin", "inhabitants": 3432000},
            # Ignored, already exists in the table
            {"city": "Paris", "inhabitants": 2103000},
        ],
        schema=arrow_schema,
    )
    upd = tbl.upsert(df)

    assert upd.rows_updated == 1
    assert upd.rows_inserted == 1
