From ad2e3b066dcdedd5dcf45179cf552f3be286614c Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 29 Jan 2020 17:32:24 +0200 Subject: [PATCH] send schema before batches --- rust/arrow/src/flight/mod.rs | 16 +++++++++++++++- rust/arrow/src/ipc/writer.rs | 11 +++++++---- rust/datafusion/examples/flight-server.rs | 14 +++++++++++++- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs index a421e6edef556..90061df726976 100644 --- a/rust/arrow/src/flight/mod.rs +++ b/rust/arrow/src/flight/mod.rs @@ -19,10 +19,11 @@ use flight::FlightData; +use crate::datatypes::Schema; use crate::ipc::writer; use crate::record_batch::RecordBatch; -/// Convert a `RecordBatch` to `FlightData by getting the header and body as bytes +/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes impl From<&RecordBatch> for FlightData { fn from(batch: &RecordBatch) -> Self { let (header, body) = writer::record_batch_to_bytes(batch); @@ -35,4 +36,17 @@ impl From<&RecordBatch> for FlightData { } } +/// Convert a `Schema` to `FlightData` by converting to an IPC message +impl From<&Schema> for FlightData { + fn from(schema: &Schema) -> Self { + let schema = writer::schema_to_bytes(schema); + Self { + flight_descriptor: None, + app_metadata: vec![], + data_header: schema, + data_body: vec![], + } + } +} + // TODO: add more explicit conversion that expoess flight descriptor and metadata options diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index e79a344bbcf78..c872c8286d704 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -209,8 +209,7 @@ impl Drop for StreamWriter { } } -/// Convert the schema to its IPC representation, and write it to the `writer` -fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result { +pub(crate) fn schema_to_bytes(schema: &Schema) -> Vec { let mut fbb = FlatBufferBuilder::new(); let schema = { let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); @@ -227,9 +226,13 @@ fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result< fbb.finish(data, None); let data = fbb.finished_data(); - let written = write_padded_data(writer, data, WriteDataType::Header); + data.to_vec() +} - written +/// Convert the schema to its IPC representation, and write it to the `writer` +fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result { + let data = schema_to_bytes(schema); + write_padded_data(writer, &data[..], WriteDataType::Header) } /// The message type being written. This determines whether to write the data length or not. diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 84f88b06627ae..6f000d6e5b01a 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -77,12 +77,24 @@ impl FlightService for FlightServiceImpl { // execute the query let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?; + if results.is_empty() { + return Err(Status::internal("There were no results from ticket")); + } - let flights: Vec> = results + // add an initial FlightData message that sends schema + // TODO: find a more ergonomic way of doing this + let schema = results[0].schema(); + let mut flights: Vec> = + vec![Ok(FlightData::from(schema.as_ref()))]; + + let mut batches: Vec> = results .iter() .map(|batch| Ok(FlightData::from(batch))) .collect(); + // append batch vector to schema vector, so that the first message sent is the schema + flights.append(&mut batches); + let output = futures::stream::iter(flights); Ok(Response::new(Box::pin(output) as Self::DoGetStream))