Complete mid-level FlightClient #3402

Merged
merged 9 commits into from
Jan 5, 2023

Contributor

@alamb alamb commented Dec 27, 2022

Draft as it builds on #3391

Which issue does this PR close?

re #3301
closes #3371

Rationale for this change

We are building a mid-level Flight client that takes care of encoding/decoding details, so that a FlightSQL client can be built on top of it.

What changes are included in this PR?

  1. Implement remaining FlightClient APIs: do_put, do_exchange, get_schema, do_action, list_actions, list_flights
  2. Documentation
  3. Tests

Are there any user-facing changes?

New APIs

@github-actions github-actions bot added the arrow (Changes to the arrow crate) and arrow-flight (Changes to the arrow-flight crate) labels Dec 27, 2022
@xudong963 xudong963 self-requested a review December 29, 2022 14:09
@alamb alamb force-pushed the alamb/more_flight_actions branch from 3d28ca5 to 133d5f0 Compare December 29, 2022 16:13
@alamb alamb changed the title Implement FlightClient::do_put and FlightClient::do_exchange Complete FlightClient: do_put, do_exchange, get_schema, do_action, list_action, list_flights Dec 31, 2022
@alamb alamb changed the title Complete FlightClient: do_put, do_exchange, get_schema, do_action, list_action, list_flights Complete mid-level FlightClient Dec 31, 2022
@alamb alamb force-pushed the alamb/more_flight_actions branch from 133d5f0 to 45701c3 Compare December 31, 2022 12:26
@alamb alamb force-pushed the alamb/more_flight_actions branch from 45701c3 to 352029a Compare December 31, 2022 12:58
@github-actions github-actions bot removed the arrow (Changes to the arrow crate) label Dec 31, 2022
/// .expect("error calling do_put");
/// # }
/// ```
pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
Contributor Author

The real value add of this API / client, in my opinion, is this docstring / example and the tests

@@ -76,26 +79,6 @@ async fn test_handshake_error() {
.await;
}

#[tokio::test]
async fn test_handshake_metadata() {
Contributor Author

I consolidated metadata testing into all the existing tests, so there is no need for a standalone metadata test

@alamb alamb marked this pull request as ready for review December 31, 2022 13:00
@alamb alamb requested a review from tustvold December 31, 2022 13:19
Contributor

@tustvold tustvold left a comment

Mostly just some minor nits. I would change do_put and do_exchange to accept fallible streams and simply propagate the error through.

/// // encode the batch as a stream of `FlightData`
/// let flight_data_stream = FlightDataEncoderBuilder::new()
/// .build(futures::stream::iter(vec![Ok(batch)]))
/// // data encoder return Results, but do_put requires FlightData
Contributor

Perhaps this should accept a Stream<Item=Result<FlightData>>, otherwise I'm not sure how one could use this to stream a fallible computation without panicking?

Member

Hmm, I guess that the input is not fallible? I'm not sure what the semantics are for put if it encounters incomplete data.

Contributor Author

Perhaps this should accept a Stream<Item=Result<FlightData>>, otherwise I'm not sure how one could use this to stream a fallible computation without panicking?

I agree -- I was thinking about this too -- I'll see what I can do (though maybe I will do it as a follow-on PR)

Contributor Author

Tracking in #3462

Contributor Author

Proposal in #3464
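
To make the concern in this thread concrete, here is a minimal sketch of what accepting fallible input buys the caller. It uses only the standard library: an `Iterator` of `Result` stands in for a `Stream<Item = Result<FlightData>>`, and `Chunk` and `put_all` are hypothetical names for illustration, not part of arrow-flight. The point is only that a producer error is returned to the caller rather than forcing an `unwrap` upstream.

```rust
/// Hypothetical stand-in for `FlightData` (assumption, not the real type).
#[derive(Debug, Clone, PartialEq)]
struct Chunk(Vec<u8>);

/// Hypothetical sink that accepts fallible input. It stops at the first
/// error and returns it to the caller instead of panicking.
fn put_all<I>(input: I) -> Result<Vec<Chunk>, String>
where
    I: IntoIterator<Item = Result<Chunk, String>>,
{
    let mut sent = Vec::new();
    for item in input {
        // `?` propagates the producer's error unchanged.
        sent.push(item?);
    }
    Ok(sent)
}
```

With an infallible `Item = Chunk` signature, the producer would have to unwrap (and possibly panic) before handing items over, which is the situation the comment above describes.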

/// let mut client = FlightClient::new(channel);
///
/// // encode the batch as a stream of `FlightData`
/// let flight_data_stream = FlightDataEncoderBuilder::new()
Contributor

It's probably a bit late now, but looking at this API in use, it is a little strange that Stream isn't mentioned at all in the name, i.e. s/Encoder/Stream or something...

Contributor Author

This code hasn't been released -- I think we can change the name

Also, Codec seems to be a popular choice: FlightDataCodec 🤔

.await
.expect("error making request");

let response: Result<Vec<_>, _> = response_stream.try_collect().await;
Contributor

unwrap_err? Or is this some lack of Debug issue?

Contributor Author

@alamb alamb Jan 4, 2023

It is the lack of Debug in Error::External -- maybe I can add a Debug impl 🤔

Contributor Author

I tried putzing with this -- the issue is not that FlightError is not Debug; it is that BoxStream is not.

Here is the error when I used unwrap_err():

279  |           let response = client
     |  ________________________^
280  | |             .do_put(futures::stream::iter(input_flight_data.clone()))
281  | |             .await
     | |__________________^ `dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send` cannot be formatted using `{:?}` because it doesn't implement `Debug`
282  |               .unwrap_err();
     |                ---------- required by a bound introduced by this call
     |
     = help: the trait `Debug` is not implemented for `dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send`
     = help: the following other types implement trait `Debug`:
               (dyn Any + 'static)
               (dyn Any + Sync + std::marker::Send + 'static)
               (dyn Any + std::marker::Send + 'static)
               (dyn tracing_core::field::Value + 'static)
     = note: required for `Box<dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send>` to implement `Debug`
     = note: 1 redundant requirement hidden
     = note: required for `Pin<Box<dyn Stream<Item = std::result::Result<PutResult, FlightError>> + std::marker::Send>>` to implement `Debug`

I tried putzing with the types some but I could not get the traits to be happy
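
For context on the error above: `Result::unwrap_err` is only defined when the `Ok` type implements `Debug`, because it formats the `Ok` value in its panic message. A minimal standard-library sketch of one way around that bound follows. `NotDebug`, `MyError`, and `expect_error` are hypothetical names standing in for the boxed stream, `FlightError`, and a test helper; none of them come from this PR.

```rust
/// Stand-in for the boxed stream: deliberately does NOT implement `Debug`.
struct NotDebug;

/// Stand-in for the error type (assumption for illustration).
#[derive(Debug, PartialEq)]
struct MyError(String);

fn failing_call() -> Result<NotDebug, MyError> {
    Err(MyError("boom".to_string()))
}

fn ok_call() -> Result<NotDebug, MyError> {
    Ok(NotDebug)
}

/// Extracts the error without requiring `T: Debug`, unlike `unwrap_err`.
fn expect_error<T, E>(result: Result<T, E>) -> E {
    match result {
        Ok(_) => panic!("expected an error, got Ok"),
        Err(e) => e,
    }
}
```

`result.err().unwrap()` is an equivalent one-liner, since `Result::err` discards the `Ok` value and so places no `Debug` bound on it.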

let response = client
.do_exchange(futures::stream::iter(input_flight_data.clone()))
.await;
let response = match response {
Contributor

unwrap_err

Contributor Author

as above

.take()
.ok_or_else(|| Status::internal("No do_put response configured"))?;

let stream = futures::stream::iter(response).map_err(|e| e.into());
Contributor

Suggested change
- let stream = futures::stream::iter(response).map_err(|e| e.into());
+ let stream = futures::stream::iter(response).map_err(Into::into);


let stream = futures::stream::iter(response).map_err(|e| e.into());

Ok(Response::new(Box::pin(stream) as _))
Contributor

Suggested change
- Ok(Response::new(Box::pin(stream) as _))
+ Ok(Response::new(stream.boxed()))

Contributor Author

I applied these changes to the rest of this file
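
As a small illustration of the first suggestion in this thread, the sketch below shows that passing `Into::into` directly behaves the same as the closure `|e| e.into()`. It uses `Result::map_err` from the standard library rather than the stream combinator in the diff, and `AppError` and both function names are hypothetical.

```rust
use std::num::ParseIntError;

/// Hypothetical target error type (assumption for illustration).
#[derive(Debug, PartialEq)]
struct AppError(String);

impl From<ParseIntError> for AppError {
    fn from(e: ParseIntError) -> Self {
        AppError(e.to_string())
    }
}

/// Converts the error with an explicit closure.
fn parse_with_closure(s: &str) -> Result<i32, AppError> {
    s.parse::<i32>().map_err(|e| e.into())
}

/// Converts the error by passing the `Into::into` function path directly.
fn parse_point_free(s: &str) -> Result<i32, AppError> {
    s.parse::<i32>().map_err(Into::into)
}
```

Both forms rely on the same `From` impl; the second only avoids naming the closure argument.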

.expect("error making request");

let response: Result<Vec<_>, _> = response_stream.try_collect().await;
let response = match response {
Contributor

unwrap_err

Contributor Author

same issue as above

Comment on lines 315 to 317
let stream = futures::stream::iter(response).map_err(|e| e.into());

Ok(Response::new(Box::pin(stream) as _))
Contributor

Suggested change
- let stream = futures::stream::iter(response).map_err(|e| e.into());
- Ok(Response::new(Box::pin(stream) as _))
+ let stream = futures::stream::iter(response).map_err(Into::into);
+ Ok(Response::new(stream.boxed()))

/// ```
pub async fn list_flights(
&mut self,
expression: impl Into<Bytes>,
Contributor

Is the idea that a higher-level FlightSQL client will handle the Any encoding nonsense?

Contributor Author

That was the idea. Flight doesn't really say anything about what "expression" should be (i.e. it doesn't say it will be an encoded protobuf message). FlightSQL does use protobuf-encoded messages, but I think that should be handled at a higher level.
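
A rough sketch of why an opaque-bytes parameter keeps the mid-level client agnostic about the expression format. This is standard-library only: `Vec<u8>` stands in for `Bytes`, and the `Criteria` struct and `make_criteria` function are simplified, hypothetical stand-ins rather than the generated Flight types.

```rust
/// Simplified stand-in for Flight's `Criteria` message, which carries an
/// opaque `expression` byte field (assumption: reduced to a plain struct).
#[derive(Debug, PartialEq)]
struct Criteria {
    expression: Vec<u8>,
}

/// Accepts anything convertible to bytes and does not interpret it.
/// A higher layer decides what the bytes mean (for example, an encoded
/// protobuf message).
fn make_criteria(expression: impl Into<Vec<u8>>) -> Criteria {
    Criteria {
        expression: expression.into(),
    }
}
```

Callers can pass a `&str`, a `String`, a `Vec<u8>`, or a byte slice, and a FlightSQL-style layer could pass already-encoded message bytes without this function needing to know.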

// TODO other methods
// list_flights
// get_schema
// do_put
// do_action
Member

I think do_action is implemented now?

Contributor Author

indeed -- good catch

Contributor Author

will remove


ursabot commented Jan 5, 2023

Benchmark runs are scheduled for baseline = 81abc1a and contender = 42ffc3f. 42ffc3f is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Contributor Author

alamb commented Jan 5, 2023

It appears I forgot some tests here -- added in #3463

Labels
arrow-flight Changes to the arrow-flight crate
Development

Successfully merging this pull request may close these issues.

Mid-level ArrowFlight Client
4 participants