parquet reading hangs when row_group contains more than 2048 rows of data #349

Closed
garyanaplan opened this issue May 25, 2021 · 13 comments · Fixed by #443

@garyanaplan
Contributor

garyanaplan commented May 25, 2021

Describe the bug
Reading an apparently valid parquet file (one that Java tools such as parquet-tools can read) from any Rust program hangs, with CPU load at 100%. Reproduced on both 4.0.0 and 4.1.0 with rustc 1.51.0.

To Reproduce
Create a parquet file with at least one row group (e.g. 1), where each row group has more than 2048 rows (e.g. 2049). A Rust program reading the file will hang when it reaches the 2048th row, while a Java program (parquet-tools) reads it with no issue.

This test can be used to produce a file that can then be read with parquet-read to reproduce the hang:

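    #[test]
    fn it_writes_data() {
        // Imports needed to compile this test; paths assume the parquet 4.x API.
        use std::{convert::TryInto, fs, path::Path, sync::Arc};

        use parquet::{
            column::writer::ColumnWriter,
            data_type::ByteArray,
            file::{
                properties::WriterProperties,
                writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
            },
            schema::parser::parse_message_type,
        };

        let path = Path::new("sample.parquet");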

        let message_type = "
  message ItHangs {
    REQUIRED INT64 DIM0;
    REQUIRED DOUBLE DIM1;
    REQUIRED BYTE_ARRAY DIM2;
    REQUIRED BOOLEAN DIM3;
  }
";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());
        let file = fs::File::create(&path).unwrap();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        for _group in 0..1 {
            let mut row_group_writer = writer.next_row_group().unwrap();
            let values: Vec<i64> = vec![0; 2049];
            let my_values: Vec<i64> = values
                .iter()
                .enumerate()
                .map(|(count, _x)| count.try_into().unwrap())
                .collect();
            let my_double_values: Vec<f64> = values
                .iter()
                .enumerate()
                .map(|(count, _x)| count as f64)
                .collect();
            let my_bool_values: Vec<bool> = values
                .iter()
                .enumerate()
                .map(|(count, _x)| count % 2 == 0)
                .collect();
            let my_ba_values: Vec<ByteArray> = values
                .iter()
                .enumerate()
                .map(|(count, _x)| {
                    let s = format!("{}", count);
                    ByteArray::from(s.as_ref())
                })
                .collect();
            while let Some(mut col_writer) = row_group_writer.next_column().expect("next column") {
                match col_writer {
                    // ... write values to a column writer
                    // You can also use `get_typed_column_writer` method to extract typed writer.
                    ColumnWriter::Int64ColumnWriter(ref mut typed_writer) => {
                        typed_writer
                            .write_batch(&my_values, None, None)
                            .expect("writing int column");
                    }
                    ColumnWriter::DoubleColumnWriter(ref mut typed_writer) => {
                        typed_writer
                            .write_batch(&my_double_values, None, None)
                            .expect("writing double column");
                    }
                    ColumnWriter::BoolColumnWriter(ref mut typed_writer) => {
                        typed_writer
                            .write_batch(&my_bool_values, None, None)
                            .expect("writing bool column");
                    }
                    ColumnWriter::ByteArrayColumnWriter(ref mut typed_writer) => {
                        typed_writer
                            .write_batch(&my_ba_values, None, None)
                            .expect("writing bytes column");
                    }
                    _ => {
                        // Not expected: the schema above only uses these four types.
                        println!("unexpected column writer type");
                    }
                }
                row_group_writer
                    .close_column(col_writer)
                    .expect("close column");
            }
            let rg_md = row_group_writer.close().expect("close row group");
            println!("total rows written: {}", rg_md.num_rows());
            writer
                .close_row_group(row_group_writer)
                .expect("close row groups");
        }
        writer.close().expect("close writer");

        let bytes = fs::read(&path).unwrap();
        assert_eq!(&bytes[0..4], &[b'P', b'A', b'R', b'1']);
    }

Expected behavior
The read will complete without hanging.

Additional context
My development system is Mac OS X, so I have only tested on OS X.

rustup reports:
active toolchain

1.51.0-x86_64-apple-darwin (default)
rustc 1.51.0 (2fd73fabe 2021-03-23)

@alamb
Contributor

alamb commented May 25, 2021

Thanks for the report @garyanaplan !

@garyanaplan
Contributor Author

yw.

Extra info: it happens with both debug and release builds, and I also reproduced it with rustc 1.51.0 on a Linux system.

@k-stanislawek
Contributor

k-stanislawek commented Jun 9, 2021

I've also just encountered it. The common element with this reproduction is a BOOLEAN field; without the BOOLEAN column it works.

After a quick investigation of the looping code, I've found something suspicious, but it's only about naming, so I'm not sure it's an actual bug.

This function returns a value that is initialized to the input's length and named values_to_read:

values_to_read

Meanwhile, the call site (which I can't find on GitHub because, admittedly, I'm using an older version; I'll add it later) assigns the return value to values_read.

By the way, it loops forever because, after 2048 values have been read, this returned value is 0.
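To illustrate why a 0 return turns into a hang, here is a minimal sketch that assumes a hypothetical `read_batch` callback standing in for the column reader's decoder (it is not the parquet crate's API). A loop that exits only once it has read the expected number of values never progresses after the decoder starts returning 0; the guard turns that spin into an error.

```rust
/// Reads `expected` values by repeatedly calling `read_batch(want)`, a
/// hypothetical decoder callback that returns how many values it produced.
/// Without the `n == 0` guard, a decoder that stops producing values would
/// make this loop spin forever at 100% CPU.
fn read_all<F>(expected: usize, batch_size: usize, mut read_batch: F) -> Result<usize, String>
where
    F: FnMut(usize) -> usize,
{
    let mut total = 0;
    while total < expected {
        let want = batch_size.min(expected - total);
        let n = read_batch(want);
        if n == 0 {
            // No progress was made: report it instead of looping forever.
            return Err(format!("decoder returned 0 values after {} of {}", total, expected));
        }
        total += n;
    }
    Ok(total)
}
```

For example, a decoder that can only ever produce 2048 values yields an error on a 2049-row column instead of hanging.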

@garyanaplan
Contributor Author

Yep. If I update my test to remove BOOLEAN from the schema, the problem goes away. I've done some digging around today and noticed that it looks like the problem might lie in the generation of the file.

I previously reported that parquet-tools dump would happily process the file. However, after trimming the example down to just the BOOLEAN field and increasing the number of rows in the group, I noticed the following when dumping:

value 2039: R:0 D:0 V:true
value 2040: R:0 D:0 V:false
value 2041: R:0 D:0 V:true
value 2042: R:0 D:0 V:false
value 2043: R:0 D:0 V:true
value 2044: R:0 D:0 V:false
value 2045: R:0 D:0 V:true
value 2046: R:0 D:0 V:false
value 2047: R:0 D:0 V:true
value 2048: R:0 D:0 V:false
value 2049: R:0 D:0 V:false
value 2050: R:0 D:0 V:false
value 2051: R:0 D:0 V:false
value 2052: R:0 D:0 V:false
value 2053: R:0 D:0 V:false
value 2054: R:0 D:0 V:false
value 2055: R:0 D:0 V:false
All the values after 2048 are false and they continue to be false until the end of the file.
It looks like the generated input file is invalid, so I'm going to poke around there a little next.

@garyanaplan
Contributor Author

More poking reveals that PlainEncoder has a bit_writer with a hard-coded size of 256 bytes (just big enough to hold 2048 bits). In src/encodings/encoding.rs:

    bit_writer: BitWriter::new(256),

If you adjust that value up or down, you trip the error at different points, so it looks like a contributing factor. I'm now trying to understand the logic around buffer flushing and reuse. It feels like I'm getting close to the root cause.

@garyanaplan
Contributor Author

garyanaplan commented Jun 10, 2021

It looks like that hard-coded value (256) in the bit-writer is the root cause. When writing, if we try to put more than 2048 boolean values, the writer just "ignores" the extra writes. This happens because the bool encoder silently ignores calls to put_value that return false.
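A toy model of this failure mode, assuming a simplified bit writer with a fixed byte capacity (`FixedBitWriter` and `encode_ignoring_errors` are hypothetical names, not the crate's `BitWriter` or `PlainEncoder`): 256 bytes hold exactly 256 * 8 = 2048 bits, so the 2049th put fails, and an encoder that discards the `bool` result loses that value without any error.

```rust
/// Toy bit writer with a fixed byte capacity (not the parquet crate's BitWriter).
struct FixedBitWriter {
    buf: Vec<u8>,        // zero-initialised storage that never grows
    bits_written: usize, // number of bits stored so far
}

impl FixedBitWriter {
    fn new(capacity_bytes: usize) -> Self {
        FixedBitWriter { buf: vec![0; capacity_bytes], bits_written: 0 }
    }

    /// Appends one bit (LSB-first within each byte); returns false, and
    /// stores nothing, when the buffer is already full.
    fn put_bool(&mut self, v: bool) -> bool {
        if self.bits_written >= self.buf.len() * 8 {
            return false;
        }
        if v {
            self.buf[self.bits_written / 8] |= 1 << (self.bits_written % 8);
        }
        self.bits_written += 1;
        true
    }
}

/// Buggy encoder: drops the result of `put_bool`, so overflowed values vanish.
fn encode_ignoring_errors(writer: &mut FixedBitWriter, values: &[bool]) {
    for &v in values {
        let _ = writer.put_bool(v); // failure is silently discarded
    }
}
```

Encoding 2049 alternating values into a 256-byte writer stores only the first 2048 bits and reports nothing.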

I have a fix for this which extends the size of the BitWriter in 256-byte increments, and also checks the return value of put_value in BoolType::encode() and raises an error if the call fails.
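A sketch of the approach described here, under simplified assumptions (`GrowableBitWriter`, `grow`, and `encode_bools` are hypothetical names, not the code in the attached diff): when a put fails, grow the buffer by another 256 bytes and retry, and if the put still fails, return an error instead of dropping the value.

```rust
/// Growth step, matching the 256-byte increments described above.
const GROWTH_STEP: usize = 256;

/// Toy bit writer whose buffer can be grown (not the parquet crate's BitWriter).
struct GrowableBitWriter {
    buf: Vec<u8>,
    bits_written: usize,
}

impl GrowableBitWriter {
    fn new() -> Self {
        GrowableBitWriter { buf: vec![0; GROWTH_STEP], bits_written: 0 }
    }

    /// Appends one bit (LSB-first within each byte); returns false if full.
    fn put_bool(&mut self, v: bool) -> bool {
        if self.bits_written >= self.buf.len() * 8 {
            return false;
        }
        if v {
            self.buf[self.bits_written / 8] |= 1 << (self.bits_written % 8);
        }
        self.bits_written += 1;
        true
    }

    /// Extends the buffer with `extra_bytes` zeroed bytes.
    fn grow(&mut self, extra_bytes: usize) {
        let new_len = self.buf.len() + extra_bytes;
        self.buf.resize(new_len, 0);
    }
}

/// Encodes every value, growing on overflow and reporting (not ignoring) failures.
fn encode_bools(writer: &mut GrowableBitWriter, values: &[bool]) -> Result<(), String> {
    for (i, &v) in values.iter().enumerate() {
        if !writer.put_bool(v) {
            // Capacity exceeded: add another step and retry once.
            writer.grow(GROWTH_STEP);
            if !writer.put_bool(v) {
                return Err(format!("failed to encode boolean value at index {}", i));
            }
        }
    }
    Ok(())
}
```

With 5000 values the buffer grows twice (256 to 512 to 768 bytes), which also illustrates the concern raised below: the buffer keeps growing with the number of values in the page.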

Can anyone comment on this approach?

(diff attached)

a.diff.txt

@alamb
Contributor

alamb commented Jun 10, 2021

@garyanaplan -- I think the best way to get feedback on the approach would be to open a pull request

@garyanaplan
Contributor Author

Yeah. I'm not really happy with it, because I don't love the special handling for Booleans via the BitWriter. Just growing the buffer indefinitely seems "wrong", but I think any other kind of fix would be much more extensive/intrusive.

I'll file the PR and see what feedback I get.

garyanaplan added a commit to garyanaplan/arrow-rs that referenced this issue Jun 10, 2021
When writing BOOLEAN data, writing more than 2048 rows of data will
overflow the hard-coded 256-byte buffer set for the bit-writer in the
PlainEncoder. Once this occurs, further attempts to write to the encoder
fail, because capacity is exceeded, but the errors are silently ignored.

This fix improves the error detection and reporting at the point of
encoding and modifies the logic for bit_writing (BOOLEANS). The
bit_writer is initially allocated 256 bytes (as at present), then each
time the capacity is exceeded the capacity is incremented by another
256 bytes.

This certainly resolves the current problem, but it's not exactly a
great fix because the capacity of the bit_writer could now grow
substantially.

Other data types seem to have a more sophisticated mechanism for writing
data which doesn't involve growing or having a fixed size buffer. It
would be desirable to make the BOOLEAN type use this same mechanism if
possible, but that level of change is more intrusive and probably
requires greater knowledge of the implementation than I possess.

resolves: apache#349
alamb pushed a commit that referenced this issue Jun 16, 2021
…ail (#443)

* improve BOOLEAN writing logic and report error on encoding fail

* only manipulate the bit_writer for BOOLEAN data

Tacky, but I can't think of a better way to do this without
specialization.

* better isolation of changes

Remove the byte tracking from the PlainEncoder and use the existing
bytes_written() method in BitWriter.

This is neater.

* add test for boolean writer

The test ensures that we can write > 2048 rows to a parquet file and
that when we read the data back, it finishes without hanging (defined as
taking < 5 seconds).

If we don't want that extra complexity, we could remove the
thread/channel stuff and just try to read the file and let the test
runner terminate hanging tests.

* fix capacity calculation error in bool encoding

The values.len() reports the number of values to be encoded and so must
be divided by 8 (bits in a byte) to determine the effect on the byte
capacity of the bit_writer.
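The thread/channel timeout mentioned in the "add test for boolean writer" commit can be sketched with only the standard library (`std::sync::mpsc::channel` and `Receiver::recv_timeout`). The helper name `run_with_timeout` is hypothetical; in the real test the closure would read the written parquet file back.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on a background thread and waits at most `timeout` for it.
/// Returns Some(result) if it finished in time, or None if it timed out
/// (the worker thread is left running; the caller just stops waiting).
fn run_with_timeout<T, F>(timeout: Duration, work: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error if the receiver has already given up.
        let _ = tx.send(work());
    });
    rx.recv_timeout(timeout).ok()
}
```

A hanging read then shows up as `None` after 5 seconds instead of blocking the test runner indefinitely.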
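The arithmetic behind the capacity fix, as a sketch: the bit writer's capacity is measured in bytes, while values.len() counts booleans (one bit each), so the bytes needed are the value count divided by 8, rounded up. The function names and the `step` parameter are assumptions for illustration; `bytes_written` plays the role of BitWriter's bytes_written() but is passed in as a plain number here.

```rust
/// Whole bytes needed to store `num_bits` bits, rounding up.
fn bytes_for_bits(num_bits: usize) -> usize {
    (num_bits + 7) / 8
}

/// New byte capacity for a bit writer that has already used `bytes_written`
/// bytes and must accept `num_values` more booleans, growing in `step`-byte
/// increments. Treating `num_values` directly as a byte count would
/// overestimate the space needed by a factor of 8.
fn required_capacity(current_capacity: usize, bytes_written: usize, num_values: usize, step: usize) -> usize {
    assert!(step > 0, "growth step must be positive");
    let needed = bytes_written + bytes_for_bits(num_values);
    let mut capacity = current_capacity;
    while capacity < needed {
        capacity += step;
    }
    capacity
}
```

For example, 2049 values need 257 bytes, so a 256-byte writer grows once to 512 bytes.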
nevi-me pushed a commit that referenced this issue Jul 5, 2021
* simplify interactions with arrow flight APIs

Initial work to implement some basic traits

* more polishing and introduction of a couple of wrapper types

Some more polishing of the basic code I provided last week.

* More polishing

Add support for representing tickets as base64 encoded strings.

Also: more polishing of Display, etc...

* improve BOOLEAN writing logic and report error on encoding fail

* only manipulate the bit_writer for BOOLEAN data

* better isolation of changes

* add test for boolean writer

* fix capacity calculation error in bool encoding

* make BasicAuth accessible

Following merge with master, make sure this is exposed so that
integration tests work.

also: there has been a release since I last looked at this so update the
deprecation warnings.

* fix documentation for ipc_message_from_arrow_schema

TryFrom, not From

* replace deprecated functions in integration tests with traits

clippy complains about using deprecated functions, so replace them with
the new trait support.

also: fix the trait documentation

* address review comments

 - update deprecated warnings
 - improve TryFrom for DescriptorType
@MichaelBitard

MichaelBitard commented Jul 13, 2021

This still happens with parquet 4.4.0. It may be related to another type; I'll try to reproduce it with a minimal example, but right now it always hangs after reading 2046 rows.

EDIT: I just saw this fix is on master but not released yet. Is there a way to get it into a 4.4.1 release, or does it rely too much on the 5.0.0 SNAPSHOT? I can try to make a PR based on 4.4.0 if you think you could release it.

@MichaelBitard

MichaelBitard commented Jul 13, 2021

I just updated pqrs to use the latest version of parquet-rs and arrow (commit 6698eed), and the issue still happens with the example you provided, @garyanaplan. It gets stuck after reading 2046 rows.

To reproduce:

  • clone https://github.com/MichaelBitard/pqrs (I pushed the sample.parquet in the repository)
  • launch cargo run rowcount sample.parquet --> you'll see 2049 lines
  • launch cargo run cat sample.parquet
    • It'll hang at the 2046th line:
CurrentRow 2046 0
{DIM0: 2046, DIM1: 2046.0, DIM2: [50, 48, 52, 54], DIM3: true}

It is stuck in the print_rows function: https://github.com/MichaelBitard/pqrs/blob/master/src/utils.rs#L55

@garyanaplan
Contributor Author

Hi @MichaelBitard,

Unfortunately, the problem occurs when the parquet file is written. I imagine you created your sample.parquet file with the unfixed version, which would mean you still hit the problem when reading it.

Can you confirm that sample.parquet was created with the fixed code and then verify that it will read ok?

Gary

@MichaelBitard

Oops, you are right, sorry.

If I generate sample.parquet with the latest version, it no longer hangs during reading.

Thanks for noticing and sorry again!

@alamb
Contributor

alamb commented Jul 14, 2021

Thank you @MichaelBitard for taking the time to report it!
