From abfed19237c30b60f7478a808e3c5f299603b455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 15 Dec 2019 12:04:40 +0000 Subject: [PATCH] add initial tokio-postgres support --- .circleci/config.yml | 16 + refinery/Cargo.toml | 6 + refinery/tests/tokio_postgres.rs | 596 ++++++++++++++++++++++ refinery_migrations/Cargo.toml | 20 +- refinery_migrations/src/async_traits.rs | 114 +++++ refinery_migrations/src/lib.rs | 24 +- refinery_migrations/src/tokio_postgres.rs | 70 +++ refinery_migrations/src/traits.rs | 2 +- 8 files changed, 835 insertions(+), 13 deletions(-) create mode 100644 refinery/tests/tokio_postgres.rs create mode 100644 refinery_migrations/src/async_traits.rs create mode 100644 refinery_migrations/src/tokio_postgres.rs diff --git a/.circleci/config.yml b/.circleci/config.yml index a50a54ce..796c9bdc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -108,6 +108,22 @@ jobs: - checkout - run: cargo build -p refinery_cli - run: cd refinery && cargo test -p refinery --features "tpostgres" --test postgres -- --test-threads 1 + test-tokio-postgres-stable: + docker: + - image: << pipeline.parameters.stable >> + - image: postgres:9.6.13-alpine + steps: + - checkout + - run: cargo build -p refinery_cli + - run: cd refinery && cargo test -p refinery --features "ttokio-postgres" --test tokio_postgres -- --test-threads 1 + test-tokio-postgres-nightly: + docker: + - image: << pipeline.parameters.nightly >> + - image: postgres:9.6.13-alpine + steps: + - checkout + - run: cargo build -p refinery_cli + - run: cd refinery && cargo test -p refinery --features "ttokio-postgres" --test tokio_postgres -- --test-threads 1 test-mysql-previous: docker: - image: << pipeline.parameters.previous >> diff --git a/refinery/Cargo.toml b/refinery/Cargo.toml index 6a4cc4ba..60c5468d 100644 --- a/refinery/Cargo.toml +++ b/refinery/Cargo.toml @@ -15,11 +15,14 @@ edition = "2018" default = [] rusqlite = ["refinery-migrations/rusqlite", 
"barrel/sqlite3"] postgres = ["refinery-migrations/postgres", "barrel/pg"] +tokio-postgres = ["refinery-migrations/tokio-postgres"] mysql = ["refinery-migrations/mysql", "barrel/mysql"] #testing features trusqlite = ["mysql", "postgres", "rusqlite", "refinery-migrations/mysql", "refinery-migrations/postgres", "refinery-migrations/rusqlite", "mod_migrations/sqlite"] tpostgres = ["mysql", "postgres", "rusqlite", "refinery-migrations/mysql", "refinery-migrations/postgres", "refinery-migrations/rusqlite", "mod_migrations/pg"] +ttokio-postgres = ["refinery-migrations/tokio-postgres", "mod_migrations/pg"] + tmysql = ["mysql", "postgres", "rusqlite", "refinery-migrations/mysql", "refinery-migrations/postgres", "refinery-migrations/rusqlite", "mod_migrations/mysql"] [dependencies] @@ -30,6 +33,9 @@ barrel = "0.5.3" [dev-dependencies] ttrusqlite = {package = "rusqlite", version = "0.18.0"} ttpostgres = {package = "postgres", version = "0.15"} +ttokio_postgres = {package = "tokio-postgres", version = "0.5.0-alpha.2" } +tokio = { version = "0.2", features = ["full"] } +futures = "0.3.1" ttmysql = {package = "mysql", version = "16.0"} mod_migrations = {path = "./tests/mod_migrations"} diff --git a/refinery/tests/tokio_postgres.rs b/refinery/tests/tokio_postgres.rs new file mode 100644 index 00000000..e164fd76 --- /dev/null +++ b/refinery/tests/tokio_postgres.rs @@ -0,0 +1,596 @@ +mod tokio_postgres { + use assert_cmd::prelude::*; + use chrono::{DateTime, Local}; + // use predicates::str::contains; + use refinery::Error; + use refinery::Migration; + use refinery_migrations::AsyncMigrate; + // use refinery::{migrate_from_config, Config, ConfigDbType, Error, Migrate as _, Migration}; + // use std::process::Command; + use tokio::runtime::Runtime; + use ttokio_postgres::NoTls; + + mod embedded { + use refinery::embed_migrations; + embed_migrations!("refinery/tests/sql_migrations"); + } + + mod broken { + use refinery::embed_migrations; + 
embed_migrations!("refinery/tests/sql_migrations_broken"); + } + + mod missing { + use refinery::embed_migrations; + embed_migrations!("refinery/tests/sql_migrations_missing"); + } + + fn get_migrations() -> Vec { + let migration1 = Migration::from_filename( + "V1__initial.sql", + include_str!("./sql_migrations/V1__initial.sql"), + ) + .unwrap(); + + let migration2 = Migration::from_filename( + "V2__add_cars_table", + include_str!("./sql_migrations/V2__add_cars_table.sql"), + ) + .unwrap(); + + let migration3 = Migration::from_filename( + "V3__add_brand_to_cars_table", + include_str!("./sql_migrations/V3__add_brand_to_cars_table.sql"), + ) + .unwrap(); + + let migration4 = Migration::from_filename( + "V4__add_year_field_to_cars", + &"ALTER TABLE cars ADD year INTEGER;", + ) + .unwrap(); + + vec![migration1, migration2, migration3, migration4] + } + + async fn clean_database() { + let (client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/template1", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + client + .execute( + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='postgres'", + &[], + ) + .await + .unwrap(); + client.execute("DROP DATABASE postgres", &[]).await.unwrap(); + client + .execute("CREATE DATABASE POSTGRES", &[]) + .await + .unwrap(); + } + + struct TearDown; + impl Drop for TearDown { + fn drop(&mut self) { + let mut rt = Runtime::new().unwrap(); + rt.block_on(clean_database()); + } + } + + #[tokio::test] + async fn embedded_creates_migration_table() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + let rows = client + .query("SELECT table_name FROM information_schema.tables WHERE 
table_name='refinery_schema_history'", &[]) + .await + .unwrap(); + + for row in rows { + let table_name: String = row.get(0); + assert_eq!("refinery_schema_history", table_name); + } + } + + #[tokio::test] + async fn embedded_creates_migration_table_grouped_migrations() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + let rows = client + .query("SELECT table_name FROM information_schema.tables WHERE table_name='refinery_schema_history'", &[]) + .await + .unwrap(); + + for row in rows { + let table_name: String = row.get(0); + assert_eq!("refinery_schema_history", table_name); + } + } + + #[tokio::test] + async fn embedded_applies_migration() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + client + .execute( + "INSERT INTO persons (name, city) VALUES ($1, $2)", + &[&"John Legend", &"New York"], + ) + .await + .unwrap(); + for row in client + .query("SELECT name, city FROM persons", &[]) + .await + .unwrap() + { + let name: String = row.get(0); + let city: String = row.get(1); + assert_eq!("John Legend", name); + assert_eq!("New York", city); + } + } + + #[tokio::test] + async fn embedded_applies_migration_grouped() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + + client + 
.execute( + "INSERT INTO persons (name, city) VALUES ($1, $2)", + &[&"John Legend", &"New York"], + ) + .await + .unwrap(); + for row in client + .query("SELECT name, city FROM persons", &[]) + .await + .unwrap() + { + let name: String = row.get(0); + let city: String = row.get(1); + assert_eq!("John Legend", name); + assert_eq!("New York", city); + } + } + + #[tokio::test] + async fn embedded_updates_schema_history() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + for row in client + .query("SELECT MAX(version) FROM refinery_schema_history", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + + for row in client + .query("SELECT applied_on FROM refinery_schema_history where version=(SELECT MAX(version) from refinery_schema_history)", &[]) + .await + .unwrap() + { + let applied_on: String = row.get(0); + let applied_on = DateTime::parse_from_rfc3339(&applied_on).unwrap().with_timezone(&Local); + assert_eq!(Local::today(), applied_on.date()); + } + } + + #[tokio::test] + async fn embedded_updates_schema_history_grouped() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + + for row in client + .query("SELECT MAX(version) FROM refinery_schema_history", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + + for row in client + .query("SELECT applied_on FROM refinery_schema_history where version=(SELECT MAX(version) from refinery_schema_history)", &[]) + .await + 
.unwrap() + { + let applied_on: String = row.get(0); + let applied_on = DateTime::parse_from_rfc3339(&applied_on).unwrap().with_timezone(&Local); + assert_eq!(Local::today(), applied_on.date()); + } + } + + #[tokio::test] + async fn embedded_updates_to_last_working() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + let result = broken::migrations::runner().run_async(&mut client).await; + + assert!(result.is_err()); + + for row in client + .query("SELECT MAX(version) FROM refinery_schema_history", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(2, current); + } + } + + #[tokio::test] + async fn mod_creates_migration_table() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + mod_migrations::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + let rows = client + .query("SELECT table_name FROM information_schema.tables WHERE table_name='refinery_schema_history'", &[]) + .await + .unwrap(); + + for row in rows { + let table_name: String = row.get(0); + assert_eq!("refinery_schema_history", table_name); + } + } + + #[tokio::test] + async fn mod_applies_migration() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + mod_migrations::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + client + .execute( + "INSERT INTO persons (name, city) VALUES ($1, $2)", + &[&"John Legend", &"New York"], + ) + .await + .unwrap(); + for row in client + .query("SELECT name, city FROM persons", &[]) + .await + .unwrap() + 
{ + let name: String = row.get(0); + let city: String = row.get(1); + assert_eq!("John Legend", name); + assert_eq!("New York", city); + } + } + + #[tokio::test] + async fn mod_updates_schema_history() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + mod_migrations::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + for row in client + .query("SELECT MAX(version) FROM refinery_schema_history", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + assert_eq!(3, current); + } + + for row in client + .query("SELECT applied_on FROM refinery_schema_history where version=(SELECT MAX(version) from refinery_schema_history)", &[]) + .await + .unwrap() + { + let applied_on: String = row.get(0); + let applied_on = DateTime::parse_from_rfc3339(&applied_on).unwrap().with_timezone(&Local); + assert_eq!(Local::today(), applied_on.date()); + } + } + + #[tokio::test] + async fn applies_new_migration() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + let migration1 = Migration::from_filename( + "V1__initial.sql", + include_str!("./sql_migrations/V1__initial.sql"), + ) + .unwrap(); + + let migration2 = Migration::from_filename( + "V2__add_cars_table", + include_str!("./sql_migrations/V2__add_cars_table.sql"), + ) + .unwrap(); + + let migration3 = Migration::from_filename( + "V3__add_brand_to_cars_table", + include_str!("./sql_migrations/V3__add_brand_to_cars_table.sql"), + ) + .unwrap(); + + let migration4 = Migration::from_filename( + "V4__add_year_field_to_cars", + &"ALTER TABLE cars ADD year INTEGER;", + ) + .unwrap(); + let 
mchecksum = migration4.checksum(); + client + .migrate( + &[migration1, migration2, migration3, migration4], + true, + true, + false, + ) + .await + .unwrap(); + + for row in client + .query("SELECT version, checksum FROM refinery_schema_history where version = (SELECT MAX(version) from refinery_schema_history)", &[]) + .await + .unwrap() + { + let current: i32 = row.get(0); + let checksum: String = row.get(1); + assert_eq!(4, current); + assert_eq!(mchecksum.to_string(), checksum); + } + } + + #[tokio::test] + async fn aborts_on_missing_migration_on_filesystem() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + mod_migrations::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migration = Migration::from_filename( + "V4__add_year_field_to_cars", + &"ALTER TABLE cars ADD year INTEGER;", + ) + .unwrap(); + let err = client + .migrate(&[migration.clone()], true, true, false) + .await + .unwrap_err(); + + match err { + Error::MissingVersion(missing) => { + assert_eq!(1, missing.version); + assert_eq!("initial", missing.name); + } + _ => panic!("failed test"), + } + } + + #[tokio::test] + async fn aborts_on_divergent_migration() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + mod_migrations::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migration = Migration::from_filename( + "V2__add_year_field_to_cars", + &"ALTER TABLE cars ADD year INTEGER;", + ) + .unwrap(); + let err = client + .migrate(&[migration.clone()], true, false, false) + .await + .unwrap_err(); + + match err { + Error::DivergentVersion(applied, divergent) => { + assert_eq!(migration, divergent); + 
assert_eq!(2, applied.version); + assert_eq!("add_cars_table", applied.name); + } + _ => panic!("failed test"), + } + } + + #[tokio::test] + async fn aborts_on_missing_migration_on_database() { + TearDown {}; + let (mut client, connection) = + ttokio_postgres::connect("postgres://postgres@localhost:5432/postgres", NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + connection.await.unwrap(); + }); + + missing::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migration1 = Migration::from_filename( + "V1__initial", + concat!( + "CREATE TABLE persons (", + "id int,", + "name varchar(255),", + "city varchar(255)", + ");" + ), + ) + .unwrap(); + + let migration2 = Migration::from_filename( + "V2__add_cars_table", + include_str!("./sql_migrations_missing/V2__add_cars_table.sql"), + ) + .unwrap(); + let err = client + .migrate(&[migration1, migration2], true, true, false) + .await + .unwrap_err(); + match err { + Error::MissingVersion(missing) => { + assert_eq!(1, missing.version); + assert_eq!("initial", missing.name); + } + _ => panic!("failed test"), + } + } +} diff --git a/refinery_migrations/Cargo.toml b/refinery_migrations/Cargo.toml index db666fbb..77853ea1 100644 --- a/refinery_migrations/Cargo.toml +++ b/refinery_migrations/Cargo.toml @@ -9,17 +9,19 @@ repository = "https://github.com/rust-db/refinery" edition = "2018" [dependencies] -lazy_static = "1.3.0" -regex = "1.1.6" -log = "0.4.6" +lazy_static = "1.3" +regex = "1.1" +log = "0.4" rusqlite = {version = "0.18", optional = true} postgres = {version = "0.15", optional = true} -mysql = {version = "16.0.2", optional = true} -chrono = "0.4.6" -walkdir = "2.2.7" +mysql = {version = "16.0", optional = true} +chrono = "0.4" +walkdir = "2.2" serde = { version = "1.0", features = ["derive"] } -cfg-if = "0.1.9" -thiserror = "1.0.6" +cfg-if = "0.1" +thiserror = "1.0" +async-trait = "0.1" +tokio-postgres = { version = "0.5.0-alpha.2", optional = true } [dev-dependencies] -tempdir = 
"0.3.7" +tempdir = "0.3" diff --git a/refinery_migrations/src/async_traits.rs b/refinery_migrations/src/async_traits.rs new file mode 100644 index 00000000..1ad8f7fc --- /dev/null +++ b/refinery_migrations/src/async_traits.rs @@ -0,0 +1,114 @@ +use crate::traits::{check_missing_divergent, ASSERT_MIGRATIONS_TABLE, GET_APPLIED_MIGRATIONS}; +use crate::{AppliedMigration, Error, Migration, WrapMigrationError}; + +use async_trait::async_trait; +use chrono::Local; + +#[async_trait] +pub trait AsyncTransaction { + type Error: std::error::Error + Send + Sync + 'static; + + async fn execute(&mut self, query: &str) -> Result; +} + +#[async_trait] +pub trait AsyncQuery: AsyncTransaction { + async fn query(&mut self, query: &str) -> Result, Self::Error>; +} + +#[async_trait] +pub trait AsyncExecuteMultiple: AsyncTransaction { + async fn execute_multiple(&mut self, queries: &[&str]) -> Result; +} + +async fn migrate( + transaction: &mut T, + migrations: Vec, +) -> Result<(), Error> { + for migration in migrations.iter() { + log::info!("applying migration: {}", migration); + let update_query = &format!( + "INSERT INTO refinery_schema_history (version, name, applied_on, checksum) VALUES ({}, '{}', '{}', '{}')", + migration.version, migration.name, Local::now().to_rfc3339(), migration.checksum().to_string()); + transaction + .execute_multiple(&[&migration.sql, update_query]) + .await + .migration_err(&format!("error applying migration {}", migration))?; + } + Ok(()) +} + +async fn migrate_grouped( + transaction: &mut T, + migrations: Vec, +) -> Result<(), Error> { + let mut grouped_migrations = Vec::new(); + let mut display_migrations = Vec::new(); + for migration in migrations.into_iter() { + let query = format!( + "INSERT INTO refinery_schema_history (version, name, applied_on, checksum) VALUES ({}, '{}', '{}', '{}')", + migration.version, migration.name, Local::now().to_rfc3339(), migration.checksum().to_string() + ); + display_migrations.push(migration.to_string()); + 
grouped_migrations.push(migration.sql); + grouped_migrations.push(query); + } + log::info!( + "going to apply batch migrations in single transaction: {:#?}", + display_migrations + ); + + let refs: Vec<&str> = grouped_migrations.iter().map(AsRef::as_ref).collect(); + + transaction + .execute_multiple(refs.as_ref()) + .await + .migration_err("error applying migrations")?; + + Ok(()) +} + +#[async_trait] +pub trait AsyncMigrate: AsyncQuery<Vec<AppliedMigration>> + AsyncExecuteMultiple +where + Self: Sized, +{ + async fn migrate( + &mut self, + migrations: &[Migration], + abort_divergent: bool, + abort_missing: bool, + grouped: bool, + ) -> Result<(), Error> { + self.execute(ASSERT_MIGRATIONS_TABLE) + .await + .migration_err("error asserting migrations table")?; + + let applied_migrations = self + .query(GET_APPLIED_MIGRATIONS) + .await + .migration_err("error getting current schema version")? + .unwrap_or_default(); + + let migrations = check_missing_divergent( + applied_migrations, + migrations.to_vec(), + abort_divergent, + abort_missing, + )?; + + if migrations.is_empty() { + log::info!("no migrations to apply"); + } + + if grouped { + migrate_grouped(self, migrations).await? + } else { + migrate(self, migrations).await? 
+ } + + Ok(()) + } +} + +impl<T> AsyncMigrate for T where T: AsyncQuery<Vec<AppliedMigration>> + AsyncExecuteMultiple {} diff --git a/refinery_migrations/src/lib.rs b/refinery_migrations/src/lib.rs index c5846593..0d8461ba 100644 --- a/refinery_migrations/src/lib.rs +++ b/refinery_migrations/src/lib.rs @@ -1,3 +1,4 @@ +mod async_traits; mod config; mod error; mod traits; @@ -9,11 +10,10 @@ use std::collections::hash_map::DefaultHasher; use std::fmt; use std::hash::{Hash, Hasher}; +pub use async_traits::AsyncMigrate; pub use config::{Config, ConfigDbType, Main}; pub use error::{Error, WrapMigrationError}; -pub use traits::{ - CommitTransaction, ExecuteMultiple, Migrate, MigrateGrouped, Query, Transaction, -}; +pub use traits::{CommitTransaction, ExecuteMultiple, Migrate, MigrateGrouped, Query, Transaction}; use utils::RE; pub use utils::{file_match_re, find_migrations_filenames, MigrationType}; @@ -23,6 +23,9 @@ pub use utils::migrate_from_config; #[cfg(feature = "rusqlite")] pub mod rusqlite; +#[cfg(feature = "tokio-postgres")] +pub mod tokio_postgres; + #[cfg(feature = "postgres")] pub mod postgres; @@ -205,4 +208,19 @@ impl Runner { } Ok(()) } + + /// Runs the Migrations asynchronously in the supplied database connection + pub async fn run_async<'a, C>(&self, conn: &'a mut C) -> Result<(), Error> + where + C: AsyncMigrate + Send, + { + AsyncMigrate::migrate( + conn, + &self.migrations, + self.abort_divergent, + self.abort_missing, + self.grouped, + ) + .await + } } diff --git a/refinery_migrations/src/tokio_postgres.rs b/refinery_migrations/src/tokio_postgres.rs new file mode 100644 index 00000000..c8859f3a --- /dev/null +++ b/refinery_migrations/src/tokio_postgres.rs @@ -0,0 +1,70 @@ +use crate::async_traits::{AsyncExecuteMultiple, AsyncQuery, AsyncTransaction}; +use crate::AppliedMigration; +use async_trait::async_trait; +use chrono::{DateTime, Local}; +use tokio_postgres::error::Error as PgError; +use tokio_postgres::{Client, Transaction as PgTransaction}; + +async fn 
query_applied_migrations( + transaction: &PgTransaction<'_>, + query: &str, +) -> Result, PgError> { + let rows = transaction.query(query, &[]).await?; + let mut applied = Vec::new(); + for row in rows.into_iter() { + let version: i32 = row.get(0); + let applied_on: String = row.get(2); + let applied_on = DateTime::parse_from_rfc3339(&applied_on) + .unwrap() + .with_timezone(&Local); + + applied.push(AppliedMigration { + version: version as usize, + name: row.get(1), + applied_on, + checksum: row.get(3), + }); + } + Ok(applied) +} + +#[async_trait] +impl AsyncTransaction for Client { + type Error = PgError; + + async fn execute(&mut self, query: &str) -> Result { + let transaction = self.transaction().await?; + let count = transaction.execute(query, &[]).await?; + transaction.commit().await?; + Ok(count as usize) + } +} + +#[async_trait] +impl AsyncQuery> for Client { + async fn query( + &mut self, + query: &str, + ) -> Result>, ::Error> { + let transaction = self.transaction().await?; + let applied = query_applied_migrations(&transaction, query).await?; + transaction.commit().await?; + Ok(Some(applied)) + } +} + +#[async_trait] +impl AsyncExecuteMultiple for Client { + async fn execute_multiple( + &mut self, + queries: &[&str], + ) -> Result::Error> { + let transaction = self.transaction().await?; + let mut count = 0; + for query in queries.into_iter() { + count += transaction.execute(*query, &[]).await?; + } + transaction.commit().await?; + Ok(count as usize) + } +} diff --git a/refinery_migrations/src/traits.rs b/refinery_migrations/src/traits.rs index 79f54214..0999813f 100644 --- a/refinery_migrations/src/traits.rs +++ b/refinery_migrations/src/traits.rs @@ -33,7 +33,7 @@ pub trait Query: Transaction { //checks for missing migrations on filesystem or apllied migrations with a different name and checksum but same version //if abort_divergent or abort_missing are true returns Err on those cases, else returns the list of migrations to be applied -fn 
check_missing_divergent( +pub fn check_missing_divergent( applied: Vec, mut migrations: Vec, abort_divergent: bool,