Skip to content

Commit

Permalink
Return bools from archive/delete_batch (#95)
Browse files Browse the repository at this point in the history
* Return booleans from archive/delete_batch

* add archive_batch test

* Refactor archive in terms of archive_batch

* fix table names

* Remove obsolete test

* fix typo

* lint

* Replace IN with = ANY

* bump versions and add migration scripts

* try

* add archive

* in prep for rebase

---------

Co-authored-by: Adam Hendel <[email protected]>
  • Loading branch information
craigpangea and ChuckHend authored Sep 3, 2023
1 parent 553d8ba commit a4bd31c
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 142 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.22.0"
version = "0.23.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq-core"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension"
Expand Down
80 changes: 8 additions & 72 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,6 @@ pub fn read(name: &str, vt: i32, limit: i32) -> Result<String, PgmqError> {
))
}

pub fn delete(name: &str, msg_id: i64) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
"
DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE msg_id = {msg_id};
"
))
}

pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime<Utc>) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
Expand All @@ -252,60 +242,33 @@ pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime<Utc>) -> Result<Stri
))
}

pub fn delete_batch(name: &str, msg_ids: &[i64]) -> Result<String, PgmqError> {
pub fn delete_batch(name: &str) -> Result<String, PgmqError> {
// construct string of comma separated msg_id
check_input(name)?;
let mut msg_id_list: String = "".to_owned();
for msg_id in msg_ids.iter() {
let id_str = format!("{msg_id},");
msg_id_list.push_str(&id_str)
}
// drop trailing comma from constructed string
msg_id_list.pop();
Ok(format!(
"
DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE msg_id in ({msg_id_list});
"
))
}

pub fn archive(name: &str, msg_id: i64) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
"
WITH archived AS (
DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE msg_id = {msg_id}
RETURNING msg_id, vt, read_ct, enqueued_at, message
)
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived;
DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE msg_id = ANY($1)
RETURNING msg_id;
"
))
}

pub fn archive_batch(name: &str, msg_ids: &[i64]) -> Result<String, PgmqError> {
pub fn archive_batch(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
let mut msg_id_list: String = "".to_owned();
for msg_id in msg_ids.iter() {
let id_str = format!("{msg_id},");
msg_id_list.push_str(&id_str)
}
// drop trailing comma from constructed string
msg_id_list.pop();

Ok(format!(
"
WITH archived AS (
DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE msg_id in ({msg_id_list})
WHERE msg_id = ANY($1)
RETURNING msg_id, vt, read_ct, enqueued_at, message
)
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived;
FROM archived
RETURNING msg_id;
"
))
}
Expand Down Expand Up @@ -450,33 +413,6 @@ $$ LANGUAGE plpgsql;
assert!(query.contains(&vt.to_string()));
}

#[test]
fn test_delete() {
let qname = "myqueue";
let msg_id: i64 = 42;

let query = delete(&qname, msg_id).unwrap();

assert!(query.contains(&qname));
assert!(query.contains(&msg_id.to_string()));
}

#[test]
fn test_delete_batch() {
let mut msg_ids: Vec<i64> = Vec::new();
let qname = "myqueue";
msg_ids.push(42);
msg_ids.push(43);
msg_ids.push(44);

let query = delete_batch(&qname, &msg_ids).unwrap();

assert!(query.contains(&qname));
for id in msg_ids.iter() {
assert!(query.contains(&id.to_string()));
}
}

#[test]
fn check_input_rejects_names_too_large() {
let table_name = "my_valid_table_name";
Expand Down
2 changes: 1 addition & 1 deletion pgmq-rs/tests/pg_ext_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async fn test_ext_send_read_delete() {
.expect("failed to delete");
assert!(deleted);

// try to delete a message that doesnt exist
// try to delete a message that doesn't exist
let deleted = queue
.delete(&test_queue, msg_id_del)
.await
Expand Down
30 changes: 30 additions & 0 deletions sql/pgmq--0.22.0--0.23.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
ALTER EXTENSION pgmq DROP FUNCTION pgmq_delete(TEXT, bigint[]);
DROP FUNCTION pgmq_delete(TEXT, bigint[]);

-- src/lib.rs:215
-- pgmq::pgmq_delete
CREATE FUNCTION "pgmq_delete"(
"queue_name" TEXT, /* &str */
"msg_ids" bigint[] /* alloc::vec::Vec<i64> */
) RETURNS TABLE (
"pgmq_delete" bool /* bool */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_delete_batch_wrapper';


ALTER EXTENSION pgmq DROP FUNCTION pgmq_archive(TEXT, bigint[]);
DROP FUNCTION pgmq_archive(TEXT, bigint[]);

-- src/lib.rs:260
-- pgmq::pgmq_archive
CREATE FUNCTION "pgmq_archive"(
"queue_name" TEXT, /* &str */
"msg_ids" bigint[] /* alloc::vec::Vec<i64> */
) RETURNS TABLE (
"pgmq_archive" bool /* bool */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_archive_batch_wrapper';
Loading

0 comments on commit a4bd31c

Please sign in to comment.