Skip to content

Commit

Permalink
feat: add purge to extensionless client (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn authored Sep 1, 2023
1 parent 3c0730c commit 3cac957
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pgmq-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ readme = "README.md"
repository = "https://github.com/tembo-io/pgmq"

[dependencies]
pgmq_core = { package = "pgmq-core", version = "0.2.0" }
pgmq_core = { package = "pgmq-core", version = "0.3.0" }
chrono = { version = "0.4.23", features = [ "serde" ] }
serde = { version = "1.0.152" }
serde_json = { version = "1.0.91", features = [ "raw_value" ] }
Expand Down
13 changes: 6 additions & 7 deletions pgmq-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,13 +743,12 @@ impl PGMQueue {
Ok(num_deleted)
}

// TODO: release when pgmq-core 0.3.0 is released
//pub async fn purge(&self, queue_name: &str) -> Result<u64, PgmqError> {
// let query = &core_query::purge_queue(queue_name)?;
// let row = sqlx::query(query).execute(&self.connection).await?;
// let num_deleted = row.rows_affected();
// Ok(num_deleted)
//}
pub async fn purge(&self, queue_name: &str) -> Result<u64, PgmqError> {
let query = &core_query::purge_queue(queue_name)?;
let row = sqlx::query(query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
}

/// Moves a message, by message id, from the queue table to archive table
/// View messages on the archive table with sql:
Expand Down
16 changes: 16 additions & 0 deletions pgmq-rs/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,22 @@ async fn test_database_error_modes() {
}
}

#[tokio::test]
async fn test_purge() {
let test_queue = format!("test_purge_{}", rand::thread_rng().gen_range(0..100000));
let queue = init_queue(&test_queue).await;
let msg = MyMessage::default();
let _ = queue.send(&test_queue, &msg).await.unwrap();
let _ = queue.send(&test_queue, &msg).await.unwrap();
let _ = queue.send(&test_queue, &msg).await.unwrap();

let purged_count = queue.purge(&test_queue).await.expect("purge queue error");

assert_eq!(purged_count, 3);
let post_purge_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_purge_rowcount, 0);
}

/// test parsing operations that should produce errors
#[tokio::test]
async fn test_parsing_error_modes() {
Expand Down

0 comments on commit 3cac957

Please sign in to comment.