fix(ingest/iceberg): update iceberg source to support newer versions of pyiceberg at runtime #10614

Merged
hsheth2 merged 8 commits into datahub-project:master from pyiceberg_upgrade
Jun 4, 2024

Conversation

cccs-eric
Contributor

Goal

The goal of this PR is to let users manually install newer versions of pyiceberg at runtime, which will make ingesting tables from a SQL catalog possible (SQLCatalog was introduced in pyiceberg 0.5.0).

How

Dependency conflicts occur when trying to upgrade the pyiceberg package dependency to anything newer than 0.4.0. The biggest one is pydantic v2 (required since pyiceberg 0.5.0), and there is another possible one with sqlalchemy v2 (depending on which extras you are pulling from pyiceberg). Not much has changed in the pyiceberg API since 0.4.0, so it is possible to make the Iceberg source compatible with newer versions of pyiceberg.

Once this PR is merged, it will be possible for a user to ingest Iceberg tables with a newer version of pyiceberg (like 0.6.1) using the following flow:

# Notice we are not installing the iceberg extra here since it will pull pyiceberg==0.4.0
pip install acryl-datahub
# Install the latest pyiceberg version and the required extras.  For example, ingesting from a SQLCatalog in Azure would require
# pip install pyiceberg[sql-postgres,adlfs]
pip install pyiceberg[<required extras>]

datahub ingest run --config <path to your recipe.yaml>

where your recipe could look like this:

source:
  type: "iceberg"
  config:
    env: PROD
    catalog:
      # REST catalog configuration example using S3 storage
      my_rest_catalog:
        type: rest
        # Catalog configuration follows pyiceberg's documentation (https://py.iceberg.apache.org/configuration)
        uri: http://localhost:8181
        s3.access-key-id: admin
        s3.secret-access-key: password
        s3.region: us-east-1
        warehouse: s3a://warehouse/wh/
        s3.endpoint: http://localhost:9000
      # SQL catalog configuration example using Azure datalake storage and a PostgreSQL database
      # my_sql_catalog:
      #   type: sql
      #   uri: postgresql+psycopg2://user:<password>@<host>:5432/icebergcatalog
      #   adlfs.tenant-id: <Azure tenant ID>
      #   adlfs.account-name: <Azure storage account name>
      #   adlfs.client-id: <Azure Client/Application ID>
      #   adlfs.client-secret: <Azure Client Secret>
    platform_instance: my_rest_catalog
    table_pattern:
      allow:
        - marketing.*
    profiling:
      enabled: true

sink:
  # sink configs

Extra contributions

  • This PR also changes how to configure the pyiceberg catalog inside a recipe. The format used by the DataHub source configuration is now the same as what you would find in your .pyiceberg.yaml, which makes configuration of your recipe easier.
  • This PR adds a new dataset property named partition-spec that exposes the Iceberg table partitioning spec. An example of such a property value could be: [{\"name\": \"timeperiod_loaded\", \"transform\": \"identity\", \"source\": \"timeperiod_loaded\", \"source-id\": 19, \"source-type\": \"date\", \"field-id\": 1000}]. The structure of that JSON follows this spec from Iceberg.
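
To make the shape of the partition-spec property concrete, here is a minimal sketch that serializes partition fields into the JSON layout shown in the example value. `PartitionFieldStub` and the two lookup dictionaries are hypothetical stand-ins for the pyiceberg table objects; the actual source reads these values from the table's spec and schema.

```python
import json
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PartitionFieldStub:
    """Hypothetical stand-in for one field of an Iceberg partition spec."""

    name: str
    transform: str
    source_id: int
    field_id: int


def partition_spec_property(
    fields: List[PartitionFieldStub],
    column_names: Dict[int, str],
    column_types: Dict[int, str],
) -> str:
    """Serialize partition fields to a JSON string, one object per field."""
    return json.dumps(
        [
            {
                "name": field.name,
                "transform": field.transform,
                # Source column name and type are looked up by source column ID.
                "source": column_names[field.source_id],
                "source-id": field.source_id,
                "source-type": column_types[field.source_id],
                "field-id": field.field_id,
            }
            for field in fields
        ]
    )
```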

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the ingestion (PR or Issue related to the ingestion of metadata) and community-contribution (PR or Issue raised by member(s) of DataHub Community) labels May 29, 2024
Collaborator

@hsheth2 hsheth2 left a comment

We can also make some changes to setup.py

  • Can change "pyiceberg~=0.4" to "pyiceberg>=0.4,<0.7" and remove the pydantic_no_v2 requirement
  • Can add iceberg to the exception list around line 798

That way pyiceberg 0.6 can be installed natively

catalog: IcebergCatalogConfig = Field(
description="Catalog configuration where to find Iceberg tables. See [pyiceberg's catalog configuration details](https://py.iceberg.apache.org/configuration/).",
# The catalog configuration is using a dictionary to be open and flexible. All the keys and values are handled by pyiceberg. This will future-proof any configuration change done by pyiceberg.
catalog: Dict[str, Dict[str, str]] = Field(
Collaborator

this is a breaking change - can we transparently upgrade folks from the old config to the new one, and then only issue a deprecation warning

I'd prefer to not break existing ingestion recipes

Contributor Author

I thought that since the iceberg source was still considered to be in testing, it was ok to introduce a breaking change. I'll see what I can do.

Contributor Author

I now support the old format. One thing that is left is to report the warning back to the user. What would be your recommendation? It currently uses print() to emit a message, but I will change this to whatever you recommend.

Contributor Author

@hsheth2 Thanks for approving it, but don't merge it just yet. The warning back to the user needs to be fixed.

Contributor Author

I ended up simply using logging.warning(). Do we have to state anywhere in the docs that the old format is deprecated?

Collaborator

logger.warning should be fine here
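
The transparent upgrade discussed in this thread could look roughly like the sketch below. It assumes the deprecated format was a flat mapping with `name`, `type`, and `config` keys; that layout is not spelled out in this conversation, so treat it as an assumption. The function name `upgrade_catalog_config` is hypothetical.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def upgrade_catalog_config(value: Any) -> Any:
    """Convert an assumed deprecated catalog layout to the new one.

    Assumed old layout: {"name": ..., "type": ..., "config": {...}}
    New layout:         {<name>: {"type": ..., <other properties>}}
    """
    if isinstance(value, dict) and {"name", "type", "config"} <= value.keys():
        logger.warning(
            "The catalog configuration format used is deprecated; "
            "please switch to the pyiceberg-style format."
        )
        return {value["name"]: {"type": value["type"], **value["config"]}}
    # Already in the new format (or invalid): leave it for later validation.
    return value
```

In a pydantic model, a function like this would typically run as a "before" validator on the `catalog` field, so the field itself can keep the `Dict[str, Dict[str, str]]` annotation.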

"source-type": str(table.schema().find_type(partition.source_id)),
"field-id": partition.field_id,
}
for partition in table.spec().fields
Collaborator

do we need any error handling around this method? or are we reasonably confident that it won't throw an error?

Contributor Author

Odds are small, but you are right about this. A broken table could trigger an error, and an error at this stage should not prevent ingestion of the table. I'll add guards and a warning in the logs if an error occurs.
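
A minimal sketch of such a guard, assuming the property is computed by a callable and that a failure should only be logged. `add_optional_property` is a hypothetical helper and not the code that was merged.

```python
import logging
from typing import Callable, Dict

logger = logging.getLogger(__name__)


def add_optional_property(
    properties: Dict[str, str],
    key: str,
    compute: Callable[[], str],
    table_name: str,
) -> None:
    """Set properties[key] from compute(), logging instead of failing on error."""
    try:
        properties[key] = compute()
    except Exception as e:
        # A broken table should not prevent the rest of its metadata from being ingested.
        logger.warning(
            "Could not compute %s for table %s: %s", key, table_name, e
        )
```

With this shape, a table whose partition spec cannot be read is still ingested, just without the partition-spec property.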

@cccs-eric cccs-eric requested a review from hsheth2 May 31, 2024 16:59
# description="Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs.",
# )
# `catalog` field to accept `Any` to handle both new and deprecated formats. Once deprecated format is not supported, we can remove this field and use the above `catalog` field.
catalog: Any = Field(
Collaborator

actually I think the type here should be only Dict[str, Dict[str, str]] as you had previously

the validator will automatically convert the old format to the new one, but we don't want to show the old type in the docs, and we want the correct type annotation so that mypy can check our usages of the configs

Contributor Author

You are absolutely right! I made that change before implementing the validator.

@usmanovbf

Good news, thank you! Can't wait to see it in master

@hsheth2 hsheth2 merged commit c04b3bc into datahub-project:master Jun 4, 2024
58 checks passed
@cccs-eric cccs-eric deleted the pyiceberg_upgrade branch June 21, 2024 14:08
sleeperdeep pushed a commit to sleeperdeep/datahub that referenced this pull request Jun 25, 2024
yoonhyejin pushed a commit that referenced this pull request Jul 16, 2024

3 participants