Skip to content

Commit

Permalink
Add purge_queue function (extension, ext client) (#96)
Browse files Browse the repository at this point in the history
* Add purge_queue function to extension

* Bump core

* Return deleted count on purge

* Add purge_queue in pgmq-rs ext
  • Loading branch information
v0idpwn authored Aug 31, 2023
1 parent 9ab6abb commit 3c0730c
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq-core"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension"
Expand Down
5 changes: 5 additions & 0 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ pub fn create_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
))
}

pub fn purge_queue(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!("DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name};"))
}

pub fn enqueue(
name: &str,
messages: &[serde_json::Value],
Expand Down
4 changes: 2 additions & 2 deletions pgmq-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.20.0"
version = "0.22.0"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand All @@ -12,7 +12,7 @@ readme = "README.md"
repository = "https://github.com/tembo-io/pgmq"

[dependencies]
pgmq_core = { package = "pgmq-core", version = "0.1.0" }
pgmq_core = { package = "pgmq-core", version = "0.2.0" }
chrono = { version = "0.4.23", features = [ "serde" ] }
serde = { version = "1.0.152" }
serde_json = { version = "1.0.91", features = [ "raw_value" ] }
Expand Down
20 changes: 20 additions & 0 deletions pgmq-rs/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@
},
"query": "SELECT * from pgmq_delete($1::text, $2::bigint[])"
},
"98463803e1b4548d758476cce697814fb8411f47f899043ece8a7a6bee71d84d": {
"describe": {
"columns": [
{
"name": "pgmq_purge_queue",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "SELECT * from pgmq_purge_queue($1::text);"
},
"9919286cee87946b387f69e67df94f94eb0acdd3b5f4848faf092c55d484b61a": {
"describe": {
"columns": [
Expand Down
8 changes: 8 additions & 0 deletions pgmq-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ impl PGMQueue {
Ok(num_deleted)
}

// TODO: release when pgmq-core 0.3.0 is released
//pub async fn purge(&self, queue_name: &str) -> Result<u64, PgmqError> {
// let query = &core_query::purge_queue(queue_name)?;
// let row = sqlx::query(query).execute(&self.connection).await?;
// let num_deleted = row.rows_affected();
// Ok(num_deleted)
//}

/// Moves a message, by message id, from the queue table to archive table
/// View messages on the archive table with sql:
/// ```sql
Expand Down
10 changes: 10 additions & 0 deletions pgmq-rs/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ impl PGMQueueExt {
Ok(())
}

/// Drop an existing queue table.
pub async fn purge_queue(&self, queue_name: &str) -> Result<i64, PgmqError> {
check_input(queue_name)?;
let purged = sqlx::query!("SELECT * from pgmq_purge_queue($1::text);", queue_name)
.fetch_one(&self.connection)
.await?;

Ok(purged.pgmq_purge_queue.expect("no purged count"))
}

/// List all queues in the Postgres instance.
pub async fn list_queues(&self) -> Result<Option<Vec<PGMQueueMeta>>, PgmqError> {
let queues = sqlx::query!("SELECT * from pgmq_list_queues();")
Expand Down
25 changes: 24 additions & 1 deletion pgmq-rs/tests/pg_ext_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn test_ext_send_delay() {
let vt = 1;
let queue = init_queue_ext(&test_queue).await;
let msg = MyMessage::default();
let msg_id = queue.send_delay(&test_queue, &msg, 5).await.unwrap();
queue.send_delay(&test_queue, &msg, 5).await.unwrap();

// No messages are found due to visibility timeout
let no_messages = queue.read::<MyMessage>(&test_queue, vt).await.unwrap();
Expand Down Expand Up @@ -272,6 +272,29 @@ async fn test_ext_delete_batch() {
assert_eq!(delete_result, true);
}

#[tokio::test]
async fn test_ext_purge_queue() {
let test_queue = format!(
"test_ext_purge_queue{}",
rand::thread_rng().gen_range(0..100000)
);

let queue = init_queue_ext(&test_queue).await;
let msg = MyMessage::default();
let _ = queue.send(&test_queue, &msg).await.unwrap();
let _ = queue.send(&test_queue, &msg).await.unwrap();
let _ = queue.send(&test_queue, &msg).await.unwrap();

let purged_count = queue
.purge_queue(&test_queue)
.await
.expect("purge queue error");

assert_eq!(purged_count, 3);
let post_purge_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_purge_rowcount, 0);
}

#[tokio::test]
async fn test_pgmq_init() {
let db_url = env::var("DATABASE_URL")
Expand Down
11 changes: 10 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::errors::PgmqExtError;
use crate::partition::PARTMAN_SCHEMA;
use crate::util;
use pgmq_core::{
query::{destroy_queue, enqueue},
query::{destroy_queue, enqueue, purge_queue},
types::{PGMQ_SCHEMA, TABLE_PREFIX},
};

Expand Down Expand Up @@ -91,3 +91,12 @@ fn pgmq_send_batch(
});
Ok(TableIterator::new(results))
}

#[pg_extern]
fn pgmq_purge_queue(queue_name: String) -> Result<i64, PgmqExtError> {
Spi::connect(|mut client| {
let query = purge_queue(&queue_name)?;
let tup_table = client.update(query.as_str(), None, None)?;
Ok(tup_table.len() as i64)
})
}

0 comments on commit 3c0730c

Please sign in to comment.