Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flight): add sql-info helpers #4266

Merged
merged 10 commits into from
May 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
base64 = { version = "0.21", default-features = false, features = ["std"] }
tonic = { version = "0.9", default-features = false, features = ["transport", "codegen", "prost"] }
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
once_cell = { version = "1", optional = true }
paste = { version = "1.0" }
prost = { version = "0.11", default-features = false, features = ["prost-derive"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tonic = { version = "0.9", default-features = false, features = ["transport", "codegen", "prost"] }

# CLI-related dependencies
clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
Expand All @@ -51,7 +53,7 @@ all-features = true

[features]
default = []
flight-sql-experimental = []
flight-sql-experimental = ["once_cell"]
tls = ["tonic/tls"]

# Enable CLI tools
Expand Down
104 changes: 70 additions & 34 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,41 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::sql::{
ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult,
ActionCreatePreparedStatementResult, ActionEndSavepointRequest,
ActionEndTransactionRequest, Any, CommandStatementSubstraitPlan, ProstMessageExt,
SqlInfo,
};
use arrow_flight::{
Action, FlightData, FlightEndpoint, HandshakeRequest, HandshakeResponse, IpcMessage,
Location, SchemaAsIpc, Ticket,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use futures::{stream, Stream};
use futures::{stream, Stream, TryStreamExt};
use once_cell::sync::Lazy;
use prost::Message;
use std::pin::Pin;
use std::sync::Arc;
use tonic::transport::Server;
use tonic::transport::{Certificate, Identity, ServerTlsConfig};
use tonic::{Request, Response, Status, Streaming};

use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::sql::sql_info::SqlInfoList;
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionRequest, ActionBeginTransactionResult,
ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
ActionEndSavepointRequest, ActionEndTransactionRequest, Any, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementSubstraitPlan, CommandStatementUpdate, ProstMessageExt, SqlInfo,
TicketStatementQuery,
};
use arrow_flight::utils::batches_to_flight_data;
use arrow_flight::{
flight_service_server::FlightService,
flight_service_server::FlightServiceServer,
sql::{
server::FlightSqlService, ActionBeginTransactionRequest,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedSubstraitPlanRequest, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery, CommandStatementUpdate, TicketStatementQuery,
},
FlightDescriptor, FlightInfo,
flight_service_server::FlightService, flight_service_server::FlightServiceServer,
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, Location, SchemaAsIpc, Ticket,
};
use arrow_ipc::writer::IpcWriteOptions;
use arrow_schema::{ArrowError, DataType, Field, Schema};
Expand All @@ -68,6 +64,15 @@ const FAKE_TOKEN: &str = "uuid_token";
const FAKE_HANDLE: &str = "uuid_handle";
const FAKE_UPDATE_RESULT: i64 = 1;

static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
SqlInfoList::new()
// Server information
.with_sql_info(SqlInfo::FlightSqlServerName, "Example Flight SQL Server")
.with_sql_info(SqlInfo::FlightSqlServerVersion, "1")
// 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
.with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
});

#[derive(Clone)]
pub struct FlightSqlServiceImpl {}

Expand Down Expand Up @@ -283,12 +288,38 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn get_flight_info_sql_info(
&self,
_query: CommandGetSqlInfo,
_request: Request<FlightDescriptor>,
query: CommandGetSqlInfo,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_sql_info not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};

let options = IpcWriteOptions::default();

// encode the schema into the correct form
let IpcMessage(schema) = SchemaAsIpc::new(SqlInfoList::schema(), &options)
.try_into()
.expect("valid sql_info schema");

let endpoint = vec![FlightEndpoint {
ticket: Some(ticket),
// we assume users wnating to use this helper would reasonably
// never need to be distributed across multile endpoints?
location: vec![],
}];

let flight_info = FlightInfo {
schema,
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records: -1,
total_bytes: -1,
ordered: false,
};

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_primary_keys(
Expand Down Expand Up @@ -394,10 +425,15 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn do_get_sql_info(
&self,
_query: CommandGetSqlInfo,
query: CommandGetSqlInfo,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_sql_info not implemented"))
let batch = INSTANCE_SQL_INFO.filter(&query.info).encode();
let stream = FlightDataEncoderBuilder::new()
.with_schema(Arc::new(SqlInfoList::schema().clone()))
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_primary_keys(
Expand Down
4 changes: 4 additions & 0 deletions arrow-flight/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub use gen::SqlSupportedPositionedCommands;
pub use gen::SqlSupportedResultSetConcurrency;
pub use gen::SqlSupportedResultSetType;
pub use gen::SqlSupportedSubqueries;
pub use gen::SqlSupportedTransaction;
pub use gen::SqlSupportedTransactions;
pub use gen::SqlSupportedUnions;
pub use gen::SqlSupportsConvert;
Expand All @@ -92,8 +93,11 @@ pub use gen::SupportedSqlGrammar;
pub use gen::TicketStatementQuery;
pub use gen::UpdateDeleteRules;

pub use sql_info::SqlInfoList;

pub mod client;
pub mod server;
pub mod sql_info;

/// ProstMessageExt are useful utility methods for prost::Message types
pub trait ProstMessageExt: prost::Message + Default {
Expand Down
19 changes: 9 additions & 10 deletions arrow-flight/src/sql/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,29 @@

use std::pin::Pin;

use crate::sql::{Any, Command};
use futures::Stream;
use prost::Message;
use tonic::{Request, Response, Status, Streaming};

use super::{
super::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
},
ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionRequest, ActionBeginTransactionResult,
ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
ActionEndSavepointRequest, ActionEndTransactionRequest, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
ActionEndSavepointRequest, ActionEndTransactionRequest, Any, Command,
CommandGetCatalogs, CommandGetCrossReference, CommandGetDbSchemas,
CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutUpdateResult,
ProstMessageExt, SqlInfo, TicketStatementQuery,
};
use crate::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
};

pub(crate) static CREATE_PREPARED_STATEMENT: &str = "CreatePreparedStatement";
pub(crate) static CLOSE_PREPARED_STATEMENT: &str = "ClosePreparedStatement";
Expand Down
Loading