Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flight): add helpers to handle CommandGetCatalogs, CommandGetSchemas, and CommandGetTables requests #4296

Merged
merged 20 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ repository = { workspace = true }
license = { workspace = true }

[dependencies]
arrow-arith = { workspace = true, optional = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-data = { workspace = true, optional = true }
roeap marked this conversation as resolved.
Show resolved Hide resolved
arrow-ipc = { workspace = true }
arrow-row = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-schema = { workspace = true }
arrow-string = { workspace = true, optional = true }
base64 = { version = "0.21", default-features = false, features = ["std"] }
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -53,7 +57,7 @@ all-features = true

[features]
default = []
flight-sql-experimental = ["once_cell"]
flight-sql-experimental = ["arrow-arith", "arrow-data", "arrow-row", "arrow-select", "arrow-string", "once_cell"]
tls = ["tonic/tls"]

# Enable CLI tools
Expand Down
211 changes: 191 additions & 20 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use base64::Engine;
use futures::{stream, Stream, TryStreamExt};
use once_cell::sync::Lazy;
use prost::Message;
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use tonic::transport::Server;
Expand All @@ -30,6 +31,10 @@ use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::sql::catalogs::{
get_catalogs_batch, get_catalogs_schema, get_db_schemas_schema, get_tables_schema,
GetSchemasBuilder, GetTablesBuilder,
};
use arrow_flight::sql::sql_info::SqlInfoList;
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
Expand Down Expand Up @@ -73,6 +78,8 @@ static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
.with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
});

static TABLES: Lazy<Vec<&'static str>> = Lazy::new(|| vec!["flight_sql.example.table"]);

#[derive(Clone)]
pub struct FlightSqlServiceImpl {}

Expand Down Expand Up @@ -248,32 +255,106 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn get_flight_info_catalogs(
&self,
_query: CommandGetCatalogs,
_request: Request<FlightDescriptor>,
query: CommandGetCatalogs,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_catalogs not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};

let options = IpcWriteOptions::default();

// encode the schema into the correct form
let IpcMessage(schema) = SchemaAsIpc::new(get_catalogs_schema(), &options)
.try_into()
.expect("valid catalogs schema");
roeap marked this conversation as resolved.
Show resolved Hide resolved

let endpoint = vec![FlightEndpoint {
ticket: Some(ticket),
location: vec![],
}];

let flight_info = FlightInfo {
schema,
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records: -1,
total_bytes: -1,
ordered: false,
};

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_schemas(
&self,
_query: CommandGetDbSchemas,
_request: Request<FlightDescriptor>,
query: CommandGetDbSchemas,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_schemas not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};

let options = IpcWriteOptions::default();

// encode the schema into the correct form
let IpcMessage(schema) =
SchemaAsIpc::new(get_db_schemas_schema().as_ref(), &options)
.try_into()
.expect("valid schemas schema");

let endpoint = vec![FlightEndpoint {
ticket: Some(ticket),
location: vec![],
}];

let flight_info = FlightInfo {
schema,
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records: -1,
total_bytes: -1,
ordered: false,
};

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_tables(
&self,
_query: CommandGetTables,
_request: Request<FlightDescriptor>,
query: CommandGetTables,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_tables not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};

let options = IpcWriteOptions::default();

// encode the schema into the correct form
let IpcMessage(schema) =
SchemaAsIpc::new(get_tables_schema(query.include_schema).as_ref(), &options)
.try_into()
.expect("valid tables schema");

let endpoint = vec![FlightEndpoint {
ticket: Some(ticket),
location: vec![],
}];

let flight_info = FlightInfo {
schema,
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records: -1,
total_bytes: -1,
ordered: false,
};

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_table_types(
Expand Down Expand Up @@ -396,23 +477,113 @@ impl FlightSqlService for FlightSqlServiceImpl {
_query: CommandGetCatalogs,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_catalogs not implemented"))
let catalog_names = TABLES
.iter()
.map(|full_name| full_name.split('.').collect::<Vec<_>>()[0].to_string())
.collect::<HashSet<_>>();
let batch = get_catalogs_batch(catalog_names.into_iter().collect());
let stream = FlightDataEncoderBuilder::new()
.with_schema(Arc::new(get_catalogs_schema().clone()))
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_schemas(
&self,
_query: CommandGetDbSchemas,
query: CommandGetDbSchemas,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_schemas not implemented"))
let schemas = TABLES
.iter()
.map(|full_name| {
let parts = full_name.split('.').collect::<Vec<_>>();
(parts[0].to_string(), parts[1].to_string())
})
.collect::<HashSet<_>>();

let mut builder = GetSchemasBuilder::new(query.db_schema_filter_pattern);
if let Some(catalog) = query.catalog {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the equality matching on catalog should be the same for all implementations, what do you think about putting it in the builder itself?

The same comment applies to the GetTablesBuilder too

We can do this as a follow on PR too (or never)

Something like:

        let mut builder = GetSchemasBuilder::new(query.db_schema_filter_pattern);
        if let Some(catalog) = query.catalog {
          builder = builder.with_catalog_filter(catalog)
        }
        for (catalog_name, schema_name) in schemas {
              builder
                  .append(catalog_name, schema_name)
                  .map_err(Status::from)?;
        }

Copy link
Contributor Author

@roeap roeap May 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb, since we are then using all the fields on the command object, should we just convert one into the other?

Someting like given query is CommandGetDbSchemas.

let mut builder: GetSchemasBuilder = query.builder();

implementing from, with maybe the downside, that you would always actually have to cast the type..

let mut builder: GetSchemasBuilder = query.into();

for (catalog_name, schema_name) in schemas {
if catalog == catalog_name {
builder
.append(catalog_name, schema_name)
.map_err(Status::from)?;
}
}
} else {
for (catalog_name, schema_name) in schemas {
builder
.append(catalog_name, schema_name)
.map_err(Status::from)?;
}
};

let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(get_db_schemas_schema())
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Comment on lines +440 to +443
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing that this code is also always the same, should we add a method build_stream?

Ok(Response::new(Box::pin(stream)))
}

async fn do_get_tables(
&self,
_query: CommandGetTables,
query: CommandGetTables,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_tables not implemented"))
let tables = TABLES
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is worth a comment that this does not implement full SQL style multipart identifier semantics to warn anyone who tries to use this code as a starting point for implementing FlightSQL? Maybe that will be obvious to anyone who uses this code, but we fought for quite a while to get consistent identifier semantics in DataFusion (one needs to parse it via SQL)

.iter()
.map(|full_name| {
let parts = full_name.split('.').collect::<Vec<_>>();
(
parts[0].to_string(),
parts[1].to_string(),
parts[2].to_string(),
)
})
.collect::<HashSet<_>>();

let mut builder = GetTablesBuilder::new(
query.db_schema_filter_pattern,
query.table_name_filter_pattern,
query.include_schema,
);
let dummy_schema = Schema::empty();
if let Some(catalog) = query.catalog {
for (catalog_name, schema_name, table_name) in tables {
if catalog == catalog_name {
builder
.append(
catalog_name,
schema_name,
table_name,
"TABLE",
&dummy_schema,
)
.map_err(Status::from)?;
}
}
} else {
for (catalog_name, schema_name, table_name) in tables {
builder
.append(
catalog_name,
schema_name,
table_name,
"TABLE",
&dummy_schema,
)
.map_err(Status::from)?;
}
};

let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(get_db_schemas_schema())
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_table_types(
Expand Down
Loading