Skip to content

Commit

Permalink
feat(flight): add sql-info helpers (#4266)
Browse files Browse the repository at this point in the history
* feat: baseline sql-info helpers

* chore: clippy

* chore: add license to files

* docs: add some basic docstrings

* Update arrow-flight/src/sql/sql_info.rs

Co-authored-by: Andrew Lamb <[email protected]>

* fix: move flight info

* test: add simple filter test

* fix: docs link

* fix: one more docs link

* fix: one more one more docs link

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
roeap and alamb authored May 27, 2023
1 parent 770e241 commit 77aa8f5
Show file tree
Hide file tree
Showing 5 changed files with 464 additions and 47 deletions.
8 changes: 5 additions & 3 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
base64 = { version = "0.21", default-features = false, features = ["std"] }
tonic = { version = "0.9", default-features = false, features = ["transport", "codegen", "prost"] }
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
once_cell = { version = "1", optional = true }
paste = { version = "1.0" }
prost = { version = "0.11", default-features = false, features = ["prost-derive"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tonic = { version = "0.9", default-features = false, features = ["transport", "codegen", "prost"] }

# CLI-related dependencies
clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
Expand All @@ -51,7 +53,7 @@ all-features = true

[features]
default = []
flight-sql-experimental = []
flight-sql-experimental = ["once_cell"]
tls = ["tonic/tls"]

# Enable CLI tools
Expand Down
104 changes: 70 additions & 34 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,41 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::sql::{
ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult,
ActionCreatePreparedStatementResult, ActionEndSavepointRequest,
ActionEndTransactionRequest, Any, CommandStatementSubstraitPlan, ProstMessageExt,
SqlInfo,
};
use arrow_flight::{
Action, FlightData, FlightEndpoint, HandshakeRequest, HandshakeResponse, IpcMessage,
Location, SchemaAsIpc, Ticket,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use futures::{stream, Stream};
use futures::{stream, Stream, TryStreamExt};
use once_cell::sync::Lazy;
use prost::Message;
use std::pin::Pin;
use std::sync::Arc;
use tonic::transport::Server;
use tonic::transport::{Certificate, Identity, ServerTlsConfig};
use tonic::{Request, Response, Status, Streaming};

use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::sql::sql_info::SqlInfoList;
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionRequest, ActionBeginTransactionResult,
ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
ActionEndSavepointRequest, ActionEndTransactionRequest, Any, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementSubstraitPlan, CommandStatementUpdate, ProstMessageExt, SqlInfo,
TicketStatementQuery,
};
use arrow_flight::utils::batches_to_flight_data;
use arrow_flight::{
flight_service_server::FlightService,
flight_service_server::FlightServiceServer,
sql::{
server::FlightSqlService, ActionBeginTransactionRequest,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedSubstraitPlanRequest, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery, CommandStatementUpdate, TicketStatementQuery,
},
FlightDescriptor, FlightInfo,
flight_service_server::FlightService, flight_service_server::FlightServiceServer,
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, Location, SchemaAsIpc, Ticket,
};
use arrow_ipc::writer::IpcWriteOptions;
use arrow_schema::{ArrowError, DataType, Field, Schema};
Expand All @@ -68,6 +64,15 @@ const FAKE_TOKEN: &str = "uuid_token";
const FAKE_HANDLE: &str = "uuid_handle";
const FAKE_UPDATE_RESULT: i64 = 1;

static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
SqlInfoList::new()
// Server information
.with_sql_info(SqlInfo::FlightSqlServerName, "Example Flight SQL Server")
.with_sql_info(SqlInfo::FlightSqlServerVersion, "1")
// 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
.with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
});

#[derive(Clone)]
pub struct FlightSqlServiceImpl {}

Expand Down Expand Up @@ -283,12 +288,38 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn get_flight_info_sql_info(
&self,
_query: CommandGetSqlInfo,
_request: Request<FlightDescriptor>,
query: CommandGetSqlInfo,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_sql_info not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};

let options = IpcWriteOptions::default();

// encode the schema into the correct form
let IpcMessage(schema) = SchemaAsIpc::new(SqlInfoList::schema(), &options)
.try_into()
.expect("valid sql_info schema");

let endpoint = vec![FlightEndpoint {
ticket: Some(ticket),
// we assume users wnating to use this helper would reasonably
// never need to be distributed across multile endpoints?
location: vec![],
}];

let flight_info = FlightInfo {
schema,
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records: -1,
total_bytes: -1,
ordered: false,
};

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_primary_keys(
Expand Down Expand Up @@ -394,10 +425,15 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn do_get_sql_info(
&self,
_query: CommandGetSqlInfo,
query: CommandGetSqlInfo,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_sql_info not implemented"))
let batch = INSTANCE_SQL_INFO.filter(&query.info).encode();
let stream = FlightDataEncoderBuilder::new()
.with_schema(Arc::new(SqlInfoList::schema().clone()))
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_primary_keys(
Expand Down
4 changes: 4 additions & 0 deletions arrow-flight/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub use gen::SqlSupportedPositionedCommands;
pub use gen::SqlSupportedResultSetConcurrency;
pub use gen::SqlSupportedResultSetType;
pub use gen::SqlSupportedSubqueries;
pub use gen::SqlSupportedTransaction;
pub use gen::SqlSupportedTransactions;
pub use gen::SqlSupportedUnions;
pub use gen::SqlSupportsConvert;
Expand All @@ -92,8 +93,11 @@ pub use gen::SupportedSqlGrammar;
pub use gen::TicketStatementQuery;
pub use gen::UpdateDeleteRules;

pub use sql_info::SqlInfoList;

pub mod client;
pub mod server;
pub mod sql_info;

/// ProstMessageExt are useful utility methods for prost::Message types
pub trait ProstMessageExt: prost::Message + Default {
Expand Down
19 changes: 9 additions & 10 deletions arrow-flight/src/sql/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,29 @@
use std::pin::Pin;

use crate::sql::{Any, Command};
use futures::Stream;
use prost::Message;
use tonic::{Request, Response, Status, Streaming};

use super::{
super::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
},
ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionRequest, ActionBeginTransactionResult,
ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
ActionEndSavepointRequest, ActionEndTransactionRequest, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
ActionEndSavepointRequest, ActionEndTransactionRequest, Any, Command,
CommandGetCatalogs, CommandGetCrossReference, CommandGetDbSchemas,
CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutUpdateResult,
ProstMessageExt, SqlInfo, TicketStatementQuery,
};
use crate::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
};

pub(crate) static CREATE_PREPARED_STATEMENT: &str = "CreatePreparedStatement";
pub(crate) static CLOSE_PREPARED_STATEMENT: &str = "ClosePreparedStatement";
Expand Down
Loading

0 comments on commit 77aa8f5

Please sign in to comment.