diff --git a/Cargo.lock b/Cargo.lock index 8ad0dfa5..5488eef0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "pgmq-core" -version = "0.6.0" +version = "0.6.1" dependencies = [ "chrono", "log", diff --git a/pgmq-rs/Cargo.toml b/pgmq-rs/Cargo.toml index 0c2fcc33..59817350 100644 --- a/pgmq-rs/Cargo.toml +++ b/pgmq-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.22.2" +version = "0.23.0" edition = "2021" authors = ["Tembo.io"] description = "A distributed message queue for Rust applications, on Postgres." @@ -12,7 +12,7 @@ readme = "README.md" repository = "https://github.com/tembo-io/pgmq" [dependencies] -pgmq_core = {package = "pgmq-core", path = "../core" } +pgmq_core = { package = "pgmq-core", version = "0.6.1" } chrono = { version = "0.4.23", features = [ "serde" ] } serde = { version = "1.0.152" } serde_json = { version = "1.0.91", features = [ "raw_value" ] } diff --git a/pgmq-rs/sqlx-data.json b/pgmq-rs/sqlx-data.json index 3063ec0a..3591ae7c 100644 --- a/pgmq-rs/sqlx-data.json +++ b/pgmq-rs/sqlx-data.json @@ -68,6 +68,26 @@ }, "query": "SELECT * from pgmq.read_with_poll($1::text, $2, $3, $4, $5)" }, + "056c21572eb57d3fbc1bdbc94380e391b07bb7674b731305c7f197faedd3feb0": { + "describe": { + "columns": [ + { + "name": "create_unlogged", + "ordinal": 0, + "type_info": "Void" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Text" + ] + } + }, + "query": "SELECT * from pgmq.create_unlogged($1::text);" + }, "07917815df25f1ac06675a645a812d9791b3c1d32098248cf0f34aead4dcebc8": { "describe": { "columns": [ @@ -95,7 +115,7 @@ { "name": "archive", "ordinal": 0, - "type_info": "Bool" + "type_info": "Int8" } ], "nullable": [ @@ -193,7 +213,7 @@ { "name": "delete", "ordinal": 0, - "type_info": "Bool" + "type_info": "Int8" } ], "nullable": [ diff --git a/pgmq-rs/src/lib.rs b/pgmq-rs/src/lib.rs index 82d88bc4..7e7180ae 100644 --- a/pgmq-rs/src/lib.rs +++ b/pgmq-rs/src/lib.rs @@ -211,7 +211,18 @@ impl PGMQueue { /// } pub async fn create(&self, queue_name: &str) -> Result<(), PgmqError> { let mut tx = self.connection.begin().await?; - let setup = query::init_queue_client_only(queue_name)?; + let setup = query::init_queue_client_only(queue_name, false)?; + for q in setup { + sqlx::query(&q).execute(&mut tx).await?; + } + tx.commit().await?; + Ok(()) + } + + /// Create an unlogged queue + pub async fn create_unlogged(&self, queue_name: &str) -> Result<(), PgmqError> { + let mut tx = self.connection.begin().await?; + let setup = query::init_queue_client_only(queue_name, true)?; for q in setup { sqlx::query(&q).execute(&mut tx).await?; } diff --git a/pgmq-rs/src/pg_ext.rs b/pgmq-rs/src/pg_ext.rs index a103b4f2..34d44f0c 100644 --- a/pgmq-rs/src/pg_ext.rs +++ b/pgmq-rs/src/pg_ext.rs @@ -57,6 +57,15 @@ impl PGMQueueExt { Ok(true) } + /// Errors when there is any database error and Ok(false) when the queue already exists. + pub async fn create_unlogged(&self, queue_name: &str) -> Result { + check_input(queue_name)?; + sqlx::query!("SELECT * from pgmq.create_unlogged($1::text);", queue_name) + .execute(&self.connection) + .await?; + Ok(true) + } + /// Create a new partitioned queue. /// Errors when there is any database error and Ok(false) when the queue already exists. pub async fn create_partitioned(&self, queue_name: &str) -> Result { @@ -291,21 +300,23 @@ impl PGMQueueExt { Ok(arch.archive.expect("no archive result")) } - /// Move a message to the archive table. + /// Move a slice of messages to the archive table. pub async fn archive_batch( &self, queue_name: &str, msg_ids: &[i64], - ) -> Result { + ) -> Result { check_input(queue_name)?; - let arch = sqlx::query!( + let qty = sqlx::query!( "SELECT * from pgmq.archive($1::text, $2::bigint[])", queue_name, msg_ids ) - .fetch_one(&self.connection) - .await?; - Ok(arch.archive.expect("no archive result")) + .fetch_all(&self.connection) + .await? + .len(); + + Ok(qty) } // Read and message and immediately delete it. @@ -352,14 +363,17 @@ impl PGMQueueExt { } // Delete with a slice of message ids - pub async fn delete_batch(&self, queue_name: &str, msg_id: &[i64]) -> Result { - let row = sqlx::query!( + pub async fn delete_batch(&self, queue_name: &str, msg_id: &[i64]) -> Result { + let qty = sqlx::query!( "SELECT * from pgmq.delete($1::text, $2::bigint[])", queue_name, msg_id ) - .fetch_one(&self.connection) - .await?; - Ok(row.delete.expect("no delete result")) + .fetch_all(&self.connection) + .await? + .len(); + + // FIXME: change function signature to Vec and return rows + Ok(qty) } } diff --git a/pgmq-rs/src/query.rs b/pgmq-rs/src/query.rs index f53cbaf7..7a57b07a 100644 --- a/pgmq-rs/src/query.rs +++ b/pgmq-rs/src/query.rs @@ -2,16 +2,19 @@ use pgmq_core::{errors, query, util}; -pub fn init_queue_client_only(name: &str) -> Result, errors::PgmqError> { +pub fn init_queue_client_only( + name: &str, + is_unlogged: bool, +) -> Result, errors::PgmqError> { let name = util::CheckedName::new(name)?; Ok(vec![ query::create_schema(), query::create_meta(), - query::create_queue(name)?, + query::create_queue(name, is_unlogged)?, query::create_index(name)?, query::create_archive(name)?, query::create_archive_index(name)?, - query::insert_meta(name, false)?, + query::insert_meta(name, false, is_unlogged)?, query::grant_pgmon_meta(), query::grant_pgmon_queue(name)?, ]) diff --git a/pgmq-rs/tests/pg_ext_integration_test.rs b/pgmq-rs/tests/pg_ext_integration_test.rs index 3ef294a5..9838a6a6 100644 --- a/pgmq-rs/tests/pg_ext_integration_test.rs +++ b/pgmq-rs/tests/pg_ext_integration_test.rs @@ -273,7 +273,7 @@ async fn test_ext_archive_batch() { let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await; assert_eq!(post_archive_rowcount, 0); - assert_eq!(archive_result, true); + assert_eq!(archive_result, 3); let post_archive_archive_rowcount = archive_rowcount(&test_queue, &queue.connection).await; assert_eq!(post_archive_archive_rowcount, 3); @@ -297,7 +297,7 @@ async fn test_ext_delete_batch() { .expect("delete batch error"); let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await; assert_eq!(post_delete_rowcount, 0); - assert_eq!(delete_result, true); + assert_eq!(delete_result, 3); } #[tokio::test]