
Add upsert docs #1665

Merged: 2 commits merged into apache:main, Feb 16, 2025
Conversation

@Fokko (Contributor) commented Feb 15, 2025

And make the join-cols optional using the identifier fields.

@soumilshah1995:
lovely

@kevinjqliu (Contributor) left a comment:

LGTM, minor comment on updating the docstring

@@ -1148,6 +1148,15 @@ def upsert(
"""
from pyiceberg.table import upsert_util

if join_cols is None:
@kevinjqliu commented on this line:

👍 we should also update the docstring
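The diff above only shows the start of the fallback. A minimal sketch of how `join_cols` could default to the schema's identifier fields when omitted (the `Field`/`Schema` classes here are simplified stand-ins, not pyiceberg's actual `NestedField`/`Schema`):

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for pyiceberg's schema classes; the real upsert()
# works against pyiceberg.schema.Schema.
@dataclass
class Field:
    field_id: int
    name: str

@dataclass
class Schema:
    fields: list
    identifier_field_ids: list = field(default_factory=list)

def default_join_cols(schema: Schema, join_cols=None):
    """Fall back to the schema's identifier fields when join_cols is omitted."""
    if join_cols is not None:
        return join_cols
    # Identifier fields are stored as field IDs; resolve them to column names.
    by_id = {f.field_id: f.name for f in schema.fields}
    cols = [by_id[fid] for fid in schema.identifier_field_ids]
    if not cols:
        raise ValueError("Must pass join_cols or define identifier fields on the table")
    return cols

schema = Schema([Field(1, "city"), Field(2, "inhabitants")], identifier_field_ids=[1])
print(default_join_cols(schema))  # ['city']
```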

@ananthdurai:

join_cols seems focused on the primary key. How do we specify the partition column to enable partition pruning?

@kevinjqliu (Contributor) left a comment:

Actually, this doesn't respect the uniqueness of the identifier_field_ids columns.

For example,

def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
    identifier = "default.test_upsert_with_identifier_fields"
    _drop_table(catalog, identifier)

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark City as the identifier field, also known as the primary-key
        identifier_field_ids=[1],
    )

    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )

    # Write some data
    df = pa.Table.from_pylist(
        [
            {"city": "Amsterdam", "inhabitants": 921402},
            {"city": "San Francisco", "inhabitants": 808988},
            {"city": "Drachten", "inhabitants": 45019},
            {"city": "Paris", "inhabitants": 2103000},
        ],
        schema=arrow_schema,
    )
    tbl.append(df)

    df = pa.Table.from_pylist(
        [
            {"city": "Paris", "inhabitants": 921402},
        ],
        schema=arrow_schema,
    )
    upd = tbl.upsert(df, join_cols=["inhabitants"], when_not_matched_insert_all=True)

    print(tbl.scan().to_pandas())

@kevinjqliu (Contributor):

> join_cols seems focused on the primary key. How do we specify the partition column to enable partition pruning?

@ananthdurai the partition columns are part of the Iceberg table definition, and the upsert function is called on an Iceberg table. Under the hood, upsert does an overwrite and/or an append; both of those operations are aware of Iceberg's partitioning and handle partition pruning as part of Iceberg's partitioned-write feature.
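As a plain-Python illustration of that point (a toy model, not pyiceberg's actual implementation): with hidden partitioning, an upsert-style overwrite only touches the partitions that the incoming rows map to, so the user never has to specify a partition column.

```python
# Toy model of partition pruning during an upsert-style overwrite.
# Data files are grouped by a partition value derived from each row;
# only partitions that appear in the incoming batch are rewritten.
# (Illustrative only; pyiceberg derives partitions from the table's spec.)

def partition_key(row):
    # Stand-in for a partition transform, e.g. identity on "region".
    return row["region"]

def upsert_overwrite(files_by_partition, incoming, key_col):
    touched = {partition_key(r) for r in incoming}
    for part in touched:  # untouched partitions are pruned entirely
        existing = files_by_partition.get(part, [])
        new_keys = {r[key_col] for r in incoming if partition_key(r) == part}
        kept = [r for r in existing if r[key_col] not in new_keys]
        files_by_partition[part] = kept + [r for r in incoming if partition_key(r) == part]
    return files_by_partition

files = {
    "eu": [{"region": "eu", "city": "Paris", "pop": 2103000}],
    "us": [{"region": "us", "city": "San Francisco", "pop": 808988}],
}
upsert_overwrite(files, [{"region": "eu", "city": "Paris", "pop": 2102650}], "city")
# Only the "eu" partition is rewritten; "us" is never read.
```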

@Fokko (Contributor, Author) commented Feb 16, 2025

@kevinjqliu Yes, that is an issue, but we don't respect this for any of the operations (append, etc.). Enforcing it would make the operations expensive, so we could leave this up to the user. Two more opinionated approaches are:

  • Don't allow join_cols if the table has identifier fields.
  • Remove the join_cols column.

I think it would be nice to promote Iceberg-specific features like the identifier fields, but the above might be too opinionated. Would love to hear what others think.

@ananthdurai Kevin already provided an excellent answer. If you want to learn more, I would recommend reading the docs on hidden partition pruning.

@mattmartin14 (Contributor):

> @kevinjqliu Yes, that is an issue, but we don't respect this for any of the operations (append, etc). …

@Fokko,

I honestly didn't even know about the Iceberg-specific identifier fields until you recently mentioned them; I can't imagine many have. I see situations where teams have already built a ton of Iceberg tables, and it would be easier and more explicit for the user if join_cols is an option they can call out. Otherwise, users who do not know the internal schema of the table and see the code for the first time with no join_cols specified will probably be puzzled and wonder, "how is this thing doing this correctly?"

I'd personally leave the join_cols as an optional way for users to use upsert.

@mattmartin14 (Contributor):

> Actually, this doesn't respect the uniqueness of the identifier_field_ids columns. (Test example quoted in full above; elided here.)

@kevinjqliu,

Not to sound blunt, but the example above seems odd, TBH. If I understand correctly, inhabitants is analogous to the population count for a city. Thus, the join column should be city, not inhabitants: city identifies the unique record, and inhabitants is just an attribute of that record.

@kevinjqliu (Contributor):

> Yes, that is an issue, but we don't respect this for any of the operations (append, etc). Doing this would make the operations expensive so we could leave this up to the user.

You're right, this is an issue for all the write operations; we don't take identifier_field_ids into account when writing. I'll raise a separate issue to track this. For now, I'm OK with leaving this up to the user/external engine. When done correctly, the write operations will respect the uniqueness.
To quote the spec,

> uniqueness of rows by this identifier is not guaranteed or required by Iceberg and it is the responsibility of processing engines or data providers to enforce.

As a follow-up, we can add a uniqueness check to upsert when identifier_field_ids is set, similar to checking for duplicates. I see this issue as a potential footgun, so it's better to verify the uniqueness and prevent data-correctness problems.
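The check being suggested could look something like this (a sketch in plain Python over row dicts; pyiceberg's actual implementation would operate on Arrow tables, and `assert_unique` is a hypothetical helper name):

```python
from collections import Counter

def assert_unique(rows, identifier_cols):
    """Raise if any combination of identifier-column values occurs more than once."""
    counts = Counter(tuple(row[c] for c in identifier_cols) for row in rows)
    dupes = [key for key, n in counts.items() if n > 1]
    if dupes:
        raise ValueError(f"Identifier fields are not unique for: {dupes}")

rows = [
    {"city": "Paris", "inhabitants": 2103000},
    {"city": "Paris", "inhabitants": 921402},  # same city, different attribute
]
assert_unique(rows, ["city", "inhabitants"])  # passes: composite key is unique
# assert_unique(rows, ["city"]) would raise ValueError: "Paris" appears twice
```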

@kevinjqliu (Contributor):

> Not to sound blunt but the example above seems odd TBH

@mattmartin14 it is an odd example! I had a feeling this could break the uniqueness constraint, so I crafted an example to show it. It's not something users will normally write, but it does show a data-correctness issue. This can become a problem when interacting with the table again, since it is assumed that the identifier_field_ids provide uniqueness guarantees.

@kevinjqliu (Contributor) left a comment:

I'm OK with this approach, shifting the responsibility to the user to provide the uniqueness guarantee when using identifier_field_ids.

I raised an issue to figure out the path forward for all write paths when identifier_field_ids is set (#1666), and perhaps a uniqueness check for upsert when identifier_field_ids is set (#1667).

@Fokko (Contributor, Author) commented Feb 16, 2025

> I honestly didn't even know about the iceberg specific identifier fields until you had recently mentioned it. I can't imagine many have. I see situations where teams have already built a ton of iceberg tables and it would be easier and more explicit for the user to understand if join_cols is an option they can call out.

Yes, this is also my concern. Keep in mind that the identifier-field-ids reference the columns by ID, so if you rename a column, nothing breaks :)
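A quick sketch of why renames are safe: identifier-field-ids store field IDs rather than names, so resolving them after a rename still finds the right column (`ToySchema` is a toy model, not pyiceberg's actual `Schema`):

```python
class ToySchema:
    """Toy model: identifier fields are stored as field IDs, not names."""

    def __init__(self, fields, identifier_field_ids):
        self.fields = dict(fields)          # field_id -> column name
        self.identifier_field_ids = list(identifier_field_ids)

    def rename(self, field_id, new_name):
        self.fields[field_id] = new_name    # IDs are stable across renames

    def identifier_names(self):
        return [self.fields[fid] for fid in self.identifier_field_ids]

schema = ToySchema({1: "city", 2: "inhabitants"}, identifier_field_ids=[1])
schema.rename(1, "city_name")
print(schema.identifier_names())  # ['city_name'] -- the identifier survives the rename
```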

Thanks for raising the issues @kevinjqliu. I've provided a way to cover this in #1667 (comment), LMKWYT.

@kevinjqliu merged commit 300b840 into apache:main on Feb 16, 2025
8 checks passed
@kevinjqliu (Contributor):

LGTM! Thanks @Fokko and thanks @mattmartin14 for the review
