diff --git a/Cargo.lock b/Cargo.lock index 6ac25080..4e324e45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,10 +1243,10 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.11.1" +version = "0.11.2" dependencies = [ "chrono", - "pgmq 0.14.3", + "pgmq 0.14.4", "pgrx", "pgrx-tests", "rand", @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.14.3" +version = "0.14.4" dependencies = [ "chrono", "log", diff --git a/Cargo.toml b/Cargo.toml index 52bb1fef..25b186bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.11.1" +version = "0.11.2" edition = "2021" authors = ["Tembo.io"] description = "Postgres extension for PGMQ" diff --git a/core/Cargo.toml b/core/Cargo.toml index edba5d92..8bb290ee 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.14.3" +version = "0.14.4" edition = "2021" authors = ["Tembo.io"] description = "A distributed message queue for Rust applications, on Postgres." diff --git a/core/src/query.rs b/core/src/query.rs index 6aec4ed0..8d8a982c 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -35,7 +35,7 @@ pub fn create_queue(name: CheckedName<'_>) -> Result { CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} ( msg_id BIGSERIAL NOT NULL, read_ct INT DEFAULT 0 NOT NULL, - enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL, + enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB ); @@ -49,8 +49,8 @@ pub fn create_archive(name: CheckedName<'_>) -> Result { CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive ( msg_id BIGSERIAL NOT NULL, read_ct INT DEFAULT 0 NOT NULL, - enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL, - deleted_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL, + enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + deleted_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB ); @@ -63,7 +63,7 @@ pub fn create_meta() -> String { " CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta ( queue_name VARCHAR UNIQUE NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL ); " ) @@ -183,9 +183,7 @@ pub fn enqueue( check_input(name)?; let mut values = "".to_owned(); for message in messages.iter() { - let full_msg = format!( - "((now() at time zone 'utc' + interval '{delay} seconds'), '{message}'::json)," - ); + let full_msg = format!("((now() + interval '{delay} seconds'), '{message}'::json),"); values.push_str(&full_msg) } // drop trailing comma from constructed string @@ -207,14 +205,14 @@ pub fn read(name: &str, vt: &i32, limit: &i32) -> Result { ( SELECT msg_id FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} - WHERE vt <= now() at time zone 'utc' + WHERE vt <= now() ORDER BY msg_id ASC LIMIT {limit} FOR UPDATE SKIP LOCKED ) UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} SET - vt = (now() at time zone 'utc' + interval '{vt} seconds'), + vt = now() + interval '{vt} seconds', read_ct = read_ct + 1 WHERE msg_id in (select msg_id from cte) RETURNING *; @@ -287,7 +285,7 @@ pub fn pop(name: &str) -> Result { ( SELECT msg_id FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} - WHERE vt <= now() at time zone 'utc' + WHERE vt <= now() ORDER BY msg_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/sql/pgmq--0.11.1--0.11.2.sql b/sql/pgmq--0.11.1--0.11.2.sql new file mode 100644 index 00000000..1da05e07 --- /dev/null +++ b/sql/pgmq--0.11.1--0.11.2.sql @@ -0,0 +1,12 @@ +DO $$ +DECLARE + table_name TEXT; +BEGIN + FOR table_name IN (SELECT queue_name FROM public.pgmq_meta) + LOOP + EXECUTE format('ALTER TABLE %I ALTER COLUMN enqueued_at SET DEFAULT now()', 'pgmq_' || table_name); + EXECUTE format('ALTER TABLE %I ALTER COLUMN enqueued_at SET DEFAULT now()', 'pgmq_' || table_name || '_archive'); + EXECUTE format('ALTER TABLE %I ALTER COLUMN deleted_at SET DEFAULT now()', 'pgmq_' || table_name || '_archive'); + + END LOOP; +END $$; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 9436aa24..a5f8ce0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -98,7 +98,7 @@ fn enqueue_str(name: &str) -> Result { Ok(format!( " INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (vt, message) - VALUES (now() at time zone 'utc', $1) + VALUES (now(), $1) RETURNING msg_id; " )) @@ -310,7 +310,7 @@ fn pgmq_set_vt( let query = format!( " UPDATE {TABLE_PREFIX}_{queue_name} - SET vt = (now() at time zone 'utc' + interval '{vt_offset} seconds') + SET vt = (now() + interval '{vt_offset} seconds') WHERE msg_id = $1 RETURNING *; " @@ -442,7 +442,8 @@ mod tests { let partition_interval = "2".to_owned(); let retention_interval = "2".to_owned(); - let _ = Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("SQL select failed"); + let _ = + Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("Failed dropping pg_partman"); let failed = pgmq_create_partitioned( &qname, @@ -451,7 +452,8 @@ mod tests { ); assert!(failed.is_err()); - let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed"); + let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman") + .expect("Failed creating pg_partman"); let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap(); let queues = api::listit().unwrap(); diff --git a/src/partition.rs b/src/partition.rs index affa8fad..b002af1e 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -55,7 +55,7 @@ fn create_partitioned_queue( CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} ( msg_id BIGSERIAL NOT NULL, read_ct INT DEFAULT 0 NOT NULL, - enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL, + enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB ) PARTITION BY RANGE ({partition_col}); diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index be26b3d8..ac62b733 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -97,6 +97,11 @@ async fn test_lifecycle() { .expect("expected message"); assert_eq!(message.msg_id, 1); + let _ = sqlx::query("CREATE EXTENSION IF NOT EXISTS pg_partman") + .execute(&conn) + .await + .expect("failed to create extension"); + // CREATE with 5 seconds per partition, 10 seconds retention let test_duration_queue = format!("test_duration_{test_num}"); let q = format!("SELECT \"pgmq_create_partitioned\"('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);");