Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set FlightDescriptor on FlightDataEncoderBuilder #4101

Merged

Conversation

Weijun-H
Copy link
Member

Which issue does this PR close?

Closes #3855

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate labels Apr 19, 2023
)
}
}

/// Stream that encodes a stream of record batches to flight data.
///
/// See [`FlightDataEncoderBuilder`] for details and example.
#[warn(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

for mut data in datas {
// The first message is the schema message need to set the descriptor
if is_first {
data.flight_descriptor = self.descriptor.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the ticket calls for sending a separate descriptor message, not overriding the first message

@Weijun-H
Copy link
Member Author

Error happened, but not by my pr

Run cargo fmt --all -- --check
Diff in /__w/arrow-rs/arrow-rs/arrow-string/src/like.rs at line 1244:
             "ffkoß",
             "😃sadlksFFkoSSsh😃klF", // Original was case insensitive "😃sadlksffkosSsh😃klF"
             "😱slgFFkoSSsh😃klF",    // Original was case insensitive "😱slgffkosSsh😃klF"
-            "FFkoSS",                    // "FFKoSS"
+            "FFkoSS",                // "FFKoSS"
         ],
         "FFkoSS",
         contains_utf[8](https://github.com/apache/arrow-rs/actions/runs/4758784682/jobs/8457275692?pr=4101#step:6:9)_scalar,

@Weijun-H Weijun-H force-pushed the set-FlightDescriptor-on-FlightDataEncoderBuilder branch from 01d384b to c48c00f Compare April 21, 2023 08:42
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Weijun-H

I think this PR needs:

  1. A test of this feature (as I don't think it is working as expected, but I may be mistaken. A test will show us one way or the other). Perhaps in
    https://github.com/apache/arrow-rs/blob/master/arrow-flight/tests/encode_decode.rs
  2. Some documentation describing when the flight descriptor is sent

arrow-flight/src/encode.rs Show resolved Hide resolved
);
self.queue_message(descriptor_flight_data);
// remember descriptor
self.descriptor = Some(descriptor.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the descriptor remembered?

@@ -176,6 +189,8 @@ pub struct FlightDataEncoder {
queue: VecDeque<FlightData>,
/// Is this stream done (inner is empty or errored)
done: bool,
/// descriptor, set after the first batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "set after the first batch" mean?

I guess I expect this to be something more like "cleared after the first FlightData message"

};

// If schema is known up front, enqueue it immediately
if let Some(schema) = schema {
encoder.encode_schema(&schema);
}

// If descriptor is known up front, enqueue it immediately
if let Some(descriptor) = descriptor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand the intent, the idea is that the descriptor is set on the first message that is sent this code will potentially send a separate message after the schema message, if the schema is known up front.

Perhaps you could check self.descriptor in enqueue_messages and attach the descriptor there

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THanks @Weijun-H -- this is looking close

};

// If schema is known up front, enqueue it immediately
if let Some(schema) = schema {
encoder.encode_schema(&schema);
}

encoder.descriptor = descriptor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be set before calling encoder.encode_schema() so it is included on the first Schema message, if provided

Comment on lines 228 to 231
fn queue_message(&mut self, data: FlightData) {
if let Some(descriptor) = &self.descriptor {
let mut data = data;
data.flight_descriptor = Some(descriptor.clone());
self.descriptor = None;
self.queue.push_back(data);
return;
}
self.queue.push_back(data);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I think you can write this using take() a bit more concisely like:

Suggested change
fn queue_message(&mut self, data: FlightData) {
if let Some(descriptor) = &self.descriptor {
let mut data = data;
data.flight_descriptor = Some(descriptor.clone());
self.descriptor = None;
self.queue.push_back(data);
return;
}
self.queue.push_back(data);
}
fn queue_message(&mut self, mut data: FlightData) {
if let Some(descriptor) = self.descriptor.take() {
data.flight_descriptor = Some(descriptor.clone());
}
self.queue.push_back(data);
}

let mut encoder = encoder.build(stream);

// First batch should be the schema
encoder.next().await.unwrap().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of https://arrow.apache.org/docs/format/Flight.html#uploading-data was that the first FlightData should have the descriptor, not the second 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. I completely misunderstood it.

@Weijun-H Weijun-H force-pushed the set-FlightDescriptor-on-FlightDataEncoderBuilder branch from dd3c32e to 99af00d Compare April 24, 2023 20:20
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look good -- thank you @Weijun-H

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable setting FlightDescriptor on FlightDataEncoderBuilder
3 participants