Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unlogged queues support in pgmq-rs, support newer ext versions on ext client #125

Merged
merged 10 commits into from
Sep 27, 2023
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pgmq-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.22.2"
version = "0.23.0"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand All @@ -12,7 +12,7 @@ readme = "README.md"
repository = "https://github.com/tembo-io/pgmq"

[dependencies]
pgmq_core = {package = "pgmq-core", path = "../core" }
pgmq_core = { package = "pgmq-core", version = "0.6.1" }
chrono = { version = "0.4.23", features = [ "serde" ] }
serde = { version = "1.0.152" }
serde_json = { version = "1.0.91", features = [ "raw_value" ] }
Expand Down
24 changes: 22 additions & 2 deletions pgmq-rs/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,26 @@
},
"query": "SELECT * from pgmq.read_with_poll($1::text, $2, $3, $4, $5)"
},
"056c21572eb57d3fbc1bdbc94380e391b07bb7674b731305c7f197faedd3feb0": {
"describe": {
"columns": [
{
"name": "create_unlogged",
"ordinal": 0,
"type_info": "Void"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "SELECT * from pgmq.create_unlogged($1::text);"
},
"07917815df25f1ac06675a645a812d9791b3c1d32098248cf0f34aead4dcebc8": {
"describe": {
"columns": [
Expand Down Expand Up @@ -95,7 +115,7 @@
{
"name": "archive",
"ordinal": 0,
"type_info": "Bool"
"type_info": "Int8"
}
],
"nullable": [
Expand Down Expand Up @@ -193,7 +213,7 @@
{
"name": "delete",
"ordinal": 0,
"type_info": "Bool"
"type_info": "Int8"
}
],
"nullable": [
Expand Down
13 changes: 12 additions & 1 deletion pgmq-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,18 @@ impl PGMQueue {
/// }
pub async fn create(&self, queue_name: &str) -> Result<(), PgmqError> {
let mut tx = self.connection.begin().await?;
let setup = query::init_queue_client_only(queue_name)?;
let setup = query::init_queue_client_only(queue_name, false)?;
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
tx.commit().await?;
Ok(())
}

/// Create an unlogged queue
pub async fn create_unlogged(&self, queue_name: &str) -> Result<(), PgmqError> {
let mut tx = self.connection.begin().await?;
let setup = query::init_queue_client_only(queue_name, true)?;
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
Expand Down
36 changes: 25 additions & 11 deletions pgmq-rs/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ impl PGMQueueExt {
Ok(true)
}

/// Errors when there is any database error and Ok(false) when the queue already exists.
pub async fn create_unlogged(&self, queue_name: &str) -> Result<bool, PgmqError> {
check_input(queue_name)?;
sqlx::query!("SELECT * from pgmq.create_unlogged($1::text);", queue_name)
.execute(&self.connection)
.await?;
Ok(true)
}

/// Create a new partitioned queue.
/// Errors when there is any database error and Ok(false) when the queue already exists.
pub async fn create_partitioned(&self, queue_name: &str) -> Result<bool, PgmqError> {
Expand Down Expand Up @@ -291,21 +300,23 @@ impl PGMQueueExt {
Ok(arch.archive.expect("no archive result"))
}

/// Move a message to the archive table.
/// Move a slice of messages to the archive table.
pub async fn archive_batch(
&self,
queue_name: &str,
msg_ids: &[i64],
) -> Result<bool, PgmqError> {
) -> Result<usize, PgmqError> {
check_input(queue_name)?;
let arch = sqlx::query!(
let qty = sqlx::query!(
"SELECT * from pgmq.archive($1::text, $2::bigint[])",
queue_name,
msg_ids
)
.fetch_one(&self.connection)
.await?;
Ok(arch.archive.expect("no archive result"))
.fetch_all(&self.connection)
.await?
.len();

Ok(qty)
}

// Read and message and immediately delete it.
Expand Down Expand Up @@ -352,14 +363,17 @@ impl PGMQueueExt {
}

// Delete with a slice of message ids
pub async fn delete_batch(&self, queue_name: &str, msg_id: &[i64]) -> Result<bool, PgmqError> {
let row = sqlx::query!(
pub async fn delete_batch(&self, queue_name: &str, msg_id: &[i64]) -> Result<usize, PgmqError> {
let qty = sqlx::query!(
"SELECT * from pgmq.delete($1::text, $2::bigint[])",
queue_name,
msg_id
)
.fetch_one(&self.connection)
.await?;
Ok(row.delete.expect("no delete result"))
.fetch_all(&self.connection)
.await?
.len();

// FIXME: change function signature to Vec<i64> and return rows
Ok(qty)
}
}
9 changes: 6 additions & 3 deletions pgmq-rs/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@

use pgmq_core::{errors, query, util};

pub fn init_queue_client_only(name: &str) -> Result<Vec<String>, errors::PgmqError> {
pub fn init_queue_client_only(
name: &str,
is_unlogged: bool,
) -> Result<Vec<String>, errors::PgmqError> {
let name = util::CheckedName::new(name)?;
Ok(vec![
query::create_schema(),
query::create_meta(),
query::create_queue(name)?,
query::create_queue(name, is_unlogged)?,
query::create_index(name)?,
query::create_archive(name)?,
query::create_archive_index(name)?,
query::insert_meta(name, false)?,
query::insert_meta(name, false, is_unlogged)?,
query::grant_pgmon_meta(),
query::grant_pgmon_queue(name)?,
])
Expand Down
4 changes: 2 additions & 2 deletions pgmq-rs/tests/pg_ext_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ async fn test_ext_archive_batch() {
let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await;

assert_eq!(post_archive_rowcount, 0);
assert_eq!(archive_result, true);
assert_eq!(archive_result, 3);

let post_archive_archive_rowcount = archive_rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_archive_archive_rowcount, 3);
Expand All @@ -297,7 +297,7 @@ async fn test_ext_delete_batch() {
.expect("delete batch error");
let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_delete_rowcount, 0);
assert_eq!(delete_result, true);
assert_eq!(delete_result, 3);
}

#[tokio::test]
Expand Down