Hive partitioning tracking issue #15441
If I may, here's another one: #14936
I see you just merged the ability to specify a Hive partition schema manually. Does it allow for partial inference? That is, if you partition on multiple keys but specify only a subset of them, will it infer the rest?
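Partial inference, as asked about here, could work by using the user-supplied types for the keys that are specified and inferring types for the remaining keys. A minimal sketch in plain Python, assuming `/`-separated paths; `parse_hive_partitions` and `infer_value` are hypothetical helpers for illustration, not Polars APIs, and URL-encoded values are not handled:

```python
from __future__ import annotations

from typing import Callable


def infer_value(raw: str) -> int | float | str:
    """Infer a partition value's type: try int, then float, else keep the string."""
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            pass
    return raw


def parse_hive_partitions(
    path: str, schema: dict[str, Callable[[str], object]] | None = None
) -> dict[str, object]:
    """Extract key=value directory segments from a Hive-style path.

    Keys listed in `schema` are converted with the given callable; every other
    key falls back to `infer_value` (i.e. partial inference).
    """
    schema = schema or {}
    values: dict[str, object] = {}
    # The last segment is the file name, so only directory segments are parsed.
    for segment in path.split("/")[:-1]:
        key, sep, raw = segment.partition("=")
        if not sep:
            continue  # not a partition directory, e.g. the table root
        values[key] = schema[key](raw) if key in schema else infer_value(raw)
    return values


print(parse_hive_partitions("data/year=2024/month=05/region=eu/part-0.parquet", schema={"month": str}))
# {'year': 2024, 'month': '05', 'region': 'eu'}
```

Declaring only `month` as a string keeps the leading zero in `05`, whereas inference alone would turn it into the integer `5`; `year` and `region` are still inferred.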
@stinodego why not extend the schema to the full table instead of just the partition columns? |
At this point it does not. You have to specify the full schema of the Hive partitions. Similar to other
At least in the case of Parquet, that part of the schema is already available from the data. Not sure a full
@stinodego it is part of the Parquet file, but in situations with schema evolution Polars would not be able to handle those situations. Also, if you know the schema ahead of time, you can essentially skip reading the Parquet metadata.
Can you give an example?
I don't know, there's other stuff in the metadata besides the schema. Not sure yet exactly what we're actually using.
import os
import polars as pl
os.makedirs("polars_parquet", exist_ok=True)
pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test1.parquet")
pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"]
}).write_parquet("polars_parquet/test2.parquet")
When you read with Polars, it incorrectly assumes that the schema of the first Parquet file applies to all files in the table, so the baz column is dropped and you get only:
pl.read_parquet("polars_parquet/*.parquet")
shape: (2, 2)
┌─────┬─────┐
│ foo ┆ bar │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1 ┆ 2 │
│ 2 ┆ 3 │
└─────┴─────┘
Now let's write the files in the other order. Polars panics because it cannot handle a column that is missing from one of the Parquet files. See this issue I opened a while ago: #14980
pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"]
}).write_parquet("polars_parquet/test1.parquet")
pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test2.parquet")
pl.read_parquet("polars_parquet/*.parquet")
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-parquet/src/arrow/read/deserialize/mod.rs:144:31:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
It's a common use case to evolve Parquet tables without having to rewrite all the older files to conform to the new schema.
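Rather than treating the first file's schema as authoritative, a reader could compare every file's schema up front and report which columns each file lacks, so a mismatch surfaces as a clear error (or a null-filled column) instead of silently dropped data or a panic. A minimal sketch, assuming the per-file schemas have already been read from the Parquet footers into plain dicts; `schema_differences` is a hypothetical helper, not a Polars API:

```python
from __future__ import annotations


def schema_differences(schemas: dict[str, dict[str, str]]) -> dict[str, list[str]]:
    """For each file, list the columns that appear in some other file but not in it.

    Files that already contain every column are omitted from the result.
    """
    # Union of all column names, in first-seen order.
    all_columns: list[str] = []
    for columns in schemas.values():
        for name in columns:
            if name not in all_columns:
                all_columns.append(name)
    return {
        file: [c for c in all_columns if c not in columns]
        for file, columns in schemas.items()
        if any(c not in columns for c in all_columns)
    }


print(schema_differences({
    "test1.parquet": {"foo": "i64", "bar": "i64"},
    "test2.parquet": {"foo": "i64", "bar": "i64", "baz": "str"},
}))
# {'test1.parquet': ['baz']}
```

The result does not depend on file order, which is exactly what differs between the two examples above.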
Having something akin to PyArrow datasets (#13086) would make a lot of sense.
Ok, I see what you mean. We should support this.
This might be interesting inspiration/source of ideas for a dataset abstraction in Polars:
Any chance you would reconsider this as part of the reworking of Hive partition handling? #12041
Here's another one: #15586. It's to change the default for
As I understand it, adding the partition fields to the schema is supposed to enable Hive partitioning support:
const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/*";
let mut schema = Schema::new();
schema.with_column("year".into(), DataType::Int8);
schema.with_column("month".into(), DataType::Int8);
let schema = Arc::new(schema);
// `cred` holds the AWS credentials, loaded elsewhere in the program.
let cloud_options = cloud::CloudOptions::default().with_aws([
    (Key::AccessKeyId, &cred.access_key.unwrap()),
    (Key::SecretAccessKey, &cred.secret_key.unwrap()),
    (Key::Region, &"eu-west-1".into()),
]);
let mut args = ScanArgsParquet::default();
args.hive_options.enabled = true;
args.hive_options.schema = Some(schema);
args.cloud_options = Some(cloud_options);
// Check the time required to read the data.
let start = std::time::Instant::now();
let df = LazyFrame::scan_parquet(TEST_S3, args)?
    .with_streaming(true)
    .collect()?;
The result is
Enhancement request for "Support directory input": #14342 |
Thank you. To be honest, I'm quite surprised: how can anyone use this tool for serious work without the ability to load data from a directory? All tables are partitioned and multi-file. 👀
You can already achieve this by appending /**/*.parquet to the path. Directory support will function slightly differently, as it will do some additional validation, but it's mostly the same.
Yes, it is described in the referenced enhancement request (the /**/*.parquet part). |
Thank you, but my Parquet files do not have any extensions. And adding
const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/**/*";
Meanwhile, if I manually set a specific combination of my partition values, it works:
const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/year=2024/month=5/*";
But I believe manually adding values is not how Hive partitioning is supposed to work? Or am I doing something wrong? If I add extensions to all files, the
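Expanding a directory without relying on file extensions comes down to a recursive listing that skips marker files. A minimal local-filesystem sketch with `pathlib`; `list_data_files` is a hypothetical helper, and skipping names that start with `_` or `.` (such as Spark's `_SUCCESS` marker) is an assumption borrowed from a common Hadoop/Spark convention, not confirmed Polars behaviour:

```python
from __future__ import annotations

from pathlib import Path


def list_data_files(root: str | Path) -> list[Path]:
    """Recursively list data files under a Hive-style root directory.

    No extension filter is applied, so extension-less files are included.
    Any file or directory whose name starts with "_" or "." is skipped.
    """
    root = Path(root)
    files: list[Path] = []
    for path in sorted(root.rglob("*")):
        # Check every path component below the root, not just the file name.
        relative_parts = path.relative_to(root).parts
        if any(part.startswith(("_", ".")) for part in relative_parts):
            continue
        if path.is_file():
            files.append(path)
    return files
```

Each returned path still carries its `key=value` directories, so partition values can be derived from the listing instead of being written into the glob by hand.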
It would be great to add support for the Hive partitioning logic to readers other than Parquet, such as JSON.
It's on the list! |
Tossing in a suggestion to also support reading/writing PyArrow/Spark-compatible Parquet _metadata files. See #7707
#15823 probably belongs here. |
@stinodego, regarding this comment:
Is there a GitHub issue tracking this? It's not noted in the issue checklist here and, as far as I can see, the trail goes cold with that comment and #15508. For us, the lack of ability to explicitly set schemas for the table has prevented us from using
Not sure if this is the correct place to write this, but: for the native partitioned Parquet reader, would it be possible to support loading the union of columns from different partitions when they contain different sets of columns? This would correspond to a "diagonal" concat. For example, when working with limit order book data, daily partitions of order book levels have a varying number of columns. The PyArrow reader silently drops columns that are not present in all partitions. I wonder if it would be possible to surface a concatenation option in the top-level API of the native Polars reader?
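For in-memory frames, Polars already offers this via `pl.concat(frames, how="diagonal")`; the request is for the partitioned reader to do the same. The semantics can be sketched in plain Python on column-oriented tables (`dict[str, list]`); `concat_diagonal` is a hypothetical illustration, not the Polars implementation:

```python
from __future__ import annotations


def concat_diagonal(tables: list[dict[str, list]]) -> dict[str, list]:
    """Stack tables with differing columns, filling missing cells with None."""
    # Union of all column names, in first-seen order.
    columns: list[str] = []
    for table in tables:
        for name in table:
            if name not in columns:
                columns.append(name)
    result: dict[str, list] = {name: [] for name in columns}
    for table in tables:
        # All columns within one table are assumed to have the same length.
        height = len(next(iter(table.values()), []))
        for name in columns:
            result[name].extend(table.get(name, [None] * height))
    return result


day1 = {"bid_1": [100], "ask_1": [101]}
day2 = {"bid_1": [99], "ask_1": [102], "bid_2": [98]}
print(concat_diagonal([day1, day2]))
# {'bid_1': [100, 99], 'ask_1': [101, 102], 'bid_2': [None, 98]}
```

Unlike dropping columns that are absent from some partitions, every level survives and the gaps become nulls.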
Some additions to ion-elgreco's comment on schema evolution:
import os
import polars as pl
# Create one directory per Hive partition.
os.makedirs("./a/month_code=M202406", exist_ok=True)
os.makedirs("./a/month_code=M202407", exist_ok=True)
# Write partitions with different columns: the second one adds column "c".
df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
df.write_parquet("./a/month_code=M202406/part_0.parquet")
df2 = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "a", "b"], "c": [22, 33, 44]})
df2.write_parquet("./a/month_code=M202407/part_0.parquet")
# Try to read the data.
df3 = pl.scan_parquet("./a", hive_partitioning=True)
df3.collect()
And I get the error:
This should be handled as well, ideally with an option to fill with null values the columns that do not exist in the current partition but do exist in other partitions.
Most tasks here have been completed, so I'll close this tracking issue. Feel free to open a new issue if there is a Hive partitioning feature you would like implemented. |
- pyarrow 15.0.0 #13892
- polars.scan_parquet method #12894
- try_parse_hive_dates parameter
- file_name argument that adds the file name to the DataFrame (can use string interning here)
- scan_ipc #14885