Complete mid-level FlightClient #3402

Merged
merged 9 commits into from
Jan 5, 2023
303 changes: 281 additions & 22 deletions arrow-flight/src/client.rs
@@ -16,11 +16,17 @@
// under the License.

use crate::{
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient,
FlightDescriptor, FlightInfo, HandshakeRequest, Ticket,
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, Action,
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, PutResult, Ticket,
};
use arrow_schema::Schema;
use bytes::Bytes;
use futures::{future::ready, stream, StreamExt, TryStreamExt};
use futures::{
future::ready,
stream::{self, BoxStream},
Stream, StreamExt, TryStreamExt,
};
use tonic::{metadata::MetadataMap, transport::Channel};

use crate::error::{FlightError, Result};
@@ -160,19 +166,20 @@ impl FlightClient {
/// returning a [`FlightRecordBatchStream`] for reading
/// [`RecordBatch`](arrow_array::RecordBatch)es.
///
/// # Note
///
/// To access the returned [`FlightData`] use
/// [`FlightRecordBatchStream::into_inner()`]
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use bytes::Bytes;
/// # use arrow_flight::FlightClient;
/// # use arrow_flight::Ticket;
/// # use arrow_array::RecordBatch;
/// # use tonic::transport::Channel;
/// # use futures::stream::TryStreamExt;
/// # let channel = Channel::from_static("http://localhost:1234")
/// # .connect()
/// # .await
/// # .expect("error connecting");
/// # let channel: tonic::transport::Channel = unimplemented!();
/// # let ticket = Ticket { ticket: Bytes::from("foo") };
/// let mut client = FlightClient::new(channel);
///
@@ -199,8 +206,7 @@ impl FlightClient {
.do_get(request)
.await?
.into_inner()
// convert to FlightError
.map_err(|e| e.into());
.map_err(FlightError::Tonic);

Ok(FlightRecordBatchStream::new_from_flight_data(
response_stream,
@@ -217,11 +223,7 @@ impl FlightClient {
/// # async fn run() {
/// # use arrow_flight::FlightClient;
/// # use arrow_flight::FlightDescriptor;
/// # use tonic::transport::Channel;
/// # let channel = Channel::from_static("http://localhost:1234")
/// # .connect()
/// # .await
/// # .expect("error connecting");
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Send a 'CMD' request to the server
@@ -256,13 +258,270 @@ impl FlightClient {
Ok(response)
}

// TODO other methods
// list_flights
// get_schema
// do_put
// do_action
// list_actions
// do_exchange
/// Make a `DoPut` call to the server with the provided
/// [`Stream`](futures::Stream) of [`FlightData`], returning a
/// stream of [`PutResult`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::{TryStreamExt, StreamExt};
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
/// # use arrow_flight::encode::FlightDataEncoderBuilder;
/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// # ]).unwrap();
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // encode the batch as a stream of `FlightData`
/// let flight_data_stream = FlightDataEncoderBuilder::new()
Contributor

It's probably a bit late now, but looking at this API in use, it is kind of a little strange Stream isn't mentioned at all in the name. i.e. s/Encoder/Stream or something...

Contributor Author

This code hasn't been released -- I think we can change the name

Also Codec seems to be a popular choice FlightDataCodec 🤔

/// .build(futures::stream::iter(vec![Ok(batch)]))
/// // the data encoder returns Results, but do_put requires FlightData
Contributor

Perhaps this should accept a Stream<Item=Result<FlightData>>, otherwise I'm not sure how one could use this to stream a fallible computation without panicking?

Member

Hmm, I guess that the input is not fallible? I'm not sure what the semantics of put are if it encounters incomplete data.

Contributor Author

> Perhaps this should accept a Stream<Item=Result<FlightData>>, otherwise I'm not sure how one could use this to stream a fallible computation without panicking?

I agree -- I was thinking about this too -- I'll see what I can do (though maybe I will do it as a follow-on PR)

Contributor Author

Tracking in #3462

Contributor Author

Proposal in #3464

/// .map(|batch| batch.unwrap());
///
/// // send the stream and get the results as `PutResult`
/// let response: Vec<PutResult> = client
/// .do_put(flight_data_stream)
/// .await
/// .unwrap()
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error calling do_put");
/// # }
/// ```
pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
Contributor Author

The real value add of this API / client, in my opinion, is this docstring / example and the tests

&mut self,
request: S,
) -> Result<BoxStream<'static, Result<PutResult>>> {
let request = self.make_request(request);

let response = self
.inner
.do_put(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(response.boxed())
}
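
The review thread on the `do_put` example asks how a fallible computation could be fed to an API that only accepts plain items without calling `unwrap`. One possible shape, sketched here with standard-library iterators standing in for `Stream`, is to stop at the first error and stash it where the caller can inspect it afterwards. `StopOnError`, `stop_on_error`, and `error_slot` are hypothetical names, not part of `arrow-flight`, and this is not necessarily the approach taken in #3462 or #3464.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical adapter: yields the `Ok` items of a fallible iterator and
/// stops at the first `Err`, storing that error in a shared slot.
struct StopOnError<I, E> {
    inner: I,
    error_slot: Rc<RefCell<Option<E>>>,
}

impl<I, T, E> Iterator for StopOnError<I, E>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // Once an error has been recorded, produce nothing further.
        if self.error_slot.borrow().is_some() {
            return None;
        }
        match self.inner.next()? {
            Ok(item) => Some(item),
            Err(e) => {
                // Record the error instead of panicking, then end the sequence.
                *self.error_slot.borrow_mut() = Some(e);
                None
            }
        }
    }
}

/// Wrap a fallible iterator, returning the infallible view plus the slot
/// that will hold the first error, if any.
fn stop_on_error<I, T, E>(inner: I) -> (StopOnError<I, E>, Rc<RefCell<Option<E>>>)
where
    I: Iterator<Item = Result<T, E>>,
{
    let error_slot = Rc::new(RefCell::new(None));
    let adapter = StopOnError {
        inner,
        error_slot: Rc::clone(&error_slot),
    };
    (adapter, error_slot)
}
```

After the consumer has drained the adapter, the caller checks the slot: an empty slot means the whole input was sent, while a stored error means the sequence was cut short. A real async version would need a thread-safe slot (for example `Arc<Mutex<_>>`) because `do_put` requires `Send + 'static`.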

/// Make a `DoExchange` call to the server with the provided
/// [`Stream`](futures::Stream) of [`FlightData`], returning a
/// stream of [`FlightData`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::{TryStreamExt, StreamExt};
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
/// # use arrow_flight::encode::FlightDataEncoderBuilder;
/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// # ]).unwrap();
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // encode the batch as a stream of `FlightData`
/// let flight_data_stream = FlightDataEncoderBuilder::new()
/// .build(futures::stream::iter(vec![Ok(batch)]))
/// // the data encoder returns Results, but do_exchange requires FlightData
/// .map(|batch| batch.unwrap());
///
/// // send the stream and get the results as `RecordBatch`es
/// let response: Vec<RecordBatch> = client
/// .do_exchange(flight_data_stream)
/// .await
/// .unwrap()
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error calling do_exchange");
/// # }
/// ```
pub async fn do_exchange<S: Stream<Item = FlightData> + Send + 'static>(
&mut self,
request: S,
) -> Result<FlightRecordBatchStream> {
let request = self.make_request(request);

let response = self
.inner
.do_exchange(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(FlightRecordBatchStream::new_from_flight_data(response))
}
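
The methods in this diff write `.map_err(FlightError::Tonic)` where the earlier code used a closure calling `into()`. This works because a tuple-style enum variant is itself a function from its payload to the enum. A minimal sketch of the same idiom follows, using plain iterators rather than `TryStreamExt::map_err`; `TransportStatus` and `ClientError` are hypothetical stand-ins for `tonic::Status` and `FlightError`.

```rust
/// Hypothetical stand-in for a transport-level status (e.g. `tonic::Status`).
#[derive(Debug, PartialEq)]
struct TransportStatus(String);

/// Hypothetical stand-in for a client error type such as `FlightError`.
#[derive(Debug, PartialEq)]
enum ClientError {
    Transport(TransportStatus),
}

/// Convert the error type of every item by passing the variant
/// constructor directly instead of writing a closure.
fn wrap_errors(
    responses: Vec<Result<u8, TransportStatus>>,
) -> Vec<Result<u8, ClientError>> {
    responses
        .into_iter()
        // `ClientError::Transport` has type `fn(TransportStatus) -> ClientError`
        .map(|r| r.map_err(ClientError::Transport))
        .collect()
}
```

Naming the variant makes it explicit which error case each transport failure ends up in, whereas `into()` leaves that to whichever `From` impl is in scope.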

/// Make a `ListFlights` call to the server with the provided
/// criteria, returning a [`Stream`](futures::Stream) of [`FlightInfo`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::TryStreamExt;
/// # use bytes::Bytes;
/// # use arrow_flight::{FlightInfo, FlightClient};
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Send 'Name=Foo' bytes as the "expression" to the server
/// // and gather the returned FlightInfo
/// let responses: Vec<FlightInfo> = client
/// .list_flights(Bytes::from("Name=Foo"))
/// .await
/// .expect("error listing flights")
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error gathering flights");
/// # }
/// ```
pub async fn list_flights(
&mut self,
expression: impl Into<Bytes>,
Contributor

Is the idea a higher-level FlightSQL client will handle the Any encoding nonsense?

Contributor Author

That was the idea. Flight doesn't say anything about what "expression" should be (in particular, it doesn't say it will be an encoded protobuf message). FlightSQL does use protobuf-encoded messages, but I think that should be handled at a higher level.

) -> Result<BoxStream<'static, Result<FlightInfo>>> {
let request = Criteria {
expression: expression.into(),
};

let request = self.make_request(request);

let response = self
.inner
.list_flights(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(response.boxed())
}
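
The `expression` parameter of `list_flights` is taken as `impl Into<Bytes>` and forwarded without interpretation, leaving any encoding to a higher layer. The sketch below shows that split, assuming `Vec<u8>` in place of `bytes::Bytes` so it needs only the standard library. `OpaqueCriteria`, `make_criteria`, and `encode_filter` are hypothetical names for illustration, and the `key=value` encoding is invented, not something Flight or FlightSQL defines.

```rust
/// Hypothetical stand-in for the `Criteria` message, using `Vec<u8>`
/// in place of `bytes::Bytes` to stay within the standard library.
#[derive(Debug, PartialEq)]
struct OpaqueCriteria {
    expression: Vec<u8>,
}

/// Mid-level layer: accept anything convertible to bytes and pass it
/// through untouched. The bytes are never parsed here.
fn make_criteria(expression: impl Into<Vec<u8>>) -> OpaqueCriteria {
    OpaqueCriteria {
        expression: expression.into(),
    }
}

/// Higher-level layer (hypothetical): picks its own encoding for a
/// filter, here a plain `key=value` string.
fn encode_filter(key: &str, value: &str) -> Vec<u8> {
    format!("{}={}", key, value).into_bytes()
}
```

Callers can then pass a string literal, an owned `String`, or pre-encoded bytes such as `make_criteria(encode_filter("Name", "Foo"))`, and the mid-level function treats all of them identically.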

/// Make a `GetSchema` call to the server with the provided
/// [`FlightDescriptor`], returning the associated [`Schema`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use bytes::Bytes;
/// # use arrow_flight::{FlightDescriptor, FlightClient};
/// # use arrow_schema::Schema;
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Request the schema result of a 'CMD' request to the server
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
///
/// let schema: Schema = client
/// .get_schema(request)
/// .await
/// .expect("error making request");
/// # }
/// ```
pub async fn get_schema(
&mut self,
flight_descriptor: FlightDescriptor,
) -> Result<Schema> {
let request = self.make_request(flight_descriptor);

let schema_result = self.inner.get_schema(request).await?.into_inner();

// attempt decode from IPC
let schema: Schema = schema_result.try_into()?;

Ok(schema)
}
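
`get_schema` decodes the response with `try_into()?`, which relies on two conversions: a `TryFrom` impl for the decode step and a `From` impl that lets `?` lift the decode error into the client's error type. The following toy sketch shows how those two pieces fit together. All types here (`RawSchema`, `FieldNames`, `DecodeError`, `ClientError`) are hypothetical, and the comma-separated payload is invented for illustration; real Flight schema results are IPC-encoded.

```rust
use std::convert::{TryFrom, TryInto};

/// Hypothetical decode failure.
#[derive(Debug, PartialEq)]
struct DecodeError(String);

/// Hypothetical client-level error that can wrap a decode failure.
#[derive(Debug, PartialEq)]
enum ClientError {
    Decode(DecodeError),
}

// Lets `?` convert a `DecodeError` into a `ClientError` automatically.
impl From<DecodeError> for ClientError {
    fn from(e: DecodeError) -> Self {
        ClientError::Decode(e)
    }
}

/// Hypothetical raw response: a UTF-8, comma-separated list of names.
struct RawSchema {
    payload: Vec<u8>,
}

/// Hypothetical decoded form.
#[derive(Debug, PartialEq)]
struct FieldNames(Vec<String>);

impl TryFrom<RawSchema> for FieldNames {
    type Error = DecodeError;

    fn try_from(raw: RawSchema) -> Result<Self, DecodeError> {
        // Fail if the payload is not valid UTF-8.
        let text = String::from_utf8(raw.payload)
            .map_err(|e| DecodeError(e.to_string()))?;
        Ok(FieldNames(text.split(',').map(str::to_owned).collect()))
    }
}

/// Mirrors the shape of the decode step: `try_into()` then `?`.
fn get_field_names(raw: RawSchema) -> Result<FieldNames, ClientError> {
    let names: FieldNames = raw.try_into()?;
    Ok(names)
}
```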

/// Make a `ListActions` call to the server, returning a
/// [`Stream`](futures::Stream) of [`ActionType`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::TryStreamExt;
/// # use arrow_flight::{ActionType, FlightClient};
/// # use arrow_schema::Schema;
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // List available actions on the server:
/// let actions: Vec<ActionType> = client
/// .list_actions()
/// .await
/// .expect("error listing actions")
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error gathering actions");
/// # }
/// ```
pub async fn list_actions(
&mut self,
) -> Result<BoxStream<'static, Result<ActionType>>> {
let request = self.make_request(Empty {});

let action_stream = self
.inner
.list_actions(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(action_stream.boxed())
}

/// Make a `DoAction` call to the server, returning a
/// [`Stream`](futures::Stream) of opaque [`Bytes`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use bytes::Bytes;
/// # use futures::TryStreamExt;
/// # use arrow_flight::{Action, FlightClient};
/// # use arrow_schema::Schema;
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// let request = Action::new("my_action", "the body");
///
/// // Make a request to run the action on the server
/// let results: Vec<Bytes> = client
/// .do_action(request)
/// .await
/// .expect("error executing action")
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error gathering action results");
/// # }
/// ```
pub async fn do_action(
&mut self,
action: Action,
) -> Result<BoxStream<'static, Result<Bytes>>> {
let request = self.make_request(action);

let result_stream = self
.inner
.do_action(request)
.await?
.into_inner()
.map_err(FlightError::Tonic)
.map(|r| {
r.map(|r| {
// unwrap inner bytes
let crate::Result { body } = r;
body
})
});

Ok(result_stream.boxed())
}
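
`do_action` strips the wrapper message from each successful item by destructuring it (`let crate::Result { body } = r;`) rather than reading `r.body`. A small sketch of that pattern over plain iterators follows. `ActionResult` and `unwrap_bodies` are hypothetical stand-ins, and the stated benefit is one plausible reading of the choice, not something the PR spells out.

```rust
/// Hypothetical stand-in for the generated `arrow_flight::Result` message.
struct ActionResult {
    body: Vec<u8>,
}

/// Strip the wrapper from every successful item, leaving errors untouched.
fn unwrap_bodies<E>(
    results: impl Iterator<Item = Result<ActionResult, E>>,
) -> impl Iterator<Item = Result<Vec<u8>, E>> {
    results.map(|r| {
        r.map(|msg| {
            // Exhaustive destructuring: if `ActionResult` ever gains a
            // field, this pattern stops compiling, so the new field
            // cannot be dropped silently.
            let ActionResult { body } = msg;
            body
        })
    })
}
```

The outer `map` walks the sequence and the inner `Result::map` touches only the `Ok` case, which is the same two-level shape as the `.map(|r| r.map(...))` call in `do_action`.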

/// Return a `Request`, adding any configured metadata
fn make_request<T>(&self, t: T) -> tonic::Request<T> {