refactor(python): Use polars parquet reader for delta scan #19103

Merged


ion-elgreco
Contributor

This is an intermediate stage until we have something working with delta-kernel-rs.

A couple of odd things:

  • The hive schema is still required even though we have the new schema param; it is probably best to merge these.
  • Polars Schema doesn't have a from_arrow() method, so I'm currently creating an empty Arrow table and going through DataFrame :)

@ritchie46
Member

Nice, I didn't know we could bring our own readers.

@ion-elgreco ion-elgreco changed the title from "refactor(python): use polars parquet reader for delta read/scan" to "refactor(python): use polars parquet reader for delta scan" on Oct 5, 2024
@ion-elgreco
Contributor Author

Nice, I didn't know we could bring our own readers.

Absolutely we can, and this is actually encouraged. The Polars parquet reader is also a lot faster than pyarrow.

The only thing is, this intermediate stage will be bound to the same protocol support as the pyarrow scanner. At some point we need to finish a fully native reader built on Polars and delta-kernel-rs; I did some preliminary work here: https://github.com/ion-elgreco/polars-deltalake/tree/feat/delta_io_plugin

It's just hard to find time nowadays. A dev from the core team who is working on parquet could probably do this more easily, since that dev is deep into the Polars Rust code ^^

@ion-elgreco ion-elgreco changed the title from "refactor(python): use polars parquet reader for delta scan" to "refactor(python): Use polars parquet reader for delta scan" on Oct 5, 2024
@github-actions github-actions bot added the internal (An internal refactor or improvement) and python (Related to Python Polars) labels and removed the title needs formatting label on Oct 5, 2024

codecov bot commented Oct 5, 2024

Codecov Report

Attention: Patch coverage is 69.04762% with 13 lines in your changes missing coverage. Please review.

Project coverage is 79.50%. Comparing base (e52a598) to head (6cdc5e7).
Report is 84 commits behind head on main.

Files with missing lines | Patch % | Lines
py-polars/polars/io/delta.py | 69.04% | 9 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #19103      +/-   ##
==========================================
- Coverage   79.91%   79.50%   -0.42%     
==========================================
  Files        1536     1547      +11     
  Lines      211659   213602    +1943     
  Branches     2445     2451       +6     
==========================================
+ Hits       169156   169830     +674     
- Misses      41948    43218    +1270     
+ Partials      555      554       -1     


@ion-elgreco
Contributor Author

ion-elgreco commented Oct 5, 2024

@ritchie46 one test fails on Windows when it encounters a hive path: https://github.com/pola-rs/polars/actions/runs/11192235502/job/31116136984?pr=19103#step:11:202. I am not seeing this on Linux though.

@ritchie46
Member

@nameexhaustion can you take a look at this one later?

@nameexhaustion
Collaborator

The Windows test should be fixed after a rebase on main

@ion-elgreco
Contributor Author

ion-elgreco commented Oct 18, 2024

The Windows test should be fixed after a rebase on main

Great. I'm currently out, so I will take a look at it sometime in mid-November.

@deanm0000
Collaborator

Am I correct that this doesn't use the delta log for partition pruning?

It seems we ought to go through the io plugin framework and then use dt.get_add_actions() to do that pruning.

I haven't worked with the IO plugin framework enough to figure out how to avoid making the following a two-step process.

For instance, suppose what I want is

df = scan_delta(dt).filter(pl.col('node_id') == something).collect()

I can do that in two steps like this:

files = pl.from_arrow(dt.get_add_actions()).filter(
    (pl.col('min').struct.field('node_id') <= something)
    & (pl.col('max').struct.field('node_id') >= something)
)['path']

df = pl.scan_parquet([dt.table_uri + '/' + x for x in files]).filter(pl.col('node_id') == something).collect()
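Editor's note: the pruning idea in the comment above can be illustrated without any Delta or Polars dependency. The sketch below is a simplification under stated assumptions: the add actions are modelled as plain dicts with "path", "min" and "max" keys (the real output of dt.get_add_actions() is an Arrow structure), and the helper name prune_files is hypothetical.

```python
from typing import Any


def prune_files(add_actions: list[dict[str, Any]], column: str, value: Any) -> list[str]:
    """Return paths of files whose [min, max] range for `column` may contain `value`."""
    kept = []
    for action in add_actions:
        lower = (action.get("min") or {}).get(column)
        upper = (action.get("max") or {}).get(column)
        # Files without statistics cannot be ruled out, so they are kept.
        if lower is None or upper is None or lower <= value <= upper:
            kept.append(action["path"])
    return kept


# Hypothetical add actions, shaped loosely like the log's per-file statistics.
add_actions = [
    {"path": "part-0.parquet", "min": {"node_id": 1}, "max": {"node_id": 10}},
    {"path": "part-1.parquet", "min": {"node_id": 11}, "max": {"node_id": 20}},
    {"path": "part-2.parquet", "min": {}, "max": {}},
]

files = prune_files(add_actions, "node_id", 15)
print(files)
```

Only the surviving paths would then be handed to the parquet scanner. The row-level filter still has to be applied afterwards, because min/max statistics only exclude files that cannot match.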

Member

@ritchie46 ritchie46 left a comment

Looks great. I've left a few comments, but we can get this in very shortly.

py-polars/polars/io/delta.py Outdated Show resolved Hide resolved
py-polars/polars/io/delta.py Show resolved Hide resolved
py-polars/polars/io/delta.py Show resolved Hide resolved
@ritchie46 ritchie46 merged commit 87b5bca into pola-rs:main Nov 16, 2024
13 checks passed
@TinoSM

TinoSM commented Nov 18, 2024

@ion-elgreco

This seems to be a breaking change: it broke support for S3 (in my case I also need AWS_ENDPOINT)

E       polars.exceptions.ComputeError: unknown configuration key: AWS_ACCESS_KEY_ID

I was doing this (as seen in polars docs https://docs.pola.rs/api/python/stable/reference/api/polars.scan_delta.html)

table_path = "s3://bucket/path/to/delta-table/"
storage_options = {
    "AWS_ENDPOINT": "https://blalblabla",
    "AWS_REGION": "eu-central-1",
    "AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY",
}
pl.scan_delta(
    table_path, storage_options=storage_options
).collect()  

I tried switching to the lowercase keys I was already using for Parquet files

    return {
        "aws_access_key_id": ...
        "aws_secret_access_key":...
        "aws_session_token":...
        "aws_region": ....
    }

but it seems to be stuck (?)

@ion-elgreco
Contributor Author

Delta-rs and Pola-rs both use object store, but perhaps delta-rs is more lenient about whether the keys are uppercase or not.

Can you try passing lowercase keys?

@TinoSM

TinoSM commented Nov 18, 2024

@ion-elgreco yeah, lowercase works. Actually, I already had this in my wrappers (lowercasing for non-delta).

docs need to be updated in that case.
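Editor's note: the wrapper mentioned above is not shown in the thread. A minimal sketch of what such a key-lowercasing helper might look like follows; the function name is hypothetical and the credential values are the placeholders from the earlier comment.

```python
def lowercase_storage_options(options: dict[str, str]) -> dict[str, str]:
    """Return a copy of `options` with every key lowercased; values are untouched."""
    return {key.lower(): value for key, value in options.items()}


storage_options = {
    "AWS_REGION": "eu-central-1",
    "AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY",
}

normalized = lowercase_storage_options(storage_options)
print(sorted(normalized))
```

The normalized dict would be passed as storage_options in place of the uppercase one. Whether every option has a lowercase equivalent that the reader accepts is not established in this thread.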

However, something broke on my end. I have a small table with lots and lots of files and fragmentation (it's a unit-test-level table, so it keeps accumulating metadata; delta_log has 2106 JSON files)

I'm doing

scan_delta().limit(4).collect()

this works and finishes in a few seconds

while

scan_delta().limit(4).collect(streaming=True)

is sloooooooow (aka I don't know if it ends), maybe this change broke something?

@ion-elgreco
Contributor Author

ion-elgreco commented Nov 18, 2024

You should create a separate issue for showcasing that uppercase storage keys are not used.

Regarding the slowness with streaming, that's an issue in Pola-rs directly. It is not related to this change, since this change only dispatches the reading to Pola-rs instead of pyarrow.

@TinoSM

TinoSM commented Nov 18, 2024

@ion-elgreco kk thanks

I had another issue because I was using the same parameters for the writer and the reader, and "aws_s3_allow_unsafe_rename" is not supported in the reader (I'm not using plain S3 but an alternative implementation).

With that taken care of and lowercase keys, everything works (after removing streaming=True, which was only there because of a mistake back in the day)

@ion-elgreco
Contributor Author

aws_s3_allow_unsafe_rename is a custom storage option, specific to delta-rs. But Pola-rs should ideally filter those out or log a warning.
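Editor's note: the filtering suggested above could look roughly like the sketch below. It is an illustration only, not something Polars is confirmed to do. The helper name is hypothetical, and the set of delta-rs-only keys contains just the one option named in this thread.

```python
import warnings

# Assumption: only the key mentioned in this thread is listed; a real list would be longer.
DELTA_RS_ONLY_KEYS = frozenset({"aws_s3_allow_unsafe_rename"})


def drop_delta_rs_only_options(options: dict[str, str]) -> dict[str, str]:
    """Return `options` without delta-rs-specific keys, warning about each dropped key."""
    reader_options = {}
    for key, value in options.items():
        if key.lower() in DELTA_RS_ONLY_KEYS:
            warnings.warn(
                f"storage option {key!r} is specific to delta-rs and is ignored by the reader",
                stacklevel=2,
            )
            continue
        reader_options[key] = value
    return reader_options


shared_options = {"aws_region": "eu-central-1", "aws_s3_allow_unsafe_rename": "true"}
reader_options = drop_delta_rs_only_options(shared_options)
print(reader_options)
```

This keeps a single options dict usable for both writer and reader on the caller's side, while making the dropped keys visible instead of failing with an unknown-key error.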

@TinoSM

TinoSM commented Nov 18, 2024

Yeah, I realized it's because of that. Thanks for the fast answer.

I created the ticket for docs:
#19849

However, given that 1.14.0 does not announce this as a breaking change (even if Polars is still in its early days), I'm not sure whether a "deprecation-key-translator" should be added until 2.0.0 is out. Not my call though.

@TinoSM

TinoSM commented Nov 18, 2024

@ion-elgreco

sorry to ping again, but I'm struggling to filter partitions without partition_options

I'm moving to

pl.scan_delta(
        table_path,....
    ).filter(pl.col("active") == pl.lit(1))

from


pl.scan_delta(
        table_path,
        pyarrow_options={"partitions": [("active", "=", str(1))]},
         ....
    )

and it complains that there's no "active" column. I do have "active" in the hive_schema, but I guess the one being read is "main_schema"

How are we supposed to filter partitions in the new API?

scan_parquet(
        dl_tbl.file_uris(),
        schema=main_schema,
        hive_schema=hive_schema,
        allow_missing_columns=True,
        hive_partitioning=len(partition_columns) > 0,
        storage_options=storage_options,
        rechunk=rechunk or False,
    ).collect_schema()

returns a schema without "active"

[image]

@ion-elgreco
Contributor Author

Strange, it looks like an issue in the parquet reader, I guess.

Can you share the paths of your first couple of ADD actions?

@TinoSM

TinoSM commented Nov 18, 2024

there are no data/parquet files in the table, maybe that's what is unexpected?

this is the initial _delta_log JSON (the only thing in the path as of now)

I can't share the full schema though; I manually removed over 50 columns (from my company) :/, but it looks like this

{"commitInfo":{"timestamp":1731940340772,"operation":"CREATE TABLE","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[\"active\"]","properties":"{\"column_mapping_mode\":\"NAME\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.4.4 Delta-Lake/2.4.0","txnId":"3d1d5fa2-e87e-4e44-806a-04b3b3ddffec"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"35f5a7d6-1b36-4cda-89ad-0866dd895e88","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[
{\"name\":\"active\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"comment\":\"{'description': 'Identifies if a partiion is active or inactive', 'alias': '', 'enabled': True, 'taxonomy': None, 'examples': []}\"}}, 
{\"name\":\"id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"comment\":\"{'description': 'columns that specify discard fields', 'alias': '', 'enabled': True, 'taxonomy': None, 'examples':  []}\"}}
]}", "partitionColumns":["active"],"configuration":{"column_mapping_mode":"NAME"},"createdTime":1731940340740}}

This is the full table as of now: [image]

@ion-elgreco
Contributor Author

That's likely the issue, yeah: there is no data. Polars should always return that column in the frame regardless of whether there is data or not. You should create a separate issue for that.
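Editor's note: the expectation above (the partition column is part of the table schema even when no data files exist) can be checked directly against the log. The sketch below parses a trimmed-down, hypothetical version of the metaData action shown earlier using only the standard library; the function name is an assumption for illustration.

```python
import json


def columns_from_log(log_lines: list[str]) -> tuple[list[str], list[str]]:
    """Return (all column names, partition column names) from the first metaData action."""
    for line in log_lines:
        action = json.loads(line)
        if "metaData" in action:
            meta = action["metaData"]
            # schemaString is itself a JSON document embedded as a string.
            schema = json.loads(meta["schemaString"])
            return [field["name"] for field in schema["fields"]], meta["partitionColumns"]
    raise ValueError("no metaData action found in log")


# Hypothetical, trimmed-down log: same column names as the table above, metadata omitted.
schema_string = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "active", "type": "long", "nullable": True, "metadata": {}},
        {"name": "id", "type": "string", "nullable": True, "metadata": {}},
    ],
})
log_lines = [
    json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
    json.dumps({"metaData": {"schemaString": schema_string, "partitionColumns": ["active"]}}),
]

all_columns, partition_columns = columns_from_log(log_lines)
print(all_columns, partition_columns)
```

Here "active" appears both in the full column list and in the partition columns, even though the log contains no add actions. That matches the behaviour the comment above says the reader should expose.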

@TinoSM

TinoSM commented Nov 18, 2024

Done #19854

thanks!

Also attached a manually crafted delta table which shows the issue

@Ajay95

Ajay95 commented Nov 19, 2024

Hey @ion-elgreco,

I was previously using pl.scan_delta to read partitioned files (2 levels of hive partitioning) as a PyArrow dataset, and I was getting good performance. However, after testing with the new build, I've noticed a significant performance degradation:

  • Before v1.14.0: Single record with filter on partitioned files took 0.11s
  • After v1.14.0: Same query with filter on partitioned files takes 4.94s

Both measurements are across 17 million records.

Have you done any benchmarking on this change in the new fix?

Thank you! 😊

@ion-elgreco
Contributor Author

@Ajay95 usually it was 4-5x faster.

I would create a reproducible example and make an issue so that Pola-rs devs can look into it.

The delta reader heavy lifting is now mostly done by the parquet scanner

Labels
internal An internal refactor or improvement python Related to Python Polars
6 participants