diff --git a/Cargo.lock b/Cargo.lock index 8dd2d3d9..f57c2484 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,7 +1243,7 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.22.0" +version = "0.23.0" dependencies = [ "chrono", "pgmq-core", @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "pgmq-core" -version = "0.3.0" +version = "0.4.0" dependencies = [ "chrono", "log", diff --git a/Cargo.toml b/Cargo.toml index a0c3e896..bbb92045 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.22.0" +version = "0.23.0" edition = "2021" authors = ["Tembo.io"] description = "Postgres extension for PGMQ" diff --git a/core/Cargo.toml b/core/Cargo.toml index b1f8852b..bc2b67c2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq-core" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["Tembo.io"] description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension" diff --git a/core/src/query.rs b/core/src/query.rs index 6e23ecd1..d9ba0c54 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -229,16 +229,6 @@ pub fn read(name: &str, vt: i32, limit: i32) -> Result { )) } -pub fn delete(name: &str, msg_id: i64) -> Result { - check_input(name)?; - Ok(format!( - " - DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} - WHERE msg_id = {msg_id}; - " - )) -} - pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime) -> Result { check_input(name)?; Ok(format!( @@ -252,60 +242,33 @@ pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime) -> Result Result { +pub fn delete_batch(name: &str) -> Result { // construct string of comma separated msg_id check_input(name)?; - let mut msg_id_list: String = "".to_owned(); - for msg_id in msg_ids.iter() { - let id_str = format!("{msg_id},"); - msg_id_list.push_str(&id_str) - } - // drop trailing comma from constructed string - msg_id_list.pop(); - Ok(format!( - " - DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} - WHERE msg_id in ({msg_id_list}); - " - )) -} -pub fn archive(name: &str, msg_id: i64) -> Result { - check_input(name)?; Ok(format!( " - WITH archived AS ( - DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} - WHERE msg_id = {msg_id} - RETURNING msg_id, vt, read_ct, enqueued_at, message - ) - INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message) - SELECT msg_id, vt, read_ct, enqueued_at, message - FROM archived; + DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + WHERE msg_id = ANY($1) + RETURNING msg_id; " )) } -pub fn archive_batch(name: &str, msg_ids: &[i64]) -> Result { +pub fn archive_batch(name: &str) -> Result { check_input(name)?; - let mut msg_id_list: String = "".to_owned(); - for msg_id in msg_ids.iter() { - let id_str = format!("{msg_id},"); - msg_id_list.push_str(&id_str) - } - // drop trailing comma from constructed string - msg_id_list.pop(); Ok(format!( " WITH archived AS ( DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} - WHERE msg_id in ({msg_id_list}) + WHERE msg_id = ANY($1) RETURNING msg_id, vt, read_ct, enqueued_at, message ) INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message) SELECT msg_id, vt, read_ct, enqueued_at, message - FROM archived; + FROM archived + RETURNING msg_id; " )) } @@ -450,33 +413,6 @@ $$ LANGUAGE plpgsql; assert!(query.contains(&vt.to_string())); } - #[test] - fn test_delete() { - let qname = "myqueue"; - let msg_id: i64 = 42; - - let query = delete(&qname, msg_id).unwrap(); - - assert!(query.contains(&qname)); - assert!(query.contains(&msg_id.to_string())); - } - - #[test] - fn test_delete_batch() { - let mut msg_ids: Vec = Vec::new(); - let qname = "myqueue"; - msg_ids.push(42); - msg_ids.push(43); - msg_ids.push(44); - - let query = delete_batch(&qname, &msg_ids).unwrap(); - - assert!(query.contains(&qname)); - for id in msg_ids.iter() { - assert!(query.contains(&id.to_string())); - } - } - #[test] fn check_input_rejects_names_too_large() { let table_name = "my_valid_table_name"; diff --git a/pgmq-rs/tests/pg_ext_integration_test.rs b/pgmq-rs/tests/pg_ext_integration_test.rs index 16de10ff..2bb2211b 100644 --- a/pgmq-rs/tests/pg_ext_integration_test.rs +++ b/pgmq-rs/tests/pg_ext_integration_test.rs @@ -157,7 +157,7 @@ async fn test_ext_send_read_delete() { .expect("failed to delete"); assert!(deleted); - // try to delete a message that doesnt exist + // try to delete a message that doesn't exist let deleted = queue .delete(&test_queue, msg_id_del) .await diff --git a/sql/pgmq--0.22.0--0.23.0.sql b/sql/pgmq--0.22.0--0.23.0.sql new file mode 100644 index 00000000..8d1fd821 --- /dev/null +++ b/sql/pgmq--0.22.0--0.23.0.sql @@ -0,0 +1,30 @@ +ALTER EXTENSION pgmq DROP FUNCTION pgmq_delete(TEXT, bigint[]); +DROP FUNCTION pgmq_delete(TEXT, bigint[]); + +-- src/lib.rs:215 +-- pgmq::pgmq_delete +CREATE FUNCTION "pgmq_delete"( + "queue_name" TEXT, /* &str */ + "msg_ids" bigint[] /* alloc::vec::Vec */ +) RETURNS TABLE ( + "pgmq_delete" bool /* bool */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_delete_batch_wrapper'; + + +ALTER EXTENSION pgmq DROP FUNCTION pgmq_archive(TEXT, bigint[]); +DROP FUNCTION pgmq_archive(TEXT, bigint[]); + +-- src/lib.rs:260 +-- pgmq::pgmq_archive +CREATE FUNCTION "pgmq_archive"( + "queue_name" TEXT, /* &str */ + "msg_ids" bigint[] /* alloc::vec::Vec */ +) RETURNS TABLE ( + "pgmq_archive" bool /* bool */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_archive_batch_wrapper'; diff --git a/src/lib.rs b/src/lib.rs index 56732a28..54f6f398 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ pub mod partition; pub mod util; use pgmq_core::{ - query::{archive, archive_batch, delete, delete_batch, enqueue, init_queue, pop, read}, + query::{archive_batch, delete_batch, enqueue, init_queue, pop, read}, types::TABLE_PREFIX, util::check_input, }; @@ -209,81 +209,91 @@ fn readit( #[pg_extern] fn pgmq_delete(queue_name: &str, msg_id: i64) -> Result, PgmqExtError> { - let mut num_deleted = 0; - let query = delete(queue_name, msg_id)?; - Spi::connect(|mut client| { - let tup_table = client.update(&query, None, None); - match tup_table { - Ok(tup_table) => num_deleted = tup_table.len(), - Err(e) => { - error!("error deleting message: {}", e); - } - } - }); - match num_deleted { - 1 => Ok(Some(true)), - 0 => { - warning!("no message found with msg_id: {}", msg_id); - Ok(Some(false)) - } - _ => { - error!("multiple messages found with msg_id: {}", msg_id); - } - } + pgmq_delete_batch(queue_name, vec![msg_id]).map(|mut iter| iter.next().map(|b| b.0)) } #[pg_extern(name = "pgmq_delete")] -fn pgmq_delete_batch(queue_name: &str, msg_ids: Vec) -> Result, PgmqExtError> { - let query = delete_batch(queue_name, &msg_ids)?; - Spi::connect(|mut client| { - let tup_table = client.update(&query, None, None); - match tup_table { - Ok(_) => Ok(Some(true)), - Err(e) => { - error!("error deleting message: {}", e); - } +fn pgmq_delete_batch( + queue_name: &str, + msg_ids: Vec, +) -> Result, PgmqExtError> { + let query = delete_batch(queue_name)?; + + let mut deleted: Vec = Vec::new(); + let _: Result<(), spi::Error> = Spi::connect(|mut client| { + let tup_table = client.update( + &query, + None, + Some(vec![( + PgBuiltInOids::INT8ARRAYOID.oid(), + msg_ids.clone().into_datum(), + )]), + )?; + + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + deleted.push(msg_id); } - }) + Ok(()) + }); + + let results = msg_ids + .iter() + .map(|msg_id| { + if deleted.contains(msg_id) { + (true,) + } else { + (false,) + } + }) + .collect::>(); + + Ok(TableIterator::new(results)) } /// archive a message forever instead of deleting it #[pg_extern] fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result, PgmqExtError> { - let mut num_deleted = 0; - let query = archive(queue_name, msg_id)?; - Spi::connect(|mut client| { - let tup_table = client.update(&query, None, None); - match tup_table { - Ok(tup_table) => num_deleted = tup_table.len(), - Err(e) => { - error!("error deleting message: {}", e); - } - } - }); - match num_deleted { - 1 => Ok(Some(true)), - 0 => { - warning!("no message found with msg_id: {}", msg_id); - Ok(Some(false)) - } - _ => { - error!("multiple messages found with msg_id: {}", msg_id); - } - } + pgmq_archive_batch(queue_name, vec![msg_id]).map(|mut iter| iter.next().map(|b| b.0)) } #[pg_extern(name = "pgmq_archive")] -fn pgmq_archive_batch(queue_name: &str, msg_ids: Vec) -> Result, PgmqExtError> { - let query = archive_batch(queue_name, &msg_ids)?; - Spi::connect(|mut client| { - let tup_table = client.update(&query, None, None); - match tup_table { - Ok(_) => Ok(Some(true)), - Err(e) => { - error!("error deleting message: {}", e); - } +fn pgmq_archive_batch( + queue_name: &str, + msg_ids: Vec, +) -> Result, PgmqExtError> { + let query = archive_batch(queue_name)?; + + let mut archived: Vec = Vec::new(); + let _: Result<(), spi::Error> = Spi::connect(|mut client| { + let tup_table: SpiTupleTable = client.update( + &query, + None, + Some(vec![( + PgBuiltInOids::INT8ARRAYOID.oid(), + msg_ids.clone().into_datum(), + )]), + )?; + + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + archived.push(msg_id); } - }) + Ok(()) + }); + + let results = msg_ids + .iter() + .map(|msg_id| { + if archived.contains(&msg_id) { + (true,) + } else { + (false,) + } + }) + .collect::>(); + + Ok(TableIterator::new(results)) } // reads and deletes at same time @@ -408,7 +418,7 @@ mod tests { use pgmq_core::types::TABLE_PREFIX; #[pg_test] - fn test_creat_non_partitioned() { + fn test_create_non_partitioned() { let qname = r#"test_queue"#; let _ = pgmq_create_non_partitioned(&qname).unwrap(); let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) @@ -573,7 +583,6 @@ mod tests { #[pg_test] fn test_archive() { let qname = r#"test_archive"#; - let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed"); let _ = pgmq_create_non_partitioned(&qname).unwrap(); // no messages in the queue let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) @@ -606,6 +615,50 @@ mod tests { assert_eq!(retval.unwrap(), 1); } + #[pg_test] + fn test_archive_batch() { + let qname = r#"test_archive_batch"#; + let _ = pgmq_create_non_partitioned(&qname).unwrap(); + // no messages in the queue + let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // no messages in queue archive + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // put messages on the queue + let msg_id1 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":1})), 0) + .unwrap() + .unwrap(); + let msg_id2 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":2})), 0) + .unwrap() + .unwrap(); + let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 2); + + // archive the message. The first two exist so should return true, the + // last one doesn't so should return false. + let mut archived = pgmq_archive_batch(&qname, vec![msg_id1, msg_id2, -1]).unwrap(); + assert!(archived.next().unwrap().0); + assert!(archived.next().unwrap().0); + assert!(!archived.next().unwrap().0); + + // should be no messages left on the queue table + let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // but two on the archive table + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 2); + } + #[pg_test] fn test_validate_same_type() { let invalid = validate_same_type("10", "daily");