From acc0b9dd43eb689cbd20c9470515d719db10d0b0 Mon Sep 17 00:00:00 2001 From: jacek-prisma Date: Thu, 23 Jan 2025 09:58:44 +0000 Subject: [PATCH] feat: implement sqlite schema engine connection with external connector (#5133) * feat: implement sqlite schema engine connection with external connector * fix: fix broken introspect test --- .../src/flavour/sqlite.rs | 155 ++++------ .../src/flavour/sqlite/native/mod.rs | 284 ++++++++++-------- .../src/flavour/sqlite/wasm/mod.rs | 104 ++++--- .../sql-schema-describer/src/sqlite.rs | 15 +- 4 files changed, 295 insertions(+), 263 deletions(-) diff --git a/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite.rs b/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite.rs index fab1e982d098..234030848739 100644 --- a/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite.rs +++ b/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite.rs @@ -4,26 +4,22 @@ mod native; #[cfg(not(feature = "sqlite-native"))] mod wasm; +use std::future::Future; + #[cfg(feature = "sqlite-native")] -use native::{ - create_database, drop_database, ensure_connection_validity, generic_apply_migration_script, introspect, reset, - version, Connection, -}; +use native as imp; #[cfg(not(feature = "sqlite-native"))] -use wasm::{ - create_database, drop_database, ensure_connection_validity, generic_apply_migration_script, introspect, reset, - version, Connection, -}; +use wasm as imp; use crate::flavour::SqlFlavour; use indoc::indoc; use schema_connector::{ migrations_directory::MigrationDirectory, BoxFuture, ConnectorError, ConnectorParams, ConnectorResult, Namespaces, }; -use sql_schema_describer::SqlSchema; +use sql_schema_describer::{sqlite::SqlSchemaDescriber, DescriberErrorKind, SqlSchema}; -type State = super::State; +type State = imp::State; struct Params { connector_params: ConnectorParams, @@ -35,18 +31,32 @@ pub(crate) struct SqliteFlavour { } impl SqliteFlavour { - #[cfg(not(feature = "sqlite-native"))] - pub(crate) fn new_external(_adapter: std::sync::Arc) -> Self { - SqliteFlavour { state: State::Initial } + fn with_connection<'a, F, O, C>(&'a mut self, f: C) -> BoxFuture<'a, ConnectorResult> + where + O: 'a + Send, + C: (FnOnce(&'a mut imp::Connection) -> F) + Send + Sync + 'a, + F: Future> + Send + 'a, + { + Box::pin(async move { f(imp::get_connection(&mut self.state)?).await }) } } +#[cfg(feature = "sqlite-native")] impl Default for SqliteFlavour { fn default() -> Self { SqliteFlavour { state: State::Initial } } } +impl SqliteFlavour { + #[cfg(not(feature = "sqlite-native"))] + pub(crate) fn new_external(adapter: std::sync::Arc) -> Self { + SqliteFlavour { + state: State::new(adapter, Default::default()), + } + } +} + impl std::fmt::Debug for SqliteFlavour { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("") @@ -67,15 +77,11 @@ impl SqlFlavour for SqliteFlavour { migration_name: &'a str, script: &'a str, ) -> BoxFuture<'a, ConnectorResult<()>> { - ready(with_connection(&mut self.state, move |_params, connection| { - generic_apply_migration_script(migration_name, script, connection) - })) + self.with_connection(|conn| conn.apply_migration_script(migration_name, script)) } fn connection_string(&self) -> Option<&str> { - self.state - .params() - .map(|p| p.connector_params.connection_string.as_str()) + imp::connection_string(&self.state) } fn table_names(&mut self, _namespaces: Option) -> BoxFuture<'_, ConnectorResult>> { @@ -93,10 +99,7 @@ impl SqlFlavour for SqliteFlavour { } fn create_database(&mut self) -> BoxFuture<'_, ConnectorResult> { - Box::pin(async { - let params = self.state.get_unwrapped_params(); - create_database(params) - }) + Box::pin(async { imp::create_database(&self.state) }) } fn create_migrations_table(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { @@ -121,16 +124,11 @@ impl SqlFlavour for SqliteFlavour { } fn describe_schema(&mut self, _namespaces: Option) -> BoxFuture<'_, ConnectorResult> { - Box::pin(async move { - let schema = with_connection(&mut self.state, |_, conn| Ok(Box::pin(conn.describe_schema())))?.await?; - Ok(schema) - }) + self.with_connection(|conn| describe_schema(conn)) } fn drop_database(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { - let params = self.state.get_unwrapped_params(); - let ret = drop_database(params); - ready(ret) + ready(imp::drop_database(&self.state)) } fn drop_migrations_table(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { @@ -138,9 +136,7 @@ impl SqlFlavour for SqliteFlavour { } fn ensure_connection_validity(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { - let params = self.state.get_unwrapped_params(); - let result = ensure_connection_validity(params); - ready(result) + Box::pin(imp::ensure_connection_validity(&self.state)) } fn load_migrations_table( @@ -164,8 +160,8 @@ impl SqlFlavour for SqliteFlavour { FROM `_prisma_migrations` ORDER BY `started_at` ASC "#}; - ready(with_connection(&mut self.state, |_, conn| { - let rows = match conn.query_raw(SQL, &[]) { + self.with_connection(|conn| async { + let rows = match conn.query_raw(SQL, &[]).await { Ok(result) => result, Err(err) => { #[cfg(feature = "sqlite-native")] @@ -222,14 +218,14 @@ impl SqlFlavour for SqliteFlavour { tracing::debug!("Found {} migrations in the migrations table.", rows.len()); Ok(Ok(rows)) - })) + }) } fn query<'a>( &'a mut self, query: quaint::ast::Query<'a>, ) -> BoxFuture<'a, ConnectorResult> { - ready(with_connection(&mut self.state, |_, conn| conn.query(query))) + self.with_connection(|conn| conn.query(query)) } fn query_raw<'a>( @@ -238,7 +234,7 @@ impl SqlFlavour for SqliteFlavour { params: &'a [quaint::Value<'a>], ) -> BoxFuture<'a, ConnectorResult> { tracing::debug!(sql, params = ?params, query_type = "query_raw"); - ready(with_connection(&mut self.state, |_, conn| conn.query_raw(sql, params))) + self.with_connection(|conn| conn.query_raw(sql, params)) } fn describe_query<'a>( @@ -246,52 +242,27 @@ impl SqlFlavour for SqliteFlavour { sql: &'a str, ) -> BoxFuture<'a, ConnectorResult> { tracing::debug!(sql, query_type = "describe_query"); - ready(with_connection(&mut self.state, |params, conn| { - conn.describe_query(sql, params) - })) + ready(imp::describe_query(&mut self.state, sql)) } fn introspect( &mut self, - namespaces: Option, - ctx: &schema_connector::IntrospectionContext, + _namespaces: Option, + _ctx: &schema_connector::IntrospectionContext, ) -> BoxFuture<'_, ConnectorResult> { - introspect(self, namespaces, ctx) + Box::pin(imp::introspect(&mut self.state)) } fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>> { - ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql))) + self.with_connection(|conn| conn.raw_cmd(sql)) } fn reset(&mut self, _namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>> { - ready(with_connection(&mut self.state, move |params, connection| { - reset(params, connection) - })) - } - - fn set_params(&mut self, params: ConnectorParams) -> ConnectorResult<()> { - let quaint::connector::SqliteParams { file_path, .. } = - quaint::connector::SqliteParams::try_from(params.connection_string.as_str()) - .map_err(ConnectorError::url_parse_error)?; - - self.state.set_params(Params { - connector_params: params, - file_path, - }); - Ok(()) + Box::pin(imp::reset(&mut self.state)) } fn set_preview_features(&mut self, preview_features: enumflags2::BitFlags) { - match &mut self.state { - super::State::Initial => { - if !preview_features.is_empty() { - tracing::warn!("set_preview_feature on Initial state has no effect ({preview_features})."); - } - } - super::State::WithParams(params) | super::State::Connected(params, _) => { - params.connector_params.preview_features = preview_features - } - } + imp::set_preview_features(&mut self.state, preview_features) } #[tracing::instrument(skip(self, migrations))] @@ -303,7 +274,7 @@ impl SqlFlavour for SqliteFlavour { ) -> BoxFuture<'a, ConnectorResult> { Box::pin(async move { tracing::debug!("Applying migrations to temporary in-memory SQLite database."); - let mut shadow_db_conn = Connection::new_in_memory(); + let shadow_db_conn = imp::Connection::new_in_memory(); for migration in migrations { let script = migration.read_migration_script()?; @@ -312,17 +283,21 @@ impl SqlFlavour for SqliteFlavour { migration.migration_name() ); - shadow_db_conn.raw_cmd(&script).map_err(|connector_error| { + shadow_db_conn.raw_cmd(&script).await.map_err(|connector_error| { connector_error.into_migration_does_not_apply_cleanly(migration.migration_name().to_owned()) })?; } - shadow_db_conn.describe_schema().await + describe_schema(&shadow_db_conn).await }) } + fn set_params(&mut self, connector_params: ConnectorParams) -> ConnectorResult<()> { + imp::set_params(&mut self.state, connector_params) + } + fn version(&mut self) -> BoxFuture<'_, ConnectorResult>> { - version(self) + self.with_connection(|conn| conn.version()) } fn search_path(&self) -> &str { @@ -330,28 +305,20 @@ impl SqlFlavour for SqliteFlavour { } } -fn acquire_lock(connection: &mut Connection) -> ConnectorResult<()> { - connection.raw_cmd("PRAGMA main.locking_mode=EXCLUSIVE") +async fn acquire_lock(connection: &imp::Connection) -> ConnectorResult<()> { + connection.raw_cmd("PRAGMA main.locking_mode=EXCLUSIVE").await } -fn with_connection<'a, O, C>(state: &'a mut State, f: C) -> ConnectorResult -where - O: 'a + Send, - C: (FnOnce(&'a mut Params, &'a mut Connection) -> ConnectorResult) + Send + Sync + 'a, -{ - match state { - super::State::Initial => panic!("logic error: Initial"), - super::State::Connected(p, c) => f(p, c), - super::State::WithParams(p) => { - let conn = Connection::new(p)?; - let params = match std::mem::replace(state, super::State::Initial) { - super::State::WithParams(p) => p, - _ => unreachable!(), - }; - *state = super::State::Connected(params, conn); - with_connection(state, f) - } - } +async fn describe_schema(connection: &imp::Connection) -> ConnectorResult { + SqlSchemaDescriber::new(connection.as_connector()) + .describe_impl() + .await + .map_err(|err| match err.into_kind() { + DescriberErrorKind::QuaintError(err) => ConnectorError::from_source(err, "Error describing the database."), + DescriberErrorKind::CrossSchemaReference { .. } => { + unreachable!("No schemas on SQLite") + } + }) } fn ready(output: O) -> BoxFuture<'static, O> { diff --git a/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/native/mod.rs b/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/native/mod.rs index b3f7ef23d621..ada84bcb2481 100644 --- a/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/native/mod.rs +++ b/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/native/mod.rs @@ -2,57 +2,49 @@ pub(crate) use quaint::connector::rusqlite; -use crate::flavour::SqlFlavour; use quaint::connector::{ColumnType, DescribedColumn, DescribedParameter, GetRow, ToColumnNames}; -use schema_connector::{BoxFuture, ConnectorError, ConnectorResult, Namespaces}; -use sql_schema_describer::{sqlite as describer, DescriberErrorKind, SqlSchema}; +use schema_connector::{BoxFuture, ConnectorError, ConnectorParams, ConnectorResult}; +use sql_schema_describer::SqlSchema; use sqlx_core::{column::Column, type_info::TypeInfo}; use sqlx_sqlite::SqliteColumn; use std::sync::Mutex; use user_facing_errors::schema_engine::ApplyMigrationError; +use super::Params; + +pub(super) type State = crate::flavour::State; + pub(super) struct Connection(Mutex); impl Connection { - pub(super) fn new(params: &super::Params) -> ConnectorResult { + pub fn new(params: &super::Params) -> ConnectorResult { Ok(Connection(Mutex::new( rusqlite::Connection::open(¶ms.file_path).map_err(convert_error)?, ))) } - pub(super) fn new_in_memory() -> Self { - Connection(Mutex::new(rusqlite::Connection::open_in_memory().unwrap())) + pub fn as_connector(&self) -> &Mutex { + &self.0 } - pub(super) async fn describe_schema(&mut self) -> ConnectorResult { - // Note: this relies on quaint::connector::rusqlite::Connection, which is exposed by `quaint/expose-drivers`, and is not Wasm-compatible. - describer::SqlSchemaDescriber::new(&self.0) - .describe_impl() - .await - .map_err(|err| match err.into_kind() { - DescriberErrorKind::QuaintError(err) => { - ConnectorError::from_source(err, "Error describing the database.") - } - DescriberErrorKind::CrossSchemaReference { .. } => { - unreachable!("No schemas on SQLite") - } - }) + pub fn new_in_memory() -> Self { + Connection(Mutex::new(rusqlite::Connection::open_in_memory().unwrap())) } - pub(super) fn raw_cmd(&mut self, sql: &str) -> ConnectorResult<()> { + pub async fn raw_cmd(&self, sql: &str) -> ConnectorResult<()> { tracing::debug!(query_type = "raw_cmd", sql); let conn = self.0.lock().unwrap(); conn.execute_batch(sql).map_err(convert_error) } - pub(super) fn query(&mut self, query: quaint::ast::Query<'_>) -> ConnectorResult { + pub async fn query(&self, query: quaint::ast::Query<'_>) -> ConnectorResult { use quaint::visitor::Visitor; let (sql, params) = quaint::visitor::Sqlite::build(query).unwrap(); - self.query_raw(&sql, ¶ms) + self.query_raw(&sql, ¶ms).await } - pub(super) fn query_raw( - &mut self, + pub async fn query_raw( + &self, sql: &str, params: &[quaint::prelude::Value<'_>], ) -> ConnectorResult { @@ -77,82 +69,34 @@ impl Connection { )) } - pub(super) fn describe_query( - &mut self, - sql: &str, - params: &super::Params, - ) -> ConnectorResult { - tracing::debug!(query_type = "describe_query", sql); - // SQLite only provides type information for _declared_ column types. That means any expression will not contain type information. - // Sqlx works around this by running an `EXPLAIN` query and inferring types by interpreting sqlite bytecode. - // If you're curious, here's the code: https://github.com/launchbadge/sqlx/blob/16e3f1025ad1e106d1acff05f591b8db62d688e2/sqlx-sqlite/src/connection/explain.rs#L557 - // We use SQLx's as a fallback for when quaint's infers Unknown. - let describe = sqlx_sqlite::describe_blocking(sql, ¶ms.file_path) - .map_err(|err| ConnectorError::from_source(err, "Error describing the query."))?; + pub async fn apply_migration_script(&self, migration_name: &str, script: &str) -> ConnectorResult<()> { + tracing::debug!(query_type = "raw_cmd", sql = script); let conn = self.0.lock().unwrap(); - let stmt = conn.prepare_cached(sql).map_err(convert_error)?; - - let parameters = (1..=stmt.parameter_count()) - .map(|idx| match stmt.parameter_name(idx) { - Some(name) => { - // SQLite parameter names are prefixed with a colon. We remove it here so that the js doc parser can match the names. - let name = name.strip_prefix(':').unwrap_or(name); - - DescribedParameter::new_named(name, ColumnType::Unknown) - } - None => DescribedParameter::new_unnamed(idx, ColumnType::Unknown), - }) - .collect(); - let columns = stmt - .columns() - .iter() - .zip(&describe.nullable) - .enumerate() - .map(|(idx, (col, nullable))| { - let typ = match ColumnType::from(col) { - // If the column type is unknown, we try to infer it from the describe. - ColumnType::Unknown => describe.column(idx).to_column_type(), - typ => typ, - }; - - DescribedColumn::new_named(col.name(), typ).is_nullable(nullable.unwrap_or(true)) + conn.execute_batch(script).map_err(|sqlite_error: rusqlite::Error| { + let database_error_code = match sqlite_error { + rusqlite::Error::SqliteFailure(rusqlite::ffi::Error { extended_code, .. }, _) + | rusqlite::Error::SqlInputError { + error: rusqlite::ffi::Error { extended_code, .. }, + .. + } => extended_code.to_string(), + _ => "none".to_owned(), + }; + + ConnectorError::user_facing(ApplyMigrationError { + migration_name: migration_name.to_owned(), + database_error_code, + database_error: sqlite_error.to_string(), }) - .collect(); - - Ok(quaint::connector::DescribedQuery { - columns, - parameters, - enum_names: None, }) } -} -pub(super) fn generic_apply_migration_script( - migration_name: &str, - script: &str, - conn: &Connection, -) -> ConnectorResult<()> { - tracing::debug!(query_type = "raw_cmd", sql = script); - let conn = conn.0.lock().unwrap(); - conn.execute_batch(script).map_err(|sqlite_error: rusqlite::Error| { - let database_error_code = match sqlite_error { - rusqlite::Error::SqliteFailure(rusqlite::ffi::Error { extended_code, .. }, _) - | rusqlite::Error::SqlInputError { - error: rusqlite::ffi::Error { extended_code, .. }, - .. - } => extended_code.to_string(), - _ => "none".to_owned(), - }; - - ConnectorError::user_facing(ApplyMigrationError { - migration_name: migration_name.to_owned(), - database_error_code, - database_error: sqlite_error.to_string(), - }) - }) + pub(super) fn version(&mut self) -> BoxFuture<'_, ConnectorResult>> { + super::ready(Ok(Some(quaint::connector::sqlite_version().to_owned()))) + } } -pub(super) fn create_database(params: &super::Params) -> ConnectorResult { +pub(super) fn create_database(state: &State) -> ConnectorResult { + let params = state.get_unwrapped_params(); let path = std::path::Path::new(¶ms.file_path); if path.exists() { @@ -171,13 +115,15 @@ pub(super) fn create_database(params: &super::Params) -> ConnectorResult Ok(params.file_path.clone()) } -pub(super) fn drop_database(params: &super::Params) -> ConnectorResult<()> { +pub(super) fn drop_database(state: &State) -> ConnectorResult<()> { + let params = state.get_unwrapped_params(); let file_path = ¶ms.file_path; std::fs::remove_file(file_path) .map_err(|err| ConnectorError::from_msg(format!("Failed to delete SQLite database at `{file_path}`.\n{err}"))) } -pub(super) fn ensure_connection_validity(params: &super::Params) -> ConnectorResult<()> { +pub(super) async fn ensure_connection_validity(state: &State) -> ConnectorResult<()> { + let params = state.get_unwrapped_params(); let path = std::path::Path::new(¶ms.file_path); // we use metadata() here instead of Path::exists() because we want accurate diagnostics: // if the file is not reachable because of missing permissions, we don't want to return @@ -197,41 +143,31 @@ pub(super) fn ensure_connection_validity(params: &super::Params) -> ConnectorRes } } -pub(super) fn introspect<'a>( - instance: &'a mut super::SqliteFlavour, - namespaces: Option, - _ctx: &schema_connector::IntrospectionContext, -) -> BoxFuture<'a, ConnectorResult> { - // TODO: move to a separate function - Box::pin(async move { - if let Some(params) = instance.state.params() { - let path = std::path::Path::new(¶ms.file_path); - if std::fs::metadata(path).is_err() { - return Err(ConnectorError::user_facing( - user_facing_errors::common::DatabaseDoesNotExist::Sqlite { - database_file_name: path - .file_name() - .map(|name| name.to_string_lossy().into_owned()) - .unwrap_or_default(), - database_file_path: params.file_path.clone(), - }, - )); - } +pub(super) async fn introspect(state: &mut State) -> ConnectorResult { + if let Some(params) = state.params() { + let path = std::path::Path::new(¶ms.file_path); + if std::fs::metadata(path).is_err() { + return Err(ConnectorError::user_facing( + user_facing_errors::common::DatabaseDoesNotExist::Sqlite { + database_file_name: path + .file_name() + .map(|name| name.to_string_lossy().into_owned()) + .unwrap_or_default(), + database_file_path: params.file_path.clone(), + }, + )); } + } - instance.describe_schema(namespaces).await - }) -} - -pub(super) fn version(_instance: &mut super::SqliteFlavour) -> BoxFuture<'_, ConnectorResult>> { - super::ready(Ok(Some(quaint::connector::sqlite_version().to_owned()))) + super::describe_schema(get_connection(state)?).await } -pub(super) fn reset(params: &super::Params, connection: &mut Connection) -> ConnectorResult<()> { +pub(super) async fn reset(state: &mut State) -> ConnectorResult<()> { + let (connection, params) = get_connection_and_params(state)?; let file_path = ¶ms.file_path; - connection.raw_cmd("PRAGMA main.locking_mode=NORMAL")?; - connection.raw_cmd("PRAGMA main.quick_check")?; + connection.raw_cmd("PRAGMA main.locking_mode=NORMAL").await?; + connection.raw_cmd("PRAGMA main.quick_check").await?; tracing::debug!("Truncating {:?}", file_path); @@ -242,7 +178,105 @@ pub(super) fn reset(params: &super::Params, connection: &mut Connection) -> Conn ) })?; - super::acquire_lock(connection) + super::acquire_lock(connection).await +} + +pub(super) fn describe_query(state: &mut State, sql: &str) -> ConnectorResult { + let (conn, params) = get_connection_and_params(state)?; + tracing::debug!(query_type = "describe_query", sql); + // SQLite only provides type information for _declared_ column types. That means any expression will not contain type information. + // Sqlx works around this by running an `EXPLAIN` query and inferring types by interpreting sqlite bytecode. + // If you're curious, here's the code: https://github.com/launchbadge/sqlx/blob/16e3f1025ad1e106d1acff05f591b8db62d688e2/sqlx-sqlite/src/connection/explain.rs#L557 + // We use SQLx's as a fallback for when quaint's infers Unknown. + let describe = sqlx_sqlite::describe_blocking(sql, ¶ms.file_path) + .map_err(|err| ConnectorError::from_source(err, "Error describing the query."))?; + let conn = conn.0.lock().unwrap(); + let stmt = conn.prepare_cached(sql).map_err(convert_error)?; + + let parameters = (1..=stmt.parameter_count()) + .map(|idx| match stmt.parameter_name(idx) { + Some(name) => { + // SQLite parameter names are prefixed with a colon. We remove it here so that the js doc parser can match the names. + let name = name.strip_prefix(':').unwrap_or(name); + + DescribedParameter::new_named(name, ColumnType::Unknown) + } + None => DescribedParameter::new_unnamed(idx, ColumnType::Unknown), + }) + .collect(); + let columns = stmt + .columns() + .iter() + .zip(&describe.nullable) + .enumerate() + .map(|(idx, (col, nullable))| { + let typ = match ColumnType::from(col) { + // If the column type is unknown, we try to infer it from the describe. + ColumnType::Unknown => describe.column(idx).to_column_type(), + typ => typ, + }; + + DescribedColumn::new_named(col.name(), typ).is_nullable(nullable.unwrap_or(true)) + }) + .collect(); + + Ok(quaint::connector::DescribedQuery { + columns, + parameters, + enum_names: None, + }) +} + +pub(super) fn connection_string(state: &State) -> Option<&str> { + state + .params() + .map(|params| params.connector_params.connection_string.as_str()) +} + +pub(super) fn get_connection(state: &mut State) -> ConnectorResult<&mut Connection> { + let (conn, _) = get_connection_and_params(state)?; + Ok(conn) +} + +fn get_connection_and_params(state: &mut State) -> ConnectorResult<(&mut Connection, &mut Params)> { + match state { + super::State::Initial => panic!("logic error: Initial"), + super::State::Connected(params, conn) => Ok((conn, params)), + super::State::WithParams(p) => { + let conn = Connection::new(p)?; + let params = match std::mem::replace(state, super::State::Initial) { + super::State::WithParams(p) => p, + _ => unreachable!(), + }; + *state = super::State::Connected(params, conn); + get_connection_and_params(state) + } + } +} + +pub(super) fn set_preview_features(state: &mut State, preview_features: enumflags2::BitFlags) { + match state { + super::State::Initial => { + if !preview_features.is_empty() { + tracing::warn!("set_preview_feature on Initial state has no effect ({preview_features})."); + } + } + super::State::WithParams(params) | super::State::Connected(params, _) => { + params.connector_params.preview_features = preview_features + } + } +} + +pub(super) fn set_params(state: &mut State, params: ConnectorParams) -> ConnectorResult<()> { + let quaint::connector::SqliteParams { file_path, .. } = + quaint::connector::SqliteParams::try_from(params.connection_string.as_str()) + .map_err(ConnectorError::url_parse_error)?; + + state.set_params(Params { + connector_params: params, + file_path, + }); + Ok(()) } fn convert_error(err: rusqlite::Error) -> ConnectorError { diff --git a/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/wasm/mod.rs b/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/wasm/mod.rs index 9262e6d4a6ea..d8f6bc2f3a9b 100644 --- a/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/wasm/mod.rs +++ b/schema-engine/connectors/sql-schema-connector/src/flavour/sqlite/wasm/mod.rs @@ -1,88 +1,108 @@ //! All the quaint-wrangling for the sqlite connector should happen here. -use quaint::connector::{ColumnType, DescribedColumn, DescribedParameter, GetRow, ToColumnNames}; -use schema_connector::{BoxFuture, ConnectorError, ConnectorResult, Namespaces}; -use sql_schema_describer::{sqlite as describer, DescriberErrorKind, SqlSchema}; -use user_facing_errors::schema_engine::ApplyMigrationError; - -// TODO: use ExternalConnector here. -pub(super) struct Connection(); +use crate::BitFlags; +use crate::ConnectorParams; +use psl::PreviewFeature; +use quaint::connector::ExternalConnector; +use schema_connector::{ConnectorError, ConnectorResult}; +use sql_schema_describer::SqlSchema; +use std::sync::Arc; + +pub(super) struct State { + connection: Connection, + preview_features: BitFlags, +} -impl Connection { - pub(super) fn new(params: &super::Params) -> ConnectorResult { - panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); +impl State { + pub fn new(connection: Arc, preview_features: BitFlags) -> Self { + Self { + preview_features, + connection: Connection(connection), + } } +} + +pub(super) struct Connection(Arc); - pub(super) fn new_in_memory() -> Self { +impl Connection { + pub fn new_in_memory() -> Self { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } - pub(super) async fn describe_schema(&mut self) -> ConnectorResult { - panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); + pub fn as_connector(&self) -> &Arc { + &self.0 } - pub(super) fn raw_cmd(&mut self, sql: &str) -> ConnectorResult<()> { + pub async fn raw_cmd(&self, sql: &str) -> ConnectorResult<()> { tracing::debug!(query_type = "raw_cmd", sql); - panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); + self.0.raw_cmd(sql).await.map_err(convert_error) } - pub(super) fn query(&mut self, query: quaint::ast::Query<'_>) -> ConnectorResult { + pub async fn query(&self, query: quaint::ast::Query<'_>) -> ConnectorResult { use quaint::visitor::Visitor; let (sql, params) = quaint::visitor::Sqlite::build(query).unwrap(); - self.query_raw(&sql, ¶ms) + self.query_raw(&sql, ¶ms).await } - pub(super) fn query_raw( - &mut self, + pub async fn query_raw( + &self, sql: &str, params: &[quaint::prelude::Value<'_>], ) -> ConnectorResult { tracing::debug!(query_type = "query_raw", sql); - panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); + self.0.query_raw(sql, params).await.map_err(convert_error) } - pub(super) fn describe_query( - &mut self, - sql: &str, - params: &super::Params, - ) -> ConnectorResult { + pub async fn apply_migration_script(&self, _migration_name: &str, _script: &str) -> ConnectorResult<()> { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } + + pub async fn version(&self) -> ConnectorResult> { + self.0.version().await.map_err(convert_error) + } } -pub(super) fn generic_apply_migration_script( - migration_name: &str, - script: &str, - conn: &Connection, -) -> ConnectorResult<()> { - tracing::debug!(query_type = "raw_cmd", sql = script); +pub(super) fn create_database(_params: &State) -> ConnectorResult { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } -pub(super) fn create_database(params: &super::Params) -> ConnectorResult { +pub(super) fn drop_database(_state: &State) -> ConnectorResult<()> { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } -pub(super) fn drop_database(params: &super::Params) -> ConnectorResult<()> { +pub(super) async fn reset(_state: &mut State) -> ConnectorResult<()> { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } -pub(super) fn ensure_connection_validity(params: &super::Params) -> ConnectorResult<()> { - panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); +pub(super) async fn ensure_connection_validity(state: &State) -> ConnectorResult<()> { + state.connection.version().await?; + Ok(()) } -pub(super) fn introspect<'a>( - instance: &'a mut super::SqliteFlavour, - namespaces: Option, - _ctx: &schema_connector::IntrospectionContext, -) -> BoxFuture<'a, ConnectorResult> { +pub(super) async fn introspect(instance: &mut State) -> ConnectorResult { + super::describe_schema(&instance.connection).await +} + +pub(super) fn describe_query(state: &mut State, sql: &str) -> ConnectorResult { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } -pub(super) fn reset(params: &super::Params, connection: &mut Connection) -> ConnectorResult<()> { +pub(super) fn connection_string(_state: &State) -> Option<&str> { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } -pub(super) fn version(instance: &mut super::SqliteFlavour) -> BoxFuture<'_, ConnectorResult>> { +pub(super) fn get_connection(state: &mut State) -> ConnectorResult<&mut Connection> { + Ok(&mut state.connection) +} + +pub(super) fn set_preview_features(state: &mut State, features: BitFlags) { + state.preview_features = features; +} + +pub(super) fn set_params(_state: &mut State, params: ConnectorParams) -> ConnectorResult<()> { panic!("[sql-schema-connector::flavour::sqlite::wasm] Not implemented"); } + +fn convert_error(err: quaint::error::Error) -> ConnectorError { + ConnectorError::from_source(err, "external connector error") +} diff --git a/schema-engine/sql-schema-describer/src/sqlite.rs b/schema-engine/sql-schema-describer/src/sqlite.rs index 658207283786..5000b8bad153 100644 --- a/schema-engine/sql-schema-describer/src/sqlite.rs +++ b/schema-engine/sql-schema-describer/src/sqlite.rs @@ -8,9 +8,9 @@ use either::Either; use indexmap::IndexMap; use quaint::{ ast::{Value, ValueType}, - prelude::ResultRow, + prelude::{Queryable, ResultRow}, }; -use std::{any::type_name, borrow::Cow, collections::BTreeMap, convert::TryInto, fmt::Debug, path::Path}; +use std::{any::type_name, borrow::Cow, collections::BTreeMap, convert::TryInto, fmt::Debug, path::Path, sync::Arc}; use tracing::trace; #[cfg(feature = "sqlite-native")] @@ -36,6 +36,17 @@ impl Connection for quaint::single::Quaint { } } +#[async_trait::async_trait] +impl Connection for Arc { + async fn query_raw<'a>( + &'a self, + sql: &'a str, + params: &'a [quaint::prelude::Value<'a>], + ) -> quaint::Result { + quaint::prelude::Queryable::query_raw(&**self, sql, params).await + } +} + pub struct SqlSchemaDescriber<'a> { conn: &'a (dyn Connection + Send + Sync), }