Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
send schema before batches
Browse files Browse the repository at this point in the history
  • Loading branch information
nevi-me committed Jan 29, 2020
1 parent 260f9ca commit ad2e3b0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
16 changes: 15 additions & 1 deletion rust/arrow/src/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
use flight::FlightData;

use crate::datatypes::Schema;
use crate::ipc::writer;
use crate::record_batch::RecordBatch;

/// Convert a `RecordBatch` to `FlightData by getting the header and body as bytes
/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
impl From<&RecordBatch> for FlightData {
fn from(batch: &RecordBatch) -> Self {
let (header, body) = writer::record_batch_to_bytes(batch);
Expand All @@ -35,4 +36,17 @@ impl From<&RecordBatch> for FlightData {
}
}

/// Convert a `Schema` to `FlightData` by converting to an IPC message
impl From<&Schema> for FlightData {
fn from(schema: &Schema) -> Self {
let schema = writer::schema_to_bytes(schema);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema,
data_body: vec![],
}
}
}

// TODO: add more explicit conversion that expoess flight descriptor and metadata options
11 changes: 7 additions & 4 deletions rust/arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ impl<W: Write> Drop for StreamWriter<W> {
}
}

/// Convert the schema to its IPC representation, and write it to the `writer`
fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
pub(crate) fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let schema = {
let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
Expand All @@ -227,9 +226,13 @@ fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<
fbb.finish(data, None);

let data = fbb.finished_data();
let written = write_padded_data(writer, data, WriteDataType::Header);
data.to_vec()
}

written
/// Convert the schema to its IPC representation, and write it to the `writer`
fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
let data = schema_to_bytes(schema);
write_padded_data(writer, &data[..], WriteDataType::Header)
}

/// The message type being written. This determines whether to write the data length or not.
Expand Down
14 changes: 13 additions & 1 deletion rust/datafusion/examples/flight-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,24 @@ impl FlightService for FlightServiceImpl {

// execute the query
let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;
if results.is_empty() {
return Err(Status::internal("There were no results from ticket"));
}

let flights: Vec<Result<FlightData, Status>> = results
// add an initial FlightData message that sends schema
// TODO: find a more ergonomic way of doing this
let schema = results[0].schema();
let mut flights: Vec<Result<FlightData, Status>> =
vec![Ok(FlightData::from(schema.as_ref()))];

let mut batches: Vec<Result<FlightData, Status>> = results
.iter()
.map(|batch| Ok(FlightData::from(batch)))
.collect();

// append batch vector to schema vector, so that the first message sent is the schema
flights.append(&mut batches);

let output = futures::stream::iter(flights);

Ok(Response::new(Box::pin(output) as Self::DoGetStream))
Expand Down

0 comments on commit ad2e3b0

Please sign in to comment.