Implement RecordBatch <--> FlightData encode/decode + tests #3391
Conversation
/// calling [`Self::into_inner`] and using the [`FlightDataStream`]
/// directly.
#[derive(Debug)]
pub struct FlightRecordBatchStream {
This is moved / renamed / tested in decode.rs
// specific language governing permissions and limitations
// under the License.

use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
This code is based on https://github.com/influxdata/influxdb_iox/pull/6460.
It handles encoding RecordBatches into FlightData and the details of dictionaries, etc.
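For readers unfamiliar with the new module, a minimal usage sketch (assuming the builder API this PR adds in encode.rs; treat it as illustrative rather than the exact final API):

use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use futures::{stream, TryStreamExt};

// Encode a single RecordBatch into the sequence of FlightData messages that
// would be sent over the wire: a schema message followed by record batch
// (and possibly dictionary) messages.
async fn encode_one_batch(batch: RecordBatch) -> Result<Vec<FlightData>, FlightError> {
    let input = stream::iter([Ok(batch)]);
    FlightDataEncoderBuilder::new().build(input).try_collect().await
}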
/// Ipc writer options
options: IpcWriteOptions,
/// Metadata to add to the schema message
app_metadata: Bytes,
I hope eventually there can be options related to dictionary encoding here -- like "try and match dictionaries" for example
// specific language governing permissions and limitations
// under the License.

//! Tests for round trip encoding / decoding
Here are the round trip tests for encode/decode (these ensure that RecordBatches sent via Flight get through correctly).
I expect that we can use this framework as we sort out how to properly encode dictionaries (e.g. #3389)
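For illustration, a round trip test of this shape (constructor names follow the encode.rs / decode.rs modules added here and are an assumption about the exact API, not a verbatim copy of the tests):

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_flight::decode::{FlightDataDecoder, FlightRecordBatchStream};
use arrow_flight::encode::FlightDataEncoderBuilder;
use futures::{stream, TryStreamExt};
use std::sync::Arc;

// RecordBatch --> FlightData --> RecordBatch, entirely in memory.
#[tokio::test]
async fn round_trip_sketch() {
    let col: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let batch = RecordBatch::try_from_iter([("tag", col)]).unwrap();

    // Encode into a stream of FlightData messages
    let encoded = FlightDataEncoderBuilder::new().build(stream::iter([Ok(batch.clone())]));

    // Decode the FlightData messages back into RecordBatches
    let decoded = FlightRecordBatchStream::new(FlightDataDecoder::new(encoded));
    let batches: Vec<RecordBatch> = decoded.try_collect().await.unwrap();

    assert_eq!(batches, vec![batch]);
}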
@@ -37,7 +37,7 @@ use arrow_schema::*;
use crate::compression::CompressionCodec;
use crate::CONTINUATION_MARKER;

-/// IPC write options used to control the behaviour of the writer
+/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
Drive-by doc fixes.
@@ -173,7 +174,90 @@ async fn test_get_flight_info_metadata() {

// TODO more negative tests (like if there are endpoints defined, etc)

// TODO test for do_get
#[tokio::test]
async fn test_do_get() {
Here are the tests for do_get promised in #3378. cc @avantgardnerio @nevi-me and @tustvold
DecodedPayload::Schema(_) => {
    self.got_schema = true;
    // Need next message, poll inner again
}
DecodedPayload::RecordBatch(batch) => {
    return Poll::Ready(Some(Ok(batch)));
}
Do we need to verify the schema of the record batch against the received schema?
I actually would like to avoid verifying the schema at the lower level decoder so that clients can send other schema messages if they so wish (we actually do this in IOx).
The higher level record batch decoder, however, should verify it, I think.
I added tests to illustrate these behaviors in test_chained_streams_batch_decoder and test_chained_streams_data_decoder.
I also added test_mismatched_schema_message to test sending an incorrect schema message. Currently it panics, but we can probably turn that into a useful message at some point. I improved the message as well.
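To make the two levels concrete, the higher level check could look something like this (illustrative only; the function name and error variant here are my own, not necessarily what decode.rs does):

use arrow_array::RecordBatch;
use arrow_flight::error::FlightError;
use arrow_schema::SchemaRef;

// Compare each decoded batch against the schema from the first Schema message
// and surface a FlightError rather than panicking on a mismatch.
fn check_batch_schema(expected: &SchemaRef, batch: &RecordBatch) -> Result<(), FlightError> {
    if batch.schema().as_ref() != expected.as_ref() {
        return Err(FlightError::ProtocolError(format!(
            "batch schema {:?} does not match schema message {:?}",
            batch.schema(),
            expected
        )));
    }
    Ok(())
}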
Co-authored-by: Liang-Chi Hsieh <[email protected]>
…s into alamb/flight_data_transfer
@@ -298,6 +298,12 @@ fn create_array(
    make_array(data)
}
_ => {
    if nodes.len() <= node_index {
The code panics at nodes.get(node_index) below -- so this change just returns an error rather than panicking, which is a slightly better user experience.
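A hedged sketch of the shape of that change (the helper name and error variant are illustrative, not the exact code in the IPC reader):

use arrow_schema::ArrowError;

// Look the node up with `get` and turn a missing index into an ArrowError
// instead of indexing directly, which would panic on malformed input.
fn node_at<T>(nodes: &[T], node_index: usize) -> Result<&T, ArrowError> {
    nodes.get(node_index).ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!(
            "malformed IPC message: expected node at index {node_index} but only {} nodes are present",
            nodes.len()
        ))
    })
}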
@tustvold if you have a chance, I would appreciate a review of this PR. I am particularly interested in your opinion on how we should handle sending RecordBatches while retaining the dictionary encoding. One way might be to have the encode stream match / assign dictionary ids prior to sending. I realize this would be less efficient than if the creators of the dictionaries were to set the ids, but it would likely be more efficient than hydrating the dictionaries into actual data.
I think we could just assign a unique ID to each dictionary encoded field, and then send a new DictionaryBatch for each field for each RecordBatch. We could then potentially retain the previous dictionary and use
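A rough sketch of the "unique ID per dictionary-encoded field" idea (illustrative only; this is not what the crate ultimately implemented in this PR):

use std::collections::HashMap;
use arrow_schema::{DataType, Schema};

// Walk the schema once and give every dictionary-encoded column its own
// dictionary id, so each column could get its own DictionaryBatch per
// RecordBatch.
fn assign_dictionary_ids(schema: &Schema) -> HashMap<usize, i64> {
    let mut next_id = 0i64;
    let mut ids = HashMap::new();
    for (column_index, field) in schema.fields().iter().enumerate() {
        if matches!(field.data_type(), DataType::Dictionary(_, _)) {
            ids.insert(column_index, next_id);
            next_id += 1;
        }
    }
    ids
}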
Mostly just some minor nits. I personally would avoid upstreaming the hydration of dictionaries so that we aren't committed to supporting it long term; I think it should be relatively straightforward to properly support dictionaries in a somewhat sane manner.
arrow-flight/tests/encode_decode.rs
if i == i / 2 {
    None
} else {
    // repeat some values for low cardinality
    let v = i / 3;
    Some(format!("value{v}"))
}
-if i == i / 2 {
-    None
-} else {
-    // repeat some values for low cardinality
-    let v = i / 3;
-    Some(format!("value{v}"))
-}
+// repeat some values for low cardinality
+(i != i / 2).then(|| format!("value{}", i / 3))
I also wonder if i == i / 2 is what you meant to write, as that will only be true for 0.
You are right -- I meant num_rows / 2 -- thank you.
As above, I think the then formulation is harder to read, so I would like to leave the current, less concise formulation.
fn make_primative_batch(num_rows: usize) -> RecordBatch {
    let i: UInt8Array = (0..num_rows)
        .map(|i| {
            if i == num_rows / 2 {
This could be rewritten with Option::then as below, to make it significantly more concise.
It is true -- this would be more concise. However, after trying it (see below) I think the current formulation is easier to read, so I plan to leave it as is. Thank you for the suggestion.
     let i: UInt8Array = (0..num_rows)
-        .map(|i| {
-            if i == num_rows / 2 {
-                None
-            } else {
-                Some(i.try_into().unwrap())
-            }
-        })
+        .map(|i| (i != num_rows / 2).then(|| i.try_into().unwrap()))
         .collect();
arrow-flight/src/encode.rs
/// 1. Hydrates any dictionaries to its underlying type. See
/// hydrate_dictionary for more information.
///
pub fn prepare_batch_for_flight(
This method is stateless, which means it will never be able to properly handle dictionaries. Given we just deprecated a similar method in the IPC reader, perhaps we should avoid doing this in favour of the stateful encoder above?
I am not quite sure what you are suggesting here. I removed the pub in 2267d13 so that we can have more flexibility to make non-breaking API changes in the future.
/// * <https://github.com/apache/arrow-rs/issues/1206>
///
/// For now we just hydrate the dictionaries to their underlying type
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
I wonder if we should keep this as a workaround in IOx, and only upstream the proper dictionary support once it is ready? I just wonder if this is a mode we really want to support long-term, especially if we make it the default behaviour.
See #3391 (comment)
I will attempt to get rid of this workaround prior to the arrow 31 release
Thank you -- I will investigate how much effort it will take to do so. I am almost out of time today, so I will likely not get a chance to work on this until later in the week.
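For reference, the hydration workaround under discussion amounts to casting a dictionary array to its value type; a sketch, assuming it is built on arrow_cast::cast:

use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_schema::{ArrowError, DataType};

// Replace a dictionary array with a plain array of its value type by casting;
// non-dictionary arrays are passed through unchanged.
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    match array.data_type() {
        DataType::Dictionary(_, value_type) => arrow_cast::cast(array, value_type),
        _ => Ok(Arc::clone(array)),
    }
}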
arrow-flight/src/encode.rs
None => {
    // inner is done
    self.done = true;
}
Once we reach here, I guess the queue is guaranteed to be empty? If so, it seems we can just return Poll::Ready(None); here?
Yes that is correct. This case is also handled on the next loop iteration, but I think making it explicit is good too. I did so in 4f30ab7
I have pondered this for a while and my plan is to proceed with merging this PR (with the temporary dictionary hydration) and work on proper dictionary support as a follow-on PR, for the following reasons:
In order to minimize API churn, however, I plan to hold this PR until after we have released 30.0.0 (#3336) so we can minimize the chance of releasing the dictionary hydration code.
Co-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Raphael Taylor-Davies <[email protected]>
Benchmark runs are scheduled for baseline = 9398af6 and contender = dc09b0b. dc09b0b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
The follow-on PR is ready: #3402
Which issue does this PR close?
Closes #3371
(note a large amount of this PR is test code)
Rationale for this change
The details of encoding / sending / receiving / reconstructing RecordBatches over Flight are common and somewhat duplicated across every implementation of Flight. The mid-level flight client aims to automate much of this common work.
What changes are included in this PR?
- encode.rs logic in FlightDataEncoderBuilder (based on the code from IOx in https://github.com/influxdata/influxdb_iox/pull/6460)
- decode.rs
- FlightClient::do_get
Are there any user-facing changes?
yes, encoders and decoders
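As a user-facing illustration, fetching decoded RecordBatches through the updated do_get might look roughly like this (the endpoint wiring and ticket contents are placeholders):

use arrow_array::RecordBatch;
use arrow_flight::error::FlightError;
use arrow_flight::{FlightClient, Ticket};
use futures::TryStreamExt;
use tonic::transport::Channel;

// The mid-level client now returns a stream that decodes FlightData back into
// RecordBatches, so callers can collect batches directly.
async fn fetch_batches(channel: Channel) -> Result<Vec<RecordBatch>, FlightError> {
    let mut client = FlightClient::new(channel);
    let ticket = Ticket {
        ticket: b"example-ticket".to_vec().into(),
    };
    client.do_get(ticket).await?.try_collect().await
}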
Next planned PRs: