Parquet Predicate Pushdown Does Not Handle Type Coercion #7925
Comments
@tustvold do you have a reproducer for this one? I tried a query like you described and it does not error:
|
You have an explicit cast in there. The issue originates when the schema provided to the ParquetExec and used for planning doesn't match that of the underlying parquet file. The casts are internal to SchemaAdapter and will not show up in the plan. |
I see -- thank you. |
@mhilton reminded me that we saw this in IOx when we tried to change from |
I have been trying to make a reproducer for this issue, but I cannot seem to.
-- Create a parquet file to write two distinct row groups
copy (
values
(arrow_cast('2024-12-19', 'Timestamp(Nanosecond, Some("UTC"))')),
(arrow_cast('2024-12-20', 'Timestamp(Nanosecond, Some("UTC"))'))
)
to '/tmp/example.parquet'
(
'MAX_ROW_GROUP_SIZE' 1,
DATA_PAGESIZE_LIMIT 1
);
+-------+
| count |
+-------+
| 2 |
+-------+
1 row in set. Query took 0.016 seconds.
This shows there are two row groups with UTC timestamps:
❯ describe '/tmp/example.parquet';
+-------------+------------------------------------+-------------+
| column_name | data_type | is_nullable |
+-------------+------------------------------------+-------------+
| column1 | Timestamp(Nanosecond, Some("UTC")) | YES |
+-------------+------------------------------------+-------------+
1 row in set. Query took 0.002 seconds.
❯ select * from parquet_metadata('/tmp/example.parquet');
+----------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+---------------------+---------------------+------------------+----------------------+---------------------+---------------------+--------------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |
+----------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+---------------------+---------------------+------------------+----------------------+---------------------+---------------------+--------------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| /tmp/example.parquet | 0 | 1 | 1 | 71 | 0 | 93 | 1 | "column1" | INT64 | 1734566400000000000 | 1734566400000000000 | 0 | | 1734566400000000000 | 1734566400000000000 | ZSTD(ZstdLevel(1)) | [PLAIN, RLE, RLE_DICTIONARY] | | 4 | 35 | 89 | 71 |
| /tmp/example.parquet | 1 | 1 | 1 | 71 | 0 | 258 | 1 | "column1" | INT64 | 1734652800000000000 | 1734652800000000000 | 0 | | 1734652800000000000 | 1734652800000000000 | ZSTD(ZstdLevel(1)) | [PLAIN, RLE, RLE_DICTIONARY] | | 169 | 200 | 89 | 71 |
+----------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+---------------------+---------------------+------------------+----------------------+---------------------+---------------------+--------------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
Now, when I create a table that requires type coercion (in this case defining
❯ create external table utc_table(column1 timestamp) stored as parquet location '/tmp/example.parquet';
0 rows in set. Query took 0.001 seconds.
(note the predicate
❯ explain analyze select * from utc_table where column1='2024-12-19'::timestamp;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=3.302µs] |
| | FilterExec: column1@0 = 1734566400000000000, metrics=[output_rows=1, elapsed_compute=46.39µs] |
| | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[fetch_time=1.081666ms, repart_time=1ns, send_time=2.792µs] |
| | ParquetExec: file_groups={1 group: [[tmp/example.parquet]]}, projection=[column1], predicate=column1@0 = 1734566400000000000, pruning_predicate=column1_min@0 <= 1734566400000000000 AND 1734566400000000000 <= column1_max@1, required_guarantees=[column1 in (1734566400000000000)], metrics=[output_rows=1, elapsed_compute=1ns, file_open_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, row_groups_pruned_statistics=1, num_predicate_creation_errors=0, bytes_scanned=172, pushdown_rows_filtered=0, row_groups_pruned_bloom_filter=0, file_scan_errors=0, time_elapsed_scanning_until_data=369.125µs, pushdown_eval_time=2ns, time_elapsed_opening=678.666µs, time_elapsed_scanning_total=389.041µs, time_elapsed_processing=585.499µs, page_index_eval_time=19.251µs] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.012 seconds.
The same result comes from
❯ explain analyze select * from '/tmp/example.parquet' where column1='2024-12-19'::timestamp;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=4.042µs] |
| | FilterExec: column1@0 = 1734566400000000000, metrics=[output_rows=1, elapsed_compute=69µs] |
| | ParquetExec: file_groups={1 group: [[tmp/example.parquet]]}, projection=[column1], predicate=column1@0 = 1734566400000000000, pruning_predicate=column1_min@0 <= 1734566400000000000 AND 1734566400000000000 <= column1_max@1, required_guarantees=[column1 in (1734566400000000000)], metrics=[output_rows=1, elapsed_compute=1ns, file_open_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, row_groups_pruned_statistics=1, num_predicate_creation_errors=0, bytes_scanned=172, pushdown_rows_filtered=0, row_groups_pruned_bloom_filter=0, file_scan_errors=0, time_elapsed_scanning_until_data=510.375µs, pushdown_eval_time=2ns, time_elapsed_opening=736µs, time_elapsed_scanning_total=596.125µs, time_elapsed_processing=554.584µs, page_index_eval_time=19.126µs] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.015 seconds.
Maybe the problem has something to do with constructing parquet exec directly as we do in InfluxDB |
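The `row_groups_pruned_statistics=1` metric in both plans can be checked by hand against the min/max statistics printed by `parquet_metadata`. The following is a minimal Python sketch, not DataFusion code: the helper names `to_nanos` and `may_contain` are made up for illustration, and the check only mirrors the shape of the `pruning_predicate` shown in the plan (`column1_min <= literal AND literal <= column1_max`).

```python
from datetime import datetime, timezone

# Row-group statistics copied from the parquet_metadata output
# (INT64 nanoseconds since the Unix epoch).
ROW_GROUP_STATS = [
    {"row_group_id": 0, "min": 1734566400000000000, "max": 1734566400000000000},
    {"row_group_id": 1, "min": 1734652800000000000, "max": 1734652800000000000},
]

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_nanos(dt):
    """Convert a timezone-aware datetime to integer nanoseconds since the epoch."""
    delta = dt - EPOCH
    whole_seconds = delta.days * 86400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1000


def may_contain(stats, literal):
    """Hypothetical stand-in for: column1_min <= literal AND literal <= column1_max."""
    return stats["min"] <= literal <= stats["max"]


# The literal from the query, '2024-12-19'::timestamp, expressed in nanoseconds.
literal = to_nanos(datetime(2024, 12, 19, tzinfo=timezone.utc))

kept = [s["row_group_id"] for s in ROW_GROUP_STATS if may_contain(s, literal)]
pruned = [s["row_group_id"] for s in ROW_GROUP_STATS if not may_contain(s, literal)]

print("literal:", literal)
print("kept row groups:", kept)
print("pruned row groups:", pruned)
```

Under these assumptions, one row group is kept and one is pruned, which is consistent with the metric reported in the plans: the statistics and the literal are already in the same unit here, so no coercion is exercised.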
I can pick this up as I believe we are impacted by this as well. Going to work on creating reproducer first and will provide an update then. |
As an update, this wasn't on our critical path so it's been on the back burner. Unclear if I'll be looking into this in the short term. |
@matthewmturner did you ever find a reproducer that shows the problem? |
@alamb I didn't, but here is the Python script I was working on in case it helps anyone.
|
The case we're running into in InfluxDB when enabling timezones is slightly different. It is a parquet file with Timestamp without a timezone and then querying with either a timezone or in UTC. The odd part is that we have manually set the schema to be 'UTC', even though the backing parquet file is not.
I agree with the suggestion that this is the cause of the issue we're encountering. I have not been able to reproduce the issue natively in DataFusion using the datafusion-cli, as there is no way (that I can see) to change the schema. It does behave as I would expect when the schema matches the files.
This is the script I'm using to generate some parquet files.
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
df = pd.DataFrame({
'time': [
pd.Timestamp(year=2024,month=1,day=1,second=0),
pd.Timestamp(year=2024,month=1,day=1,second=1),
pd.Timestamp(year=2024,month=1,day=1,second=2),
pd.Timestamp(year=2024,month=1,day=1,second=3),
],
})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example_no_tz.parquet')
df = pd.DataFrame({
'time': [
pd.Timestamp(year=2024,month=1,day=1,second=0,tz='UTC'),
pd.Timestamp(year=2024,month=1,day=1,second=1,tz='UTC'),
pd.Timestamp(year=2024,month=1,day=1,second=2,tz='UTC'),
pd.Timestamp(year=2024,month=1,day=1,second=3,tz='UTC'),
],
})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example_with_tz_utc.parquet')
Some sample queries:
If you perform any queries against the example with timezone, where the predicate is not in the same timezone, you get a type coercion error, which I believe makes sense.
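As a loose analogy in plain Python (this is standard-library `datetime` behavior, not DataFusion's), ordering a timestamp without a timezone against one with a timezone is also rejected until one side is explicitly coerced. The helper `safe_le` is a hypothetical name used only for this sketch.

```python
from datetime import datetime, timezone

# Same instant written two ways, like the two files generated by the script.
naive = datetime(2024, 1, 1, 0, 0, 1)                       # no timezone
aware = datetime(2024, 1, 1, 0, 0, 1, tzinfo=timezone.utc)  # UTC


def safe_le(a, b):
    """Return a <= b, or the error message if the two values are not comparable."""
    try:
        return a <= b
    except TypeError as exc:
        return f"TypeError: {exc}"


# Mixed naive/aware ordering comparisons raise TypeError in Python.
mismatch = safe_le(naive, aware)

# Explicit coercion: declare the naive value to be UTC, similar in spirit to
# setting the table schema to UTC over a file that has no timezone.
coerced = naive.replace(tzinfo=timezone.utc)
match = safe_le(coerced, aware)

print(mismatch)
print(match)
```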
|
take |
Describe the bug
Following #6458, SchemaAdapter, as used by ParquetExec, will automatically coerce a parquet file's schema to that of the table.
This logic does not currently extend to either build_row_filter or PruningPredicate.
To Reproduce
Push down a predicate on a column that relies on type coercion; the query fails with type errors.
Expected behavior
The pruning logic should perform type coercion
Additional context
No response
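The expected behavior can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions and does not reflect how DataFusion implements `PruningPredicate` or `build_row_filter`: `TimestampValue`, `cast`, `in_range`, and `in_range_coerced` are hypothetical names, and the millisecond file type is an invented example of a file schema that differs from the table schema.

```python
from dataclasses import dataclass

# Ticks per second for each timestamp unit (illustration only).
UNITS_PER_SECOND = {"s": 1, "ms": 10**3, "us": 10**6, "ns": 10**9}


@dataclass(frozen=True)
class TimestampValue:
    """A timestamp scalar tagged with its unit, standing in for a typed scalar."""
    value: int
    unit: str


def cast(ts, unit):
    """Losslessly cast to an equal or finer unit; reject anything else."""
    factor, remainder = divmod(UNITS_PER_SECOND[unit], UNITS_PER_SECOND[ts.unit])
    if factor == 0 or remainder != 0:
        raise ValueError(f"cannot losslessly cast {ts.unit} to {unit}")
    return TimestampValue(ts.value * factor, unit)


def in_range(stat_min, stat_max, literal):
    """Pruning check that refuses to compare values of different types."""
    if not (stat_min.unit == stat_max.unit == literal.unit):
        raise TypeError(
            f"cannot compare {stat_min.unit} statistics with a {literal.unit} literal"
        )
    return stat_min.value <= literal.value <= stat_max.value


def in_range_coerced(stat_min, stat_max, literal):
    """Same check, but the file statistics are first coerced to the literal's type."""
    return in_range(cast(stat_min, literal.unit), cast(stat_max, literal.unit), literal)


# Assumed file statistics in milliseconds; the predicate literal uses the
# table type (nanoseconds).
file_min = TimestampValue(1734566400000, "ms")
file_max = TimestampValue(1734566400000, "ms")
table_literal = TimestampValue(1734566400000000000, "ns")

try:
    in_range(file_min, file_max, table_literal)
except TypeError as exc:
    print("without coercion:", exc)

print("with coercion:", in_range_coerced(file_min, file_max, table_literal))
```

The design point of the sketch is that the coercion happens on the statistics side, so the predicate keeps the table's types and the file's own types never leak into the comparison.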