-
Notifications
You must be signed in to change notification settings - Fork 839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Complete mid-level FlightClient
#3402
Changes from all commits
31e39c3
352029a
63e38ae
e5331b2
5d38e87
1ff7513
e43b0c5
01bdadb
17777dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,17 @@ | |
// under the License. | ||
|
||
use crate::{ | ||
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, | ||
FlightDescriptor, FlightInfo, HandshakeRequest, Ticket, | ||
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, Action, | ||
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, | ||
HandshakeRequest, PutResult, Ticket, | ||
}; | ||
use arrow_schema::Schema; | ||
use bytes::Bytes; | ||
use futures::{future::ready, stream, StreamExt, TryStreamExt}; | ||
use futures::{ | ||
future::ready, | ||
stream::{self, BoxStream}, | ||
Stream, StreamExt, TryStreamExt, | ||
}; | ||
use tonic::{metadata::MetadataMap, transport::Channel}; | ||
|
||
use crate::error::{FlightError, Result}; | ||
|
@@ -160,19 +166,20 @@ impl FlightClient { | |
/// returning a [`FlightRecordBatchStream`] for reading | ||
/// [`RecordBatch`](arrow_array::RecordBatch)es. | ||
/// | ||
/// # Note | ||
/// | ||
/// To access the returned [`FlightData`] use | ||
/// [`FlightRecordBatchStream::into_inner()`] | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use bytes::Bytes; | ||
/// # use arrow_flight::FlightClient; | ||
/// # use arrow_flight::Ticket; | ||
/// # use arrow_array::RecordBatch; | ||
/// # use tonic::transport::Channel; | ||
/// # use futures::stream::TryStreamExt; | ||
/// # let channel = Channel::from_static("http://localhost:1234") | ||
/// # .connect() | ||
/// # .await | ||
/// # .expect("error connecting"); | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// # let ticket = Ticket { ticket: Bytes::from("foo") }; | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
|
@@ -199,8 +206,7 @@ impl FlightClient { | |
.do_get(request) | ||
.await? | ||
.into_inner() | ||
// convert to FlightError | ||
.map_err(|e| e.into()); | ||
.map_err(FlightError::Tonic); | ||
|
||
Ok(FlightRecordBatchStream::new_from_flight_data( | ||
response_stream, | ||
|
@@ -217,11 +223,7 @@ impl FlightClient { | |
/// # async fn run() { | ||
/// # use arrow_flight::FlightClient; | ||
/// # use arrow_flight::FlightDescriptor; | ||
/// # use tonic::transport::Channel; | ||
/// # let channel = Channel::from_static("http://localhost:1234") | ||
/// # .connect() | ||
/// # .await | ||
/// # .expect("error connecting"); | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // Send a 'CMD' request to the server | ||
|
@@ -256,13 +258,270 @@ impl FlightClient { | |
Ok(response) | ||
} | ||
|
||
// TODO other methods | ||
// list_flights | ||
// get_schema | ||
// do_put | ||
// do_action | ||
// list_actions | ||
// do_exchange | ||
/// Make a `DoPut` call to the server with the provided | ||
/// [`Stream`](futures::Stream) of [`FlightData`] and returning a | ||
/// stream of [`PutResult`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use futures::{TryStreamExt, StreamExt}; | ||
/// # use std::sync::Arc; | ||
/// # use arrow_array::UInt64Array; | ||
/// # use arrow_array::RecordBatch; | ||
/// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult}; | ||
/// # use arrow_flight::encode::FlightDataEncoderBuilder; | ||
/// # let batch = RecordBatch::try_from_iter(vec![ | ||
/// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _) | ||
/// # ]).unwrap(); | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // encode the batch as a stream of `FlightData` | ||
/// let flight_data_stream = FlightDataEncoderBuilder::new() | ||
/// .build(futures::stream::iter(vec![Ok(batch)])) | ||
/// // the data encoder returns Results, but do_put requires FlightData | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps this should accept a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I guess that the input is not fallible? I'm not sure what the semantics is on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I agree -- I was thinking about this too -- I'll see what I can do (though maybe I will do it as a follow-on PR) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tracking in #3462 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Proposal in #3464 |
||
/// .map(|batch|batch.unwrap()); | ||
/// | ||
/// // send the stream and get the results as `PutResult` | ||
/// let response: Vec<PutResult>= client | ||
/// .do_put(flight_data_stream) | ||
/// .await | ||
/// .unwrap() | ||
/// .try_collect() // use TryStreamExt to collect stream | ||
/// .await | ||
/// .expect("error calling do_put"); | ||
/// # } | ||
/// ``` | ||
pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The real value add of this API / client, in my opinion, is this docstring / example and the tests |
||
&mut self, | ||
request: S, | ||
) -> Result<BoxStream<'static, Result<PutResult>>> { | ||
// wrap the input stream in a `tonic::Request` via `make_request` | ||
let request = self.make_request(request); | ||
|
||
// `?` returns early if the call itself fails; errors on individual | ||
// response messages are converted to `FlightError::Tonic` | ||
let response = self | ||
.inner | ||
.do_put(request) | ||
.await? | ||
.into_inner() | ||
.map_err(FlightError::Tonic); | ||
|
||
// box the response stream to match the `BoxStream` return type | ||
Ok(response.boxed()) | ||
} | ||
|
||
/// Make a `DoExchange` call to the server with the provided | ||
/// [`Stream`](futures::Stream) of [`FlightData`] and returning a | ||
/// [`FlightRecordBatchStream`] for reading the returned `RecordBatch`es. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use futures::{TryStreamExt, StreamExt}; | ||
/// # use std::sync::Arc; | ||
/// # use arrow_array::UInt64Array; | ||
/// # use arrow_array::RecordBatch; | ||
/// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult}; | ||
/// # use arrow_flight::encode::FlightDataEncoderBuilder; | ||
/// # let batch = RecordBatch::try_from_iter(vec![ | ||
/// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _) | ||
/// # ]).unwrap(); | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // encode the batch as a stream of `FlightData` | ||
/// let flight_data_stream = FlightDataEncoderBuilder::new() | ||
/// .build(futures::stream::iter(vec![Ok(batch)])) | ||
/// // the data encoder returns Results, but do_exchange requires FlightData | ||
/// .map(|batch|batch.unwrap()); | ||
/// | ||
/// // send the stream and get the results as `RecordBatches` | ||
/// let response: Vec<RecordBatch> = client | ||
/// .do_exchange(flight_data_stream) | ||
/// .await | ||
/// .unwrap() | ||
/// .try_collect() // use TryStreamExt to collect stream | ||
/// .await | ||
/// .expect("error calling do_exchange"); | ||
/// # } | ||
/// ``` | ||
pub async fn do_exchange<S: Stream<Item = FlightData> + Send + 'static>( | ||
    &mut self, | ||
    request: S, | ||
) -> Result<FlightRecordBatchStream> { | ||
    // wrap the input stream via `make_request`, then issue the call; | ||
    // `?` returns early if the call itself fails | ||
    let wrapped = self.make_request(request); | ||
    let reply = self.inner.do_exchange(wrapped).await?; | ||
|
||
    // errors on individual response messages become `FlightError::Tonic` | ||
    let flight_data = reply.into_inner().map_err(FlightError::Tonic); | ||
|
||
    Ok(FlightRecordBatchStream::new_from_flight_data(flight_data)) | ||
} | ||
|
||
/// Make a `ListFlights` call to the server with the provided | ||
/// criteria and returning a [`Stream`](futures::Stream) of [`FlightInfo`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use futures::TryStreamExt; | ||
/// # use bytes::Bytes; | ||
/// # use arrow_flight::{FlightInfo, FlightClient}; | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // Send 'Name=Foo' bytes as the "expression" to the server | ||
/// // and gather the returned FlightInfo | ||
/// let responses: Vec<FlightInfo> = client | ||
/// .list_flights(Bytes::from("Name=Foo")) | ||
/// .await | ||
/// .expect("error listing flights") | ||
/// .try_collect() // use TryStreamExt to collect stream | ||
/// .await | ||
/// .expect("error gathering flights"); | ||
/// # } | ||
/// ``` | ||
pub async fn list_flights( | ||
&mut self, | ||
expression: impl Into<Bytes>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the idea a higher-level FlightSQL client will handle the Any encoding nonsense? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that was the idea -- flight doesn't really say anything about what "expression" should be (aka it doesn't say it will be an encoded protobuf message) FlightSQL does use protobuf encoded messages but I think that should be handled in higher level |
||
) -> Result<BoxStream<'static, Result<FlightInfo>>> { | ||
// the expression bytes are forwarded as-is inside a `Criteria` message | ||
let request = Criteria { | ||
expression: expression.into(), | ||
}; | ||
|
||
// wrap the message in a `tonic::Request` via `make_request` | ||
let request = self.make_request(request); | ||
|
||
// `?` returns early if the call itself fails; errors on individual | ||
// response messages are converted to `FlightError::Tonic` | ||
let response = self | ||
.inner | ||
.list_flights(request) | ||
.await? | ||
.into_inner() | ||
.map_err(FlightError::Tonic); | ||
|
||
// box the response stream to match the `BoxStream` return type | ||
Ok(response.boxed()) | ||
} | ||
|
||
/// Make a `GetSchema` call to the server with the provided | ||
/// [`FlightDescriptor`] and returning the associated [`Schema`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use bytes::Bytes; | ||
/// # use arrow_flight::{FlightDescriptor, FlightClient}; | ||
/// # use arrow_schema::Schema; | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // Request the schema result of a 'CMD' request to the server | ||
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec()); | ||
/// | ||
/// let schema: Schema = client | ||
/// .get_schema(request) | ||
/// .await | ||
/// .expect("error making request"); | ||
/// # } | ||
/// ``` | ||
pub async fn get_schema( | ||
    &mut self, | ||
    flight_descriptor: FlightDescriptor, | ||
) -> Result<Schema> { | ||
    // `?` returns early if the call itself fails | ||
    let request = self.make_request(flight_descriptor); | ||
    let response = self.inner.get_schema(request).await?; | ||
|
||
    // attempt to decode the returned message from IPC into a `Schema`, | ||
    // propagating any conversion error to the caller | ||
    Ok(response.into_inner().try_into()?) | ||
} | ||
|
||
/// Make a `ListActions` call to the server and returning a | ||
/// [`Stream`](futures::Stream) of [`ActionType`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use futures::TryStreamExt; | ||
/// # use arrow_flight::{ActionType, FlightClient}; | ||
/// # use arrow_schema::Schema; | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// // List available actions on the server: | ||
/// let actions: Vec<ActionType> = client | ||
/// .list_actions() | ||
/// .await | ||
/// .expect("error listing actions") | ||
/// .try_collect() // use TryStreamExt to collect stream | ||
/// .await | ||
/// .expect("error gathering actions"); | ||
/// # } | ||
/// ``` | ||
pub async fn list_actions( | ||
    &mut self, | ||
) -> Result<BoxStream<'static, Result<ActionType>>> { | ||
    // the request payload is an `Empty` message; `?` returns early if | ||
    // the call itself fails | ||
    let request = self.make_request(Empty {}); | ||
    let response = self.inner.list_actions(request).await?; | ||
