feat: set FlightDescriptor on FlightDataEncoderBuilder #4101
arrow-flight/src/encode.rs
Outdated
        )
    }
}

/// Stream that encodes a stream of record batches to flight data.
///
/// See [`FlightDataEncoderBuilder`] for details and example.
#[warn(dead_code)]
Why this change?
arrow-flight/src/encode.rs
Outdated
for mut data in datas {
    // The first message is the schema message need to set the descriptor
    if is_first {
        data.flight_descriptor = self.descriptor.clone();
I think the ticket calls for sending a separate descriptor message, not overriding the first message
An error occurred, but it was not caused by my PR.
Force-pushed from 01d384b to c48c00f
Thank you @Weijun-H
I think this PR needs:
- A test of this feature (as I don't think it is working as expected, but I may be mistaken. A test will show us one way or the other). Perhaps in https://github.com/apache/arrow-rs/blob/master/arrow-flight/tests/encode_decode.rs
- Some documentation describing when the flight descriptor is sent
arrow-flight/src/encode.rs
Outdated
);
self.queue_message(descriptor_flight_data);
// remember descriptor
self.descriptor = Some(descriptor.clone());
Why is the descriptor remembered?
arrow-flight/src/encode.rs
Outdated
@@ -176,6 +189,8 @@ pub struct FlightDataEncoder {
    queue: VecDeque<FlightData>,
    /// Is this stream done (inner is empty or errored)
    done: bool,
    /// descriptor, set after the first batch
What does "set after the first batch" mean?
I guess I expect this to be something more like "cleared after the first FlightData message"
arrow-flight/src/encode.rs
Outdated
};

// If schema is known up front, enqueue it immediately
if let Some(schema) = schema {
    encoder.encode_schema(&schema);
}

// If descriptor is known up front, enqueue it immediately
if let Some(descriptor) = descriptor {
As I understand the intent, the idea is that the descriptor is set on the first message that is sent. This code will potentially send a separate message after the schema message, if the schema is known up front.
Perhaps you could check self.descriptor in enqueue_messages and attach the descriptor there.
Thanks @Weijun-H -- this is looking close
arrow-flight/src/encode.rs
Outdated
};

// If schema is known up front, enqueue it immediately
if let Some(schema) = schema {
    encoder.encode_schema(&schema);
}

encoder.descriptor = descriptor;
I think this should be set before calling encoder.encode_schema() so it is included on the first Schema message, if provided
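To illustrate why the ordering matters, here is a minimal sketch of the constructor logic. It does not use the real arrow-flight types: `Descriptor`, `Message`, and `SketchEncoder` are hypothetical stand-ins, and `queue_message` is assumed to attach a pending descriptor to whatever message is queued next.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for FlightDescriptor.
#[derive(Debug, Clone, PartialEq)]
struct Descriptor(String);

// Hypothetical stand-in for FlightData; `kind` only labels the message.
#[derive(Debug)]
struct Message {
    kind: &'static str,
    descriptor: Option<Descriptor>,
}

struct SketchEncoder {
    queue: VecDeque<Message>,
    // Pending descriptor, cleared once it has been attached to a message.
    descriptor: Option<Descriptor>,
}

impl SketchEncoder {
    fn new(schema_known: bool, descriptor: Option<Descriptor>) -> Self {
        let mut encoder = SketchEncoder {
            queue: VecDeque::new(),
            descriptor: None,
        };
        // Set the descriptor BEFORE queueing the schema message, so the
        // schema message (the first one sent) picks it up.
        encoder.descriptor = descriptor;
        if schema_known {
            encoder.queue_message("schema");
        }
        encoder
    }

    fn queue_message(&mut self, kind: &'static str) {
        // Attach the pending descriptor, if any, to this message only.
        let descriptor = self.descriptor.take();
        self.queue.push_back(Message { kind, descriptor });
    }
}
```

If the assignment came after the schema was queued, the schema message would go out without a descriptor and the first batch would carry it instead.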
arrow-flight/src/encode.rs
Outdated
fn queue_message(&mut self, data: FlightData) {
    if let Some(descriptor) = &self.descriptor {
        let mut data = data;
        data.flight_descriptor = Some(descriptor.clone());
        self.descriptor = None;
        self.queue.push_back(data);
        return;
    }
    self.queue.push_back(data);
}
FWIW I think you can write this using take() a bit more concisely like:
Suggested change
- fn queue_message(&mut self, data: FlightData) {
-     if let Some(descriptor) = &self.descriptor {
-         let mut data = data;
-         data.flight_descriptor = Some(descriptor.clone());
-         self.descriptor = None;
-         self.queue.push_back(data);
-         return;
-     }
-     self.queue.push_back(data);
- }
+ fn queue_message(&mut self, mut data: FlightData) {
+     if let Some(descriptor) = self.descriptor.take() {
+         data.flight_descriptor = Some(descriptor.clone());
+     }
+     self.queue.push_back(data);
+ }
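A self-contained sketch of the `take()` approach follows. The `FlightDescriptor`, `FlightData`, and `Encoder` types here are simplified stand-ins written for illustration, not the real arrow-flight definitions. Because `Option::take()` moves the value out and leaves `None` behind, the sketch also drops the `clone()`, which should not be needed once the descriptor is owned.

```rust
use std::collections::VecDeque;

// Simplified stand-in for arrow-flight's FlightDescriptor (assumption).
#[derive(Debug, Clone, PartialEq, Default)]
struct FlightDescriptor {
    path: Vec<String>,
}

// Simplified stand-in for arrow-flight's FlightData (assumption).
#[derive(Debug, Default)]
struct FlightData {
    flight_descriptor: Option<FlightDescriptor>,
}

#[derive(Default)]
struct Encoder {
    queue: VecDeque<FlightData>,
    // Cleared after the first FlightData message is queued.
    descriptor: Option<FlightDescriptor>,
}

impl Encoder {
    fn queue_message(&mut self, mut data: FlightData) {
        // `take()` returns the stored descriptor and resets the field to
        // `None`, so only the first queued message receives it.
        if let Some(descriptor) = self.descriptor.take() {
            data.flight_descriptor = Some(descriptor);
        }
        self.queue.push_back(data);
    }
}
```

Queueing two messages on an `Encoder` built with `descriptor: Some(..)` leaves the descriptor on the first message only, and `encoder.descriptor` is `None` afterwards.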
arrow-flight/tests/encode_decode.rs
Outdated
let mut encoder = encoder.build(stream);

// First batch should be the schema
encoder.next().await.unwrap().unwrap();
My understanding of https://arrow.apache.org/docs/format/Flight.html#uploading-data was that the first FlightData should have the descriptor, not the second 🤔
Yes, you are right. I completely misunderstood it.
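One way a test could pin down this expectation is sketched below. It works on plain stand-in structs rather than the real arrow-flight types, and `check_descriptor_placement` is a hypothetical helper, not part of the crate. It assumes the descriptor should appear on the first message and on no later one, which is how I read the discussion above.

```rust
// Simplified stand-ins for the arrow-flight types (assumptions).
#[derive(Debug, Clone, PartialEq)]
struct FlightDescriptor {
    cmd: Vec<u8>,
}

#[derive(Debug, Clone)]
struct FlightData {
    flight_descriptor: Option<FlightDescriptor>,
}

// Hypothetical helper: checks that the first message carries `expected`
// and that no later message carries any descriptor.
fn check_descriptor_placement(
    messages: &[FlightData],
    expected: &FlightDescriptor,
) -> Result<(), String> {
    let (first, rest) = messages
        .split_first()
        .ok_or_else(|| "stream produced no messages".to_string())?;

    if first.flight_descriptor.as_ref() != Some(expected) {
        return Err("first message is missing the expected descriptor".to_string());
    }

    // `pos` is an index into `rest`, so the message index is `pos + 1`.
    if let Some(pos) = rest.iter().position(|m| m.flight_descriptor.is_some()) {
        return Err(format!("message {} unexpectedly carries a descriptor", pos + 1));
    }
    Ok(())
}
```

In a real test, the messages would be collected from the encoder stream before being passed to a check like this.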
Force-pushed from dd3c32e to 99af00d
Looks good -- thank you @Weijun-H
Which issue does this PR close?
Closes #3855
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?