Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove offset casting #40

Merged
merged 9 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.11.1"
version = "0.11.2"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.14.3"
version = "0.14.4"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
18 changes: 8 additions & 10 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn create_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (
msg_id BIGSERIAL NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
Expand All @@ -49,8 +49,8 @@ pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (
msg_id BIGSERIAL NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
Expand All @@ -63,7 +63,7 @@ pub fn create_meta() -> String {
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (
queue_name VARCHAR UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
"
)
Expand Down Expand Up @@ -183,9 +183,7 @@ pub fn enqueue(
check_input(name)?;
let mut values = "".to_owned();
for message in messages.iter() {
let full_msg = format!(
"((now() at time zone 'utc' + interval '{delay} seconds'), '{message}'::json),"
);
let full_msg = format!("((now() + interval '{delay} seconds'), '{message}'::json),");
values.push_str(&full_msg)
}
// drop trailing comma from constructed string
Expand All @@ -207,14 +205,14 @@ pub fn read(name: &str, vt: &i32, limit: &i32) -> Result<String, PgmqError> {
(
SELECT msg_id
FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
WHERE vt <= now()
ORDER BY msg_id ASC
LIMIT {limit}
FOR UPDATE SKIP LOCKED
)
UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
SET
vt = (now() at time zone 'utc' + interval '{vt} seconds'),
vt = now() + interval '{vt} seconds',
read_ct = read_ct + 1
WHERE msg_id in (select msg_id from cte)
RETURNING *;
Expand Down Expand Up @@ -287,7 +285,7 @@ pub fn pop(name: &str) -> Result<String, PgmqError> {
(
SELECT msg_id
FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
WHERE vt <= now()
ORDER BY msg_id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
Expand Down
12 changes: 12 additions & 0 deletions sql/pgmq--0.11.1--0.11.2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
DO $$
DECLARE
table_name TEXT;
BEGIN
FOR table_name IN (SELECT queue_name FROM public.pgmq_meta)
LOOP
EXECUTE format('ALTER TABLE %I ALTER COLUMN enqueued_at SET DEFAULT now()', 'pgmq_' || table_name);
EXECUTE format('ALTER TABLE %I ALTER COLUMN enqueued_at SET DEFAULT now()', 'pgmq_' || table_name || '_archive');
EXECUTE format('ALTER TABLE %I ALTER COLUMN deleted_at SET DEFAULT now()', 'pgmq_' || table_name || '_archive');

END LOOP;
END $$;
10 changes: 6 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn enqueue_str(name: &str) -> Result<String, PgmqError> {
Ok(format!(
"
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (vt, message)
VALUES (now() at time zone 'utc', $1)
VALUES (now(), $1)
RETURNING msg_id;
"
))
Expand Down Expand Up @@ -310,7 +310,7 @@ fn pgmq_set_vt(
let query = format!(
"
UPDATE {TABLE_PREFIX}_{queue_name}
SET vt = (now() at time zone 'utc' + interval '{vt_offset} seconds')
SET vt = (now() + interval '{vt_offset} seconds')
WHERE msg_id = $1
RETURNING *;
"
Expand Down Expand Up @@ -442,7 +442,8 @@ mod tests {
let partition_interval = "2".to_owned();
let retention_interval = "2".to_owned();

let _ = Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("SQL select failed");
let _ =
Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("Failed dropping pg_partman");

let failed = pgmq_create_partitioned(
&qname,
Expand All @@ -451,7 +452,8 @@ mod tests {
);
assert!(failed.is_err());

let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed");
let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman")
.expect("Failed creating pg_partman");
let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap();

let queues = api::listit().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn create_partitioned_queue(
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} (
msg_id BIGSERIAL NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
) PARTITION BY RANGE ({partition_col});
Expand Down
5 changes: 5 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ async fn test_lifecycle() {
.expect("expected message");
assert_eq!(message.msg_id, 1);

let _ = sqlx::query("CREATE EXTENSION IF NOT EXISTS pg_partman")
.execute(&conn)
.await
.expect("failed to create extension");

// CREATE with 5 seconds per partition, 10 seconds retention
let test_duration_queue = format!("test_duration_{test_num}");
let q = format!("SELECT \"pgmq_create_partitioned\"('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);");
Expand Down