From ffcdb4600fc8d2273b2d3d6347faf647dcb7d743 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Thu, 5 Oct 2023 00:36:44 +0300 Subject: [PATCH] Add pgmq.detach_archive function --- Cargo.lock | 2 +- Cargo.toml | 2 +- sql/pgmq--0.29.0--0.30.0.sql | 9 +++++++ src/api.rs | 13 ++++++++-- tests/integration_tests.rs | 49 ++++++++++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+), 4 deletions(-) create mode 100644 sql/pgmq--0.29.0--0.30.0.sql diff --git a/Cargo.lock b/Cargo.lock index bf4c1c8d..33132b24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1318,7 +1318,7 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.28.0" +version = "0.30.0" dependencies = [ "chrono", "pgmq-core", diff --git a/Cargo.toml b/Cargo.toml index e5741a4c..c923985c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.29.0" +version = "0.30.0" edition = "2021" authors = ["Tembo.io"] description = "Postgres extension for PGMQ" diff --git a/sql/pgmq--0.29.0--0.30.0.sql b/sql/pgmq--0.29.0--0.30.0.sql new file mode 100644 index 00000000..e073e61f --- /dev/null +++ b/sql/pgmq--0.29.0--0.30.0.sql @@ -0,0 +1,9 @@ +-- New function, copied from schema +-- src/api.rs:26 +-- pgmq::api::detach_archive +CREATE FUNCTION pgmq."detach_archive"( + "queue_name" TEXT /* alloc::string::String */ +) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_detach_archive_wrapper'; diff --git a/src/api.rs b/src/api.rs index d7b2446e..048fb846 100644 --- a/src/api.rs +++ b/src/api.rs @@ -8,9 +8,9 @@ use crate::partition; use crate::partition::PARTMAN_SCHEMA; use pgmq_core::{ - query::{destroy_queue, init_queue}, + query::{destroy_queue, init_queue, unassign_archive}, types::{PGMQ_SCHEMA, QUEUE_PREFIX}, - util::check_input, + util::{check_input, CheckedName}, }; #[pg_extern(name = "drop_queue")] @@ -22,6 +22,15 @@ fn pgmq_drop_queue( Ok(true) } +#[pg_extern(name = "detach_archive")] +fn pgmq_detach_archive(queue_name: String) -> Result<(), PgmqExtError> { + let query = unassign_archive(CheckedName::new(&queue_name)?)?; + Spi::connect(|mut client| { + client.update(query.as_str(), None, None)?; + Ok(()) + }) +} + pub fn delete_queue(queue_name: String, partitioned: bool) -> Result<(), PgmqExtError> { // TODO: we should keep track whether queue is partitioned in pgmq_meta // then read that to determine we want to delete the part_config entries diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 683f813f..3809754d 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -631,6 +631,55 @@ async fn test_transaction_read() { assert!(read_msg3.is_some()); } +// Integration tests are ignored by default +#[ignore] +#[tokio::test] +async fn test_detach_archive() { + let conn = init_database().await; + let queue_name = "detach_archive_queue"; + create_queue(&queue_name.to_string(), &conn).await; + + // Without detach, archive is dropped with the extension + let _ = sqlx::query(&format!("DROP EXTENSION pgmq CASCADE")) + .fetch_one(&conn) + .await; + + let table_exists = sqlx::query(&format!( + "select from pg_tables where schemaname = 'pgmq' and tablename = 'a_{queue_name}'" + )) + .fetch_optional(&conn) + .await + .unwrap(); + + assert!(table_exists.is_none()); + + // With detach, archive remains + let _ = sqlx::query(&format!("CREATE EXTENSION pgmq")) + .fetch_one(&conn) + .await; + + create_queue(&queue_name.to_string(), &conn).await; + + sqlx::query(&format!( + "select from {PGMQ_SCHEMA}.detach_archive('{queue_name}')" + )) + .fetch_one(&conn) + .await + .unwrap(); + + let _ = sqlx::query(&format!("DROP EXTENSION pgmq CASCADE")) + .fetch_one(&conn) + .await; + + let table_exists = sqlx::query(&format!( + "select from pg_tables where schemaname = 'pgmq' and tablename = 'a_{queue_name}'" + )) + .fetch_optional(&conn) + .await + .unwrap(); + + assert!(table_exists.is_some()); +} async fn create_queue(queue_name: &String, conn: &Pool) { sqlx::query(&format!("select from {PGMQ_SCHEMA}.create('{queue_name}')"))