-
Notifications
You must be signed in to change notification settings - Fork 839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Complete mid-level FlightClient
#3402
Conversation
3d28ca5
to
133d5f0
Compare
FlightClient::do_put
and FlightClient::do_exchange
FlightClient
: do_put
, do_exchange
, get_schema
, do_action
, list_action
, list_flights
FlightClient
: do_put
, do_exchange
, get_schema
, do_action
, list_action
, list_flights
FlightClient
133d5f0
to
45701c3
Compare
45701c3
to
352029a
Compare
/// .expect("error calling do_put"); | ||
/// # } | ||
/// ``` | ||
pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The real value add of this API / client, in my opinion, is this docstring / example and the tests
@@ -76,26 +79,6 @@ async fn test_handshake_error() { | |||
.await; | |||
} | |||
|
|||
#[tokio::test] | |||
async fn test_handshake_metadata() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I consolidated metadata testing into all the existing tests, so there is no need for a standalone metadata test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly just some minor nits, I would change do_put and do_exchange to accept fallible streams and just blindly propagate through the error
/// // encode the batch as a stream of `FlightData` | ||
/// let flight_data_stream = FlightDataEncoderBuilder::new() | ||
/// .build(futures::stream::iter(vec![Ok(batch)])) | ||
/// // data encoder return Results, but do_put requires FlightData |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this should accept a Stream<Item=Result<FlightData>>
, otherwise I'm not sure how one could use this to stream a fallible computation without panicking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I guess that the input is not fallible? I'm not sure what the semantics is on put
if it encounters non-complete data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this should accept a Stream<Item=Result>, otherwise I'm not sure how one could use this to stream a fallible computation without panicking?
I agree -- I was thinking about this too -- I'll see what I can do (though maybe I will do as as a follow on PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracking in #3462
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proposal in #3464
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // encode the batch as a stream of `FlightData` | ||
/// let flight_data_stream = FlightDataEncoderBuilder::new() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably a bit late now, but looking at this API in use, it is kind of a little strange Stream isn't mentioned at all in the name. i.e. s/Encoder/Stream
or something...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code hasn't been released -- I think we can change the name
Also Codec
seems to be a popular choice FlightDataCodec
🤔
.await | ||
.expect("error making request"); | ||
|
||
let response: Result<Vec<_>, _> = response_stream.try_collect().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unwrap_err? Or is this some lack of Debug issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is lack of debug in Error::External
--maybe I can add a Debug impl 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried putzing with this -- the issue is not that FlightError is not Debug
it is that BoxStream
is not.
Here is the error when I used unwrap_err()
:
279 | let response = client
| ________________________^
280 | | .do_put(futures::stream::iter(input_flight_data.clone()))
281 | | .await
| |__________________^ `dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send` cannot be formatted using `{:?}` because it doesn't implement `Debug`
282 | .unwrap_err();
| ---------- required by a bound introduced by this call
|
= help: the trait `Debug` is not implemented for `dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send`
= help: the following other types implement trait `Debug`:
(dyn Any + 'static)
(dyn Any + Sync + std::marker::Send + 'static)
(dyn Any + std::marker::Send + 'static)
(dyn tracing_core::field::Value + 'static)
= note: required for `Box<dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send>` to implement `Debug`
= note: 1 redundant requirement hidden
= note: required for `Pin<Box<dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send>>` to implement `Debug`
I tried putzing with the types some but I could not get the traits to be happy
let response = client | ||
.do_exchange(futures::stream::iter(input_flight_data.clone())) | ||
.await; | ||
let response = match response { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unwrap_err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
arrow-flight/tests/common/server.rs
Outdated
.take() | ||
.ok_or_else(|| Status::internal("No do_put response configured"))?; | ||
|
||
let stream = futures::stream::iter(response).map_err(|e| e.into()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let stream = futures::stream::iter(response).map_err(|e| e.into()); | |
let stream = futures::stream::iter(response).map_err(Into::into); |
arrow-flight/tests/common/server.rs
Outdated
|
||
let stream = futures::stream::iter(response).map_err(|e| e.into()); | ||
|
||
Ok(Response::new(Box::pin(stream) as _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok(Response::new(Box::pin(stream) as _)) | |
Ok(Response::new(stream.boxed())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I applied these changes to the rest of this file
.expect("error making request"); | ||
|
||
let response: Result<Vec<_>, _> = response_stream.try_collect().await; | ||
let response = match response { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unwrap_err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same issue as above
arrow-flight/tests/common/server.rs
Outdated
let stream = futures::stream::iter(response).map_err(|e| e.into()); | ||
|
||
Ok(Response::new(Box::pin(stream) as _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let stream = futures::stream::iter(response).map_err(|e| e.into()); | |
Ok(Response::new(Box::pin(stream) as _)) | |
let stream = futures::stream::iter(response).map_err(Into::into); | |
Ok(Response::new(stream.boxed())) |
/// ``` | ||
pub async fn list_flights( | ||
&mut self, | ||
expression: impl Into<Bytes>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the idea a higher-level FlightSQL client will handle the Any encoding nonsense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that was the idea -- flight doesn't really say anything about what "expression" should be (aka it doesn't say it will be an encoded protobuf message) FlightSQL does use protobuf encoded messages but I think that should be handled in higher level
arrow-flight/src/client.rs
Outdated
// TODO other methods | ||
// list_flights | ||
// get_schema | ||
// do_put | ||
// do_action |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think do_action
is implemented now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indeed -- good catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove
Co-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Raphael Taylor-Davies <[email protected]>
… into alamb/more_flight_actions
Benchmark runs are scheduled for baseline = 81abc1a and contender = 42ffc3f. 42ffc3f is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
It appears I forgot some tests here -- added in #3463 |
Draft as it builds on #3391Which issue does this PR close?
re #3301
closes #3371
Rationale for this change
We are building a mid level flight client that takes care of encoding/decoding details (that we can build FlightSQL client on top of)
What changes are included in this PR?
FlightClient
APIs:do_put
,do_exchange
,get_schema
,do_action
,list_action
,list_flights
Are there any user-facing changes?
New APis