-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ingest/iceberg): update iceberg source to support newer versions of pyiceberg at runtime #10614
fix(ingest/iceberg): update iceberg source to support newer versions of pyiceberg at runtime #10614
Conversation
…duce a new property named partition-spec
…t. A simple copy-paste from .pyiceberg.yaml should work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also make some changes to setup.py
- Can also change
"pyiceberg~=0.4",
-> to"pyiceberg>=0.4,<0.7",
and remove the pydantic_no_v2 requirement - Can add iceberg to the exception list around line 798
That way iceberg 0.6 can be installed natively
catalog: IcebergCatalogConfig = Field( | ||
description="Catalog configuration where to find Iceberg tables. See [pyiceberg's catalog configuration details](https://py.iceberg.apache.org/configuration/).", | ||
# The catalog configuration is using a dictionary to be open and flexible. All the keys and values are handled by pyiceberg. This will future-proof any configuration change done by pyiceberg. | ||
catalog: Dict[str, Dict[str, str]] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a breaking change - can we transparently upgrade folks from the old config to the new one, and then only issue a deprecation warning
I'd prefer to not break existing ingestion recipes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that since the iceberg source was still consider in testing
, it was ok to introduce a breaking change. I'll see what I can do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I now support the old format. One thing that is left is to report the warning back to the user. What would be your recommendation? It currently print()
a message, but I will change this to what ever you recommend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hsheth2 Thanks for approving it, but don't merge it just yet. The warning back to the user needs to be fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up simply using logging.warning()
. Do we have to state anywhere in the docs that the old format is deprecated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.warning should be fine here
metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py
Show resolved
Hide resolved
"source-type": str(table.schema().find_type(partition.source_id)), | ||
"field-id": partition.field_id, | ||
} | ||
for partition in table.spec().fields |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need any error handling around this method? or are we reasonably confident that it won't throw an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Odds are small, but you are right about this. A broken table could trigger an error, and an error at this stage should not prevent ingestion of the table. I'll add guards and a warning in the logs if an error occurs.
# description="Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs.", | ||
# ) | ||
# `catalog` field to accept `Any` to handle both new and deprecated formats. Once deprecated format is not supported, we can remove this field and use the above `catalog` field. | ||
catalog: Any = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually I think the type here should be only Dict[str, Dict[str, str]]
as you had previously
the validator will automatically convert the old format to the new one, but we don't want to show the old type in the docs, and we want the correct type annotation so that mypy can check our usages of the configs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are absolutely right! I made that change before implementing the validator.
Good news, thank you! Can't wait to see it in master |
…of pyiceberg at runtime (datahub-project#10614)
…of pyiceberg at runtime (#10614)
Goal
The goal of this PR is to let users manually install newer versions of pyiceberg at runtime, which will make ingesting tables from SQL catalog possible (SQLCatalog was introduced in pyiceberg 0.5.0.
How
There are dependency conflicts that occur when trying to upgrade the pyiceberg package dependency to something newer than 0.4.0. The biggest one is pydantic v2 (since pyiceberg 0.5.0) and there is another possible one with sqlalchemy v2 (depending on which extras you are pulling from pyiceberg). Not much has changed in the pyiceberg API since 0.4.0 and it is possible to make the Iceberg source compatible with newer version of pyiceberg.
Once this PR is merged, it will be possible for a user to ingest Iceberg tables with a newer version of pyiceberg (like 0.6.1) using the following flow:
where your recipe could look like this:
Extra contributions
.pyiceberg.yaml
, which makes configuration of your recipe easier.partition-spec
that will expose the Iceberg table partitioning spec. An example of such a property value could be:[{\"name\": \"timeperiod_loaded\", \"transform\": \"identity\", \"source\": \"timeperiod_loaded\", \"source-id\": 19, \"source-type\": \"date\", \"field-id\": 1000}]
. The structure of that JSON follows this following spec from Iceberg.Checklist