List-of-String Array panics in the presence of row filters #4365

Closed
AdamGS opened this issue Jun 5, 2023 · 4 comments · Fixed by #4376
Labels
bug parquet Changes to the parquet crate

Comments

@AdamGS
Contributor

AdamGS commented Jun 5, 2023

Describe the bug

Reading a Parquet file that contains a List column with Utf8 items panics when a row filter is applied to another column. The panic is caused by a length mismatch between child arrays when the underlying StructArray is built. The error message is:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError("Parquet argument error: Parquet error: Not all children array length are the same!")', src/main.rs:83:21
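For context, this is an invariant violation rather than a parquet-specific failure: a StructArray (like the columns of a RecordBatch) requires every child array to have the same number of rows. The std-only sketch below models that check with a hypothetical check_children_lengths function. It reuses the message from the report but is not arrow's actual validation code.

```rust
/// Hypothetical model of the struct-level invariant: all children must have
/// the same length. Returns the common length, or the error text from the report.
fn check_children_lengths(child_lengths: &[usize]) -> Result<usize, String> {
    match child_lengths.split_first() {
        // A struct with no children is treated as trivially consistent (illustrative choice).
        None => Ok(0),
        Some((&first, rest)) => {
            if rest.iter().all(|&len| len == first) {
                Ok(first)
            } else {
                Err("Not all children array length are the same!".to_string())
            }
        }
    }
}
```

If the list column and the numbers column come back with different row counts for the same batch, a check of this kind fails, which is the sort of condition that produces the panic above.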

To Reproduce

This is a pretty small example I built:

use arrow::array::{ListBuilder, StringBuilder, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use std::sync::Arc;

use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::{ArrowWriter, ProjectionMask};
use parquet::file::properties::WriterProperties;
use std::error::Error;

use tempfile::NamedTempFile;

const BATCH_SIZE: usize = 1024;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "list",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            false,
        ),
        Field::new("numbers", DataType::UInt32, false),
    ]));
    let temp_file = NamedTempFile::new()?;

    let mut writer = ArrowWriter::try_new(
        temp_file.reopen()?,
        schema.clone(),
        Some(WriterProperties::builder().build()),
    )?;
    for _ in 0..2 {
        let mut list_a_builder = ListBuilder::new(StringBuilder::new());
        for i in 0..1024 {
            list_a_builder.values().append_value(format!("{i}"));

            list_a_builder.append(true);
        }
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(list_a_builder.finish()),
                Arc::new(UInt32Array::from_iter_values(
                    (0..BATCH_SIZE).map(|n| n as u32),
                )),
            ],
        )?;
        writer.write(&batch)?;
    }
    let _metadata = writer.close()?;

    let mut file = tokio::fs::File::open(temp_file.path()).await.unwrap();

    let parquet_metadata = file.get_metadata().await.unwrap();
    let file_metadata = parquet_metadata.file_metadata();
    let schema_descriptor = file_metadata.schema_descr();

    // We filter on the numerical column
    let row_filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
        ProjectionMask::leaves(schema_descriptor, vec![schema.fields().len() - 1]),
        |batch| arrow::compute::gt_dyn_scalar(batch.column(0), 100),
    ))]);

    // This is the key section - whether we materialize the list-of-strings column alone or with another column that was filtered
    #[cfg(feature = "with_bug")]
    let projection_mask = ProjectionMask::roots(schema_descriptor, [0, 1]); // Both columns

    #[cfg(not(feature = "with_bug"))]
    let projection_mask = ProjectionMask::roots(schema_descriptor, [0]); // Just the list-of-strings column

    let mut reader = ParquetRecordBatchStreamBuilder::new(file)
        .await
        .unwrap()
        .with_row_filter(row_filter)
        .with_projection(projection_mask)
        .build()
        .unwrap();

    while let Some(rb) = reader.next().await {
        let rb = rb.unwrap();
        println!("count = {}", rb.num_rows())
    }

    Ok(())
}
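The row filter evaluates gt_dyn_scalar(batch.column(0), 100) on the numbers column; roughly speaking, the reader then turns the resulting boolean mask into runs of rows to skip and rows to select, which every projected column must honour. The sketch below models that conversion with a hypothetical Selector enum standing in for the parquet crate's RowSelector. It illustrates the idea and is not the crate's code.

```rust
/// Hypothetical stand-in for parquet's RowSelector: a run of rows to skip or select.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Selector {
    Skip(usize),
    Select(usize),
}

/// Collapse a per-row keep/drop mask into alternating skip/select runs.
fn mask_to_selectors(mask: &[bool]) -> Vec<Selector> {
    let mut runs: Vec<Selector> = Vec::new();
    for &keep in mask {
        // Does this row continue the current run (same kind as the last one)?
        let continues_run = matches!(
            (runs.last(), keep),
            (Some(Selector::Select(_)), true) | (Some(Selector::Skip(_)), false)
        );
        if continues_run {
            // Extend the current run by one row.
            if let Some(Selector::Select(n) | Selector::Skip(n)) = runs.last_mut() {
                *n += 1;
            }
        } else if keep {
            runs.push(Selector::Select(1));
        } else {
            runs.push(Selector::Skip(1));
        }
    }
    runs
}
```

For the repro's data (the values 0 to 1023 written twice), the predicate n > 100 yields Skip(101), Select(923), Skip(101), Select(923), so each projected column should produce 923 * 2 = 1846 rows in total.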

The Cargo.toml file looks like this:

[package]
name = "reproduce"
version = "0.1.0"
edition = "2021"

[features]
with_bug = []

[dependencies]
arrow = { version = "40", features = ["simd"] }
parquet = { version = "40", features = ["async"] }
tempfile = "*"
tokio = { version = "1.16.1", features = [
    "macros",
    "rt-multi-thread",
    "time",
    "fs",
] }
futures = "0.3.18"

Run it with: cargo run --features with_bug
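The with_bug feature only changes the projection. ProjectionMask::roots selects every leaf column beneath the given top-level fields, while ProjectionMask::leaves indexes leaf columns directly. The std-only sketch below models the roots-to-leaves mapping with a hypothetical leaf_roots table (the top-level field index owning each leaf); it is a simplified model, not the parquet crate's implementation. It also shows why vec![schema.fields().len() - 1] names the numbers leaf in this schema: the list column contributes exactly one leaf, so leaf 1 and root 1 coincide.

```rust
/// Hypothetical model: `leaf_roots[i]` is the index of the top-level field
/// that owns leaf column `i`. Returns which leaves a root projection keeps.
fn roots_to_leaf_mask(leaf_roots: &[usize], roots: &[usize]) -> Vec<bool> {
    leaf_roots.iter().map(|root| roots.contains(root)).collect()
}
```

In the repro schema, leaf_roots is [0, 1], so roots [0, 1] keeps both leaves (the failing case) and roots [0] keeps only the list leaf. If the first field instead had two leaves (leaf_roots of [0, 0, 1]), leaf index 1 would belong to the first field rather than to numbers.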

Expected behavior

My understanding is that both configurations should work; the example even includes one (built without the with_bug feature) that does.

Additional context

I would be glad to help, but I'm not sure what the right way to tackle this issue is, or how big it is. It seems to be somewhere relatively deep in the parquet logic, but I couldn't pinpoint the exact cause.

@AdamGS AdamGS added the bug label Jun 5, 2023
@tustvold
Contributor

tustvold commented Jun 5, 2023

Thank you for the report; yes, I would expect this to work correctly.

A reduced example:

use arrow::array::{ListBuilder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

use bytes::Bytes;
use parquet::arrow::arrow_reader::{
    ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new_list(
        "list",
        Field::new("item", DataType::Utf8, true),
        false,
    )]));
    let mut buf = Vec::with_capacity(1024);

    let mut writer = ArrowWriter::try_new(
        &mut buf,
        schema.clone(),
        Some(WriterProperties::builder().build()),
    )?;
    for _ in 0..2 {
        let mut list_a_builder = ListBuilder::new(StringBuilder::new());
        for i in 0..1024 {
            list_a_builder.values().append_value(format!("{i}"));
            list_a_builder.append(true);
        }
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(list_a_builder.finish())],
        )?;
        writer.write(&batch)?;
    }
    let _metadata = writer.close()?;

    let buf = Bytes::from(buf);
    let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
        .unwrap()
        .with_row_selection(RowSelection::from(vec![
            RowSelector::skip(100),
            RowSelector::select(924),
            RowSelector::skip(100),
            RowSelector::select(924),
        ]))
        .build()
        .unwrap();

    let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
    assert_eq!(total_rows, 924 * 2);

    Ok(())
}
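The reduced example drops the predicate and supplies an explicit RowSelection, which suggests the predicate itself is not needed to trigger the bug. Conceptually, each column reader must return exactly the rows the selection keeps, so that all columns end up the same length. The sketch below models this on a flat column with one value per row, using a hypothetical Selector enum in place of RowSelector. For a repeated column such as a list, one row can span several values, which is what makes skipping rows harder there.

```rust
/// Hypothetical stand-in for parquet's RowSelector.
#[derive(Debug, Clone, Copy)]
enum Selector {
    Skip(usize),
    Select(usize),
}

/// Apply skip/select runs to a flat column where each row is one value.
/// Panics if the runs extend past the end of `rows`.
fn apply_selection<T: Clone>(rows: &[T], selectors: &[Selector]) -> Vec<T> {
    let mut out = Vec::new();
    let mut pos = 0;
    for selector in selectors {
        match *selector {
            // Skipped rows only advance the cursor.
            Selector::Skip(n) => pos += n,
            // Selected rows are copied out, then the cursor advances.
            Selector::Select(n) => {
                out.extend_from_slice(&rows[pos..pos + n]);
                pos += n;
            }
        }
    }
    out
}
```

With the example's 2048 rows, this keeps rows 100 to 1023 and 1124 to 2047, i.e. the 924 * 2 rows the assertion expects.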

It also does not seem to be specific to lists of strings.

I will continue digging.

@tustvold
Contributor

tustvold commented Jun 5, 2023

Thank you for this. I've filed #4368 describing the cause; I will need some time to work out how best to fix it.

tustvold added a commit to tustvold/arrow-rs that referenced this issue Jun 6, 2023
tustvold added a commit to tustvold/arrow-rs that referenced this issue Jun 6, 2023
tustvold added a commit that referenced this issue Jun 8, 2023
* Move record delimiting into ColumnReader (#4365)

* Misc tweaks

* More tests

* Clippy

* Review feedback
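The merged commit is titled "Move record delimiting into ColumnReader". As general background (Parquet's Dremel-style encoding, not a description of the exact fix): a repeated column such as a list is stored as a sequence of values with repetition levels, and a repetition level of 0 marks the start of a new record (row). Skipping n rows therefore means counting level-0 entries rather than values. The sketch below shows that counting on a plain slice of levels, using a hypothetical skip_records function.

```rust
/// Hypothetical helper: skip up to `n` records in a buffer of repetition levels.
/// Returns (records_skipped, levels_consumed). A level of 0 starts a new record.
fn skip_records(rep_levels: &[i16], n: usize) -> (usize, usize) {
    let mut records = 0;
    for (idx, &level) in rep_levels.iter().enumerate() {
        if level == 0 {
            // This level begins a new record; stop in front of it once n are skipped.
            if records == n {
                return (records, idx);
            }
            records += 1;
        }
    }
    // Buffer exhausted: the last record counted may still continue in data
    // that has not been read yet (for example, a later page).
    (records, rep_levels.len())
}
```

For the rows ["a", "b"], ["c"], ["d", "e", "f"], the repetition levels are [0, 1, 0, 0, 1, 1]; skipping 2 records consumes the first 3 levels and stops at the start of the third record.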
@AdamGS
Contributor Author

AdamGS commented Jun 8, 2023

@tustvold thank you so much for the quick turnaround!

@alamb
Contributor

alamb commented Jun 16, 2023

label_issue.py automatically added labels {'parquet'} from #4376

@alamb alamb added the parquet Changes to the parquet crate label Jun 16, 2023
3 participants