
Implement RecordBatch <--> FlightData encode/decode + tests #3391

Merged (18 commits) on Dec 31, 2022

Conversation

@alamb (Contributor, Author) commented Dec 25, 2022

Which issue does this PR close?

Closes #3371

(note a large amount of this PR is test code)

Rationale for this change

The details of encoding, sending, receiving, and reconstructing RecordBatches over Flight are common and somewhat duplicated across every implementation of Flight. The mid-level Flight client aims to automate much of this common work.

What changes are included in this PR?

  1. Add encode.rs logic in FlightDataEncoderBuilder (based on the code from IOx in https://github.com/influxdata/influxdb_iox/pull/6460)
  2. Move decode logic out of client and into decode.rs
  3. Make the decoders more generic
  4. Tests: round trip tests of encoding/decoding
  5. Tests for FlightClient::do_get

Are there any user-facing changes?

Yes: new encoders and decoders.

Next planned PRs:

  • Rewrite FlightSQL client in terms of the mid level client.

@github-actions bot added labels arrow (Changes to the arrow crate) and arrow-flight (Changes to the arrow-flight crate) on Dec 25, 2022
/// calling [`Self::into_inner`] and using the [`FlightDataStream`]
/// directly.
#[derive(Debug)]
pub struct FlightRecordBatchStream {
@alamb (Contributor, Author) commented:
This is moved / renamed / tested in decode.rs

// specific language governing permissions and limitations
// under the License.

use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
@alamb (Contributor, Author) commented:

This is code based on https://github.com/influxdata/influxdb_iox/pull/6460.

It handles encoding RecordBatches into FlightData and the details of dictionaries, etc.

/// Ipc writer options
options: IpcWriteOptions,
/// Metadata to add to the schema message
app_metadata: Bytes,
@alamb (Contributor, Author) commented:
I hope eventually there can be options related to dictionary encoding here -- like "try and match dictionaries" for example

// specific language governing permissions and limitations
// under the License.

//! Tests for round trip encoding / decoding
@alamb (Contributor, Author) commented:

Here are the round trip tests for encode/decode (they ensure that RecordBatches sent via Flight get through correctly).

I expect that we can use this framework as we sort out how to properly encode dictionaries (e.g. #3389)

@@ -37,7 +37,7 @@ use arrow_schema::*;
use crate::compression::CompressionCodec;
use crate::CONTINUATION_MARKER;

-/// IPC write options used to control the behaviour of the writer
+/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
@alamb (Contributor, Author) commented:

Drive-by doc fixes.

@@ -173,7 +174,90 @@ async fn test_get_flight_info_metadata() {

// TODO more negative tests (like if there are endpoints defined, etc)

// TODO test for do_get
#[tokio::test]
async fn test_do_get() {
@alamb (Contributor, Author) commented:

here are the tests for do_get promised in #3378

@alamb (Contributor, Author) commented Dec 25, 2022

cc @avantgardnerio @nevi-me and @tustvold

Comment on lines +144 to +150
DecodedPayload::Schema(_) => {
self.got_schema = true;
// Need next message, poll inner again
}
DecodedPayload::RecordBatch(batch) => {
return Poll::Ready(Some(Ok(batch)));
}
A Member commented:

Do we need to verify the schema of record batch against received schema?

@alamb (Contributor, Author) replied:

I actually would like to avoid verifying the schema at the lower level decoder so that clients can send other schema messages if they so wish (we actually do this in IOx).

The higher-level record batch decoder, however, should verify the schema, I think.

I added tests to illustrate these behaviors in test_chained_streams_batch_decoder and test_chained_streams_data_decoder

@alamb (Contributor, Author) commented Dec 26, 2022:

I also added test_mismatched_schema_message to test sending an incorrect schema message. Currently it panics, but we can probably turn that into a useful error at some point. I improved the message as well.

@@ -298,6 +298,12 @@ fn create_array(
make_array(data)
}
_ => {
if nodes.len() <= node_index {
@alamb (Contributor, Author) commented:

The code panics at nodes.get(node_index) below -- so this change just makes an error rather than a panic, which is a slightly better user experience.

@alamb alamb requested a review from tustvold December 27, 2022 15:50
@alamb (Contributor, Author) commented Dec 27, 2022

@tustvold if you have a chance, I would appreciate a review of this PR.

I am particularly interested in your opinion of how we should handle sending RecordBatches and retain the dictionary encoding.

One way might be to have the encode stream match / assign dictionary ids prior to sending. I realize this would be less efficient than if the creators of the dictionaries were to set the ids, but it would likely be more efficient than hydrating the dictionaries into actual data.

@tustvold (Contributor) replied:
> I am particularly interested in your opinion of how we should handle sending RecordBatches and retain the dictionary encoding.

I think we could just assign a unique ID to each dictionary encoded field, and then send a new DictionaryBatch for each field for each RecordBatch. We could then potentially retain the previous dictionary and use ArrayData::ptr_eq to elide sending the exact same dictionary again.

@tustvold (Contributor) left a review:

Mostly just some minor nits. I personally would avoid upstreaming the hydration of dictionaries so that we aren't committed to supporting it long term; I think it should be relatively straightforward to properly support dictionaries in a somewhat sane manner.

Comment on lines 343 to 349
if i == i / 2 {
None
} else {
// repeat some values for low cardinality
let v = i / 3;
Some(format!("value{v}"))
}
@tustvold (Contributor) commented:

Suggested change:
-if i == i / 2 {
-    None
-} else {
-    // repeat some values for low cardinality
-    let v = i / 3;
-    Some(format!("value{v}"))
-}
+// repeat some values for low cardinality
+(i != i / 2).then(|| format!("value{}", i / 3))

I also wonder if i == i / 2 is what you meant to write, as that will only be true for 0

@alamb (Contributor, Author) replied:

you are right -- I meant num_rows / 2 -- thank you

@alamb (Contributor, Author) replied:

As above, I think the then formulation is harder to read, so I would like to leave the current, less concise formulation.

fn make_primative_batch(num_rows: usize) -> RecordBatch {
let i: UInt8Array = (0..num_rows)
.map(|i| {
if i == num_rows / 2 {
@tustvold (Contributor) commented:

This could be rewritten with Option::then as below, to make it significantly more concise

@alamb (Contributor, Author) replied:

It is true -- this would be more concise. However after trying it (see below) I think the current formulation is easier to read, so I plan to leave it as is.

Thank you for the suggestion

     let i: UInt8Array = (0..num_rows)
-        .map(|i| {
-            if i == num_rows / 2 {
-                None
-            } else {
-                Some(i.try_into().unwrap())
-            }
-        })
+        .map(|i| (i != num_rows / 2).then(|| i.try_into().unwrap()))
         .collect();

/// 1. Hydrates any dictionaries to its underlying type. See
/// hydrate_dictionary for more information.
///
pub fn prepare_batch_for_flight(
@tustvold (Contributor) commented:

This method is stateless, which means it will never be able to properly handle dictionaries. Given we just deprecated a similar method in the IPC reader, perhaps we should avoid doing this in favour of the stateful encoder above?

@alamb (Contributor, Author) replied:

I am not quite sure what you are suggesting here. I removed the pub in 2267d13 so that we can have more flexibility with non breaking API changes in the future.

/// * <https://github.com/apache/arrow-rs/issues/1206>
///
/// For now we just hydrate the dictionaries to their underlying type
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
@tustvold (Contributor) commented:

I wonder if we should keep this as a workaround in IOx, and only upstream the proper dictionary support once it is ready? I just wonder if this is a mode we really want to support long-term, especially if we have it as the default behaviour?

@alamb (Contributor, Author) replied:

See #3391 (comment)

I will attempt to get rid of this workaround prior to the arrow 31 release

@alamb (Contributor, Author) commented Dec 28, 2022

> Mostly just some minor nits. I personally would avoid upstreaming the hydration of dictionaries so that we aren't committed to supporting it long term; I think it should be relatively straightforward to properly support dictionaries in a somewhat sane manner.

Thank you -- I will investigate how much effort it will take to do so. I am almost out of time today, so I will likely not get a chance to work on this until later in the week.

Comment on lines 246 to 249
None => {
// inner is done
self.done = true;
}
A Member commented:

Once we reach here, I guess the queue is guaranteed to be empty? If so, it seems we can just return Poll::Ready(None); here?

@alamb (Contributor, Author) replied:

Yes that is correct. This case is also handled on the next loop iteration, but I think making it explicit is good too. I did so in 4f30ab7

@alamb (Contributor, Author) commented Dec 29, 2022

> Mostly just some minor nits. I personally would avoid upstreaming the hydration of dictionaries so that we aren't committed to supporting it long term.

I have pondered this for a while and my plan is to proceed with merging this PR (with the temporary dictionary hydration) and work on proper dictionary support as a follow on PR for the following reasons:

  1. I want to make sure that the rust dictionary handling is correct and for that I would like to use the existing flight integration tests but they need work to update them to the new client too
  2. I have one PR (and about to be more than one) queued up waiting on this PR

To minimize API churn, however, I plan to hold this PR until after we have released 30.0.0 (#3336) so we can minimize the chance of releasing the dictionary hydration code.

@alamb alamb merged commit dc09b0b into apache:master Dec 31, 2022
@alamb alamb deleted the alamb/flight_data_transfer branch December 31, 2022 12:58
@ursabot commented Dec 31, 2022

Benchmark runs are scheduled for baseline = 9398af6 and contender = dc09b0b. dc09b0b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@alamb (Contributor, Author) commented Dec 31, 2022

The follow on PR is ready: #3402

Labels: arrow (Changes to the arrow crate), arrow-flight (Changes to the arrow-flight crate)
Successfully merging this pull request may close these issues: Mid-level ArrowFlight Client
4 participants