|
||
    // errors on individual response messages become `FlightError::Tonic` | ||
    let actions = response | ||
        .into_inner() | ||
        .map_err(FlightError::Tonic) | ||
        .boxed(); | ||
|
||
    Ok(actions) | ||
} | ||
|
||
/// Make a `DoAction` call to the server and returning a | ||
/// [`Stream`](futures::Stream) of opaque [`Bytes`]. | ||
/// | ||
/// # Example: | ||
/// ```no_run | ||
/// # async fn run() { | ||
/// # use bytes::Bytes; | ||
/// # use futures::TryStreamExt; | ||
/// # use arrow_flight::{Action, FlightClient}; | ||
/// # use arrow_schema::Schema; | ||
/// # let channel: tonic::transport::Channel = unimplemented!(); | ||
/// let mut client = FlightClient::new(channel); | ||
/// | ||
/// let request = Action::new("my_action", "the body"); | ||
/// | ||
/// // Make a request to run the action on the server | ||
/// let results: Vec<Bytes> = client | ||
/// .do_action(request) | ||
/// .await | ||
/// .expect("error executing action") | ||
/// .try_collect() // use TryStreamExt to collect stream | ||
/// .await | ||
/// .expect("error gathering action results"); | ||
/// # } | ||
/// ``` | ||
pub async fn do_action( | ||
    &mut self, | ||
    action: Action, | ||
) -> Result<BoxStream<'static, Result<Bytes>>> { | ||
    // `?` returns early if the call itself fails | ||
    let request = self.make_request(action); | ||
    let response = self.inner.do_action(request).await?; | ||
|
||
    let bodies = response | ||
        .into_inner() | ||
        // errors on individual response messages become `FlightError::Tonic` | ||
        .map_err(FlightError::Tonic) | ||
        // unwrap the inner bytes of each `crate::Result` message | ||
        .map_ok(|crate::Result { body }| body); | ||
|
||
    Ok(bodies.boxed()) | ||
} | ||
|
||
/// return a Request, adding any configured metadata | ||
fn make_request<T>(&self, t: T) -> tonic::Request<T> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably a bit late now, but looking at this API in use, it is kind of a little strange that Stream isn't mentioned at all in the name, i.e. `s/Encoder/Stream` or something...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code hasn't been released -- I think we can change the name
Also, `Codec` seems to be a popular choice: `FlightDataCodec` 🤔