Skip to content

Commit

Permalink
feat: implement sqlite schema engine connection with external connect…
Browse files Browse the repository at this point in the history
…or (#5133)

* feat: implement sqlite schema engine connection with external connector

* fix: fix broken introspect test
  • Loading branch information
jacek-prisma authored Jan 23, 2025
1 parent eb5bc54 commit acc0b9d
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 263 deletions.
155 changes: 61 additions & 94 deletions schema-engine/connectors/sql-schema-connector/src/flavour/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,22 @@ mod native;
#[cfg(not(feature = "sqlite-native"))]
mod wasm;

use std::future::Future;

#[cfg(feature = "sqlite-native")]
use native::{
create_database, drop_database, ensure_connection_validity, generic_apply_migration_script, introspect, reset,
version, Connection,
};
use native as imp;

#[cfg(not(feature = "sqlite-native"))]
use wasm::{
create_database, drop_database, ensure_connection_validity, generic_apply_migration_script, introspect, reset,
version, Connection,
};
use wasm as imp;

use crate::flavour::SqlFlavour;
use indoc::indoc;
use schema_connector::{
migrations_directory::MigrationDirectory, BoxFuture, ConnectorError, ConnectorParams, ConnectorResult, Namespaces,
};
use sql_schema_describer::SqlSchema;
use sql_schema_describer::{sqlite::SqlSchemaDescriber, DescriberErrorKind, SqlSchema};

type State = super::State<Params, Connection>;
type State = imp::State;

struct Params {
connector_params: ConnectorParams,
Expand All @@ -35,18 +31,32 @@ pub(crate) struct SqliteFlavour {
}

impl SqliteFlavour {
#[cfg(not(feature = "sqlite-native"))]
pub(crate) fn new_external(_adapter: std::sync::Arc<dyn quaint::connector::ExternalConnector>) -> Self {
SqliteFlavour { state: State::Initial }
fn with_connection<'a, F, O, C>(&'a mut self, f: C) -> BoxFuture<'a, ConnectorResult<O>>
where
O: 'a + Send,
C: (FnOnce(&'a mut imp::Connection) -> F) + Send + Sync + 'a,
F: Future<Output = ConnectorResult<O>> + Send + 'a,
{
Box::pin(async move { f(imp::get_connection(&mut self.state)?).await })
}
}

#[cfg(feature = "sqlite-native")]
impl Default for SqliteFlavour {
fn default() -> Self {
SqliteFlavour { state: State::Initial }
}
}

impl SqliteFlavour {
#[cfg(not(feature = "sqlite-native"))]
pub(crate) fn new_external(adapter: std::sync::Arc<dyn quaint::connector::ExternalConnector>) -> Self {
SqliteFlavour {
state: State::new(adapter, Default::default()),
}
}
}

impl std::fmt::Debug for SqliteFlavour {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("<SQLite connector>")
Expand All @@ -67,15 +77,11 @@ impl SqlFlavour for SqliteFlavour {
migration_name: &'a str,
script: &'a str,
) -> BoxFuture<'a, ConnectorResult<()>> {
ready(with_connection(&mut self.state, move |_params, connection| {
generic_apply_migration_script(migration_name, script, connection)
}))
self.with_connection(|conn| conn.apply_migration_script(migration_name, script))
}

fn connection_string(&self) -> Option<&str> {
self.state
.params()
.map(|p| p.connector_params.connection_string.as_str())
imp::connection_string(&self.state)
}

fn table_names(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<Vec<String>>> {
Expand All @@ -93,10 +99,7 @@ impl SqlFlavour for SqliteFlavour {
}

fn create_database(&mut self) -> BoxFuture<'_, ConnectorResult<String>> {
Box::pin(async {
let params = self.state.get_unwrapped_params();
create_database(params)
})
Box::pin(async { imp::create_database(&self.state) })
}

fn create_migrations_table(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
Expand All @@ -121,26 +124,19 @@ impl SqlFlavour for SqliteFlavour {
}

fn describe_schema(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<SqlSchema>> {
Box::pin(async move {
let schema = with_connection(&mut self.state, |_, conn| Ok(Box::pin(conn.describe_schema())))?.await?;
Ok(schema)
})
self.with_connection(|conn| describe_schema(conn))
}

fn drop_database(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
let params = self.state.get_unwrapped_params();
let ret = drop_database(params);
ready(ret)
ready(imp::drop_database(&self.state))
}

fn drop_migrations_table(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
self.raw_cmd("DROP TABLE _prisma_migrations")
}

fn ensure_connection_validity(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
let params = self.state.get_unwrapped_params();
let result = ensure_connection_validity(params);
ready(result)
Box::pin(imp::ensure_connection_validity(&self.state))
}

fn load_migrations_table(
Expand All @@ -164,8 +160,8 @@ impl SqlFlavour for SqliteFlavour {
FROM `_prisma_migrations`
ORDER BY `started_at` ASC
"#};
ready(with_connection(&mut self.state, |_, conn| {
let rows = match conn.query_raw(SQL, &[]) {
self.with_connection(|conn| async {
let rows = match conn.query_raw(SQL, &[]).await {
Ok(result) => result,
Err(err) => {
#[cfg(feature = "sqlite-native")]
Expand Down Expand Up @@ -222,14 +218,14 @@ impl SqlFlavour for SqliteFlavour {
tracing::debug!("Found {} migrations in the migrations table.", rows.len());

Ok(Ok(rows))
}))
})
}

fn query<'a>(
&'a mut self,
query: quaint::ast::Query<'a>,
) -> BoxFuture<'a, ConnectorResult<quaint::prelude::ResultSet>> {
ready(with_connection(&mut self.state, |_, conn| conn.query(query)))
self.with_connection(|conn| conn.query(query))
}

fn query_raw<'a>(
Expand All @@ -238,60 +234,35 @@ impl SqlFlavour for SqliteFlavour {
params: &'a [quaint::Value<'a>],
) -> BoxFuture<'a, ConnectorResult<quaint::prelude::ResultSet>> {
tracing::debug!(sql, params = ?params, query_type = "query_raw");
ready(with_connection(&mut self.state, |_, conn| conn.query_raw(sql, params)))
self.with_connection(|conn| conn.query_raw(sql, params))
}

fn describe_query<'a>(
&'a mut self,
sql: &'a str,
) -> BoxFuture<'a, ConnectorResult<quaint::connector::DescribedQuery>> {
tracing::debug!(sql, query_type = "describe_query");
ready(with_connection(&mut self.state, |params, conn| {
conn.describe_query(sql, params)
}))
ready(imp::describe_query(&mut self.state, sql))
}

fn introspect(
&mut self,
namespaces: Option<Namespaces>,
ctx: &schema_connector::IntrospectionContext,
_namespaces: Option<Namespaces>,
_ctx: &schema_connector::IntrospectionContext,
) -> BoxFuture<'_, ConnectorResult<SqlSchema>> {
introspect(self, namespaces, ctx)
Box::pin(imp::introspect(&mut self.state))
}

fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>> {
ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql)))
self.with_connection(|conn| conn.raw_cmd(sql))
}

fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
ready(with_connection(&mut self.state, move |params, connection| {
reset(params, connection)
}))
}

fn set_params(&mut self, params: ConnectorParams) -> ConnectorResult<()> {
let quaint::connector::SqliteParams { file_path, .. } =
quaint::connector::SqliteParams::try_from(params.connection_string.as_str())
.map_err(ConnectorError::url_parse_error)?;

self.state.set_params(Params {
connector_params: params,
file_path,
});
Ok(())
Box::pin(imp::reset(&mut self.state))
}

fn set_preview_features(&mut self, preview_features: enumflags2::BitFlags<psl::PreviewFeature>) {
match &mut self.state {
super::State::Initial => {
if !preview_features.is_empty() {
tracing::warn!("set_preview_feature on Initial state has no effect ({preview_features}).");
}
}
super::State::WithParams(params) | super::State::Connected(params, _) => {
params.connector_params.preview_features = preview_features
}
}
imp::set_preview_features(&mut self.state, preview_features)
}

#[tracing::instrument(skip(self, migrations))]
Expand All @@ -303,7 +274,7 @@ impl SqlFlavour for SqliteFlavour {
) -> BoxFuture<'a, ConnectorResult<SqlSchema>> {
Box::pin(async move {
tracing::debug!("Applying migrations to temporary in-memory SQLite database.");
let mut shadow_db_conn = Connection::new_in_memory();
let shadow_db_conn = imp::Connection::new_in_memory();
for migration in migrations {
let script = migration.read_migration_script()?;

Expand All @@ -312,46 +283,42 @@ impl SqlFlavour for SqliteFlavour {
migration.migration_name()
);

shadow_db_conn.raw_cmd(&script).map_err(|connector_error| {
shadow_db_conn.raw_cmd(&script).await.map_err(|connector_error| {
connector_error.into_migration_does_not_apply_cleanly(migration.migration_name().to_owned())
})?;
}

shadow_db_conn.describe_schema().await
describe_schema(&shadow_db_conn).await
})
}

fn set_params(&mut self, connector_params: ConnectorParams) -> ConnectorResult<()> {
imp::set_params(&mut self.state, connector_params)
}

fn version(&mut self) -> BoxFuture<'_, ConnectorResult<Option<String>>> {
version(self)
self.with_connection(|conn| conn.version())
}

fn search_path(&self) -> &str {
"main"
}
}

fn acquire_lock(connection: &mut Connection) -> ConnectorResult<()> {
connection.raw_cmd("PRAGMA main.locking_mode=EXCLUSIVE")
async fn acquire_lock(connection: &imp::Connection) -> ConnectorResult<()> {
connection.raw_cmd("PRAGMA main.locking_mode=EXCLUSIVE").await
}

fn with_connection<'a, O, C>(state: &'a mut State, f: C) -> ConnectorResult<O>
where
O: 'a + Send,
C: (FnOnce(&'a mut Params, &'a mut Connection) -> ConnectorResult<O>) + Send + Sync + 'a,
{
match state {
super::State::Initial => panic!("logic error: Initial"),
super::State::Connected(p, c) => f(p, c),
super::State::WithParams(p) => {
let conn = Connection::new(p)?;
let params = match std::mem::replace(state, super::State::Initial) {
super::State::WithParams(p) => p,
_ => unreachable!(),
};
*state = super::State::Connected(params, conn);
with_connection(state, f)
}
}
async fn describe_schema(connection: &imp::Connection) -> ConnectorResult<SqlSchema> {
SqlSchemaDescriber::new(connection.as_connector())
.describe_impl()
.await
.map_err(|err| match err.into_kind() {
DescriberErrorKind::QuaintError(err) => ConnectorError::from_source(err, "Error describing the database."),
DescriberErrorKind::CrossSchemaReference { .. } => {
unreachable!("No schemas on SQLite")
}
})
}

fn ready<O: Send + Sync + 'static>(output: O) -> BoxFuture<'static, O> {
Expand Down
Loading

0 comments on commit acc0b9d

Please sign in to comment.