Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate schemas #115

Merged
merged 7 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.24.0"
version = "0.25.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
50 changes: 25 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ CREATE EXTENSION pgmq;

### Creating a queue

Every queue is its own table in Postgres. The table name is the queue name prefixed with `pgmq_`.
For example, `pgmq_my_queue` is the table for the queue `my_queue`.
Every queue is its own table in Postgres. The table name is the queue name prefixed with `queue_`.
For example, `queue_my_mq` is the table for the queue `my_mq`.

```sql
-- creates the queue
SELECT pgmq_create('my_queue');
SELECT pgmq.create('my_queue');
```

```text
pgmq_create
create
-------------

(1 row)
Expand All @@ -85,19 +85,19 @@ SELECT pgmq_create('my_queue');

```sql
-- messages are sent as JSON
SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');
SELECT * from pgmq.send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq.send('my_queue', '{"foo": "bar2"}');
```

The message id is returned from the send function.

```text
pgmq_send
send
-----------
1
(1 row)

pgmq_send
send
-----------
2
(1 row)
Expand All @@ -110,7 +110,7 @@ Read `2` message from the queue. Make them invisible for `30` seconds.
and can be read by another consumer.

```sql
SELECT * from pgmq_read('my_queue', 30, 2);
SELECT pgmq.read('my_queue', 30, 2);
```

```text
Expand All @@ -123,7 +123,7 @@ SELECT * from pgmq_read('my_queue', 30, 2);
If the queue is empty, or if all messages are currently invisible, no rows will be returned.

```sql
SELECT * from pgmq_read('my_queue', 30, 1);
SELECT pgmq.read('my_queue', 30, 1);
```

```text
Expand All @@ -135,7 +135,7 @@ SELECT * from pgmq_read('my_queue', 30, 1);

```sql
-- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
SELECT * from pgmq_pop('my_queue');
SELECT pgmq.pop('my_queue');
```

```text
Expand All @@ -151,18 +151,18 @@ Archiving a message removes it from the queue, and inserts it to the archive tab
Archive message with msg_id=2.

```sql
SELECT * from pgmq_archive('my_queue', 2);
SELECT pgmq.archive('my_queue', 2);
```

```text
pgmq_archive
archive
--------------
t
(1 row)
```

```sql
SELECT * from pgmq_my_queue_archive;
SELECT pgmq.my_queue_archive;
```

```text
Expand All @@ -176,11 +176,11 @@ SELECT * from pgmq_my_queue_archive;
Send another message, so that we can delete it.

```sql
SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}');
SELECT pgmq.send('my_queue', '{"foo": "bar3"}');
```

```text
pgmq_send
send
-----------
3
(1 row)
Expand All @@ -189,11 +189,11 @@ SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}');
Delete the message with id `3` from the queue named `my_queue`.

```sql
SELECT pgmq_delete('my_queue', 3);
SELECT pgmq.delete('my_queue', 3);
```

```text
pgmq_delete
delete
-------------
t
(1 row)
Expand All @@ -204,11 +204,11 @@ SELECT pgmq_delete('my_queue', 3);
Delete the queue `my_queue`.

```sql
SELECT pgmq_drop_queue('my_queue');
SELECT pgmq.drop_queue('my_queue');
```

```text
pgmq_drop_queue
drop_queue
-----------------
t
(1 row)
Expand All @@ -220,18 +220,18 @@ SELECT pgmq_drop_queue('my_queue');

You will need to install [pg_partman](https://github.com/pgpartman/pg_partman/) if you want to use `pgmq` partitioned queues.

`pgmq` queue tables can be created as a partitioned table by using `pgmq_create_partitioned()`. [pg_partman](https://github.com/pgpartman/pg_partman/)
`pgmq` queue tables can be created as a partitioned table by using `pgmq.create_partitioned()`. [pg_partman](https://github.com/pgpartman/pg_partman/)
handles all maintenance of queue tables. This includes creating new partitions and dropping old partitions.

Partitions behavior is configured at the time queues are created, via `pgmq_create_partitioned()`. This function has three parameters:
Partitions behavior is configured at the time queues are created, via `pgmq.create_partitioned()`. This function has three parameters:

`queue_name: text`: The name of the queue. Queues are Postgres tables prepended with `pgmq_`. For example, `pgmq_my_queue`.
`queue_name: text`: The name of the queue. Queues are Postgres tables prepended with `queue_`. For example, `queue_my_mq`.


`partition_interval: text` - The interval at which partitions are created. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, queues are partitioned by the time at which messages are sent to the table (`enqueued_at`). A value of `'daily'` would create a new partition each day. When it is an integer value, queues are partitioned by the `msg_id`. A value of `'100'` will create a new partition every 100 messages. The value must agree with `retention_interval` (time based or numeric). The default value is `daily`.


`retention_interval: text` - The interval for retaining partitions. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, partitions containing data greater than the duration will be dropped. When it is an integer value, any messages that have a `msg_id` less than `max(msg_id) - retention_interval` will be dropped. For example, if the max `msg_id` is 100 and the `retention_interval` is 60, any partitions with `msg_id` values less than 40 will be dropped. The value must agree with `partition_interval` (time based or numeric). The default is `'5 days'`. Note: `retention_interval` does not apply to messages that have been deleted via `pgmq_delete()` or archived with `pgmq_archive()`. `pgmq_delete()` removes messages forever and `pgmq_archive()` moves messages to the corresponding archive table forever (for example, `pgmq_my_queue_archive`).
`retention_interval: text` - The interval for retaining partitions. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, partitions containing data greater than the duration will be dropped. When it is an integer value, any messages that have a `msg_id` less than `max(msg_id) - retention_interval` will be dropped. For example, if the max `msg_id` is 100 and the `retention_interval` is 60, any partitions with `msg_id` values less than 40 will be dropped. The value must agree with `partition_interval` (time based or numeric). The default is `'5 days'`. Note: `retention_interval` does not apply to messages that have been deleted via `pgmq.delete()` or archived with `pgmq.archive()`. `pgmq.delete()` removes messages forever and `pgmq.archive()` moves messages to the corresponding archive table forever (for example, `queue_my_mq_archive`).


In order for automatic partition maintenance to take place, several settings must be added to the `postgresql.conf` file, which is typically located in the postgres `DATADIR`.
Expand All @@ -252,7 +252,7 @@ pg_partman_bgw.dbname = 'postgres'

## Visibility Timeout (vt)

pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via `pgmq_read()`. It is recommended to set a `vt` value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call `pgmq_delete()` to completely remove the message from the queue or `pgmq_archive()` to move it to the archive table for the queue.
pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via `pgmq.read()`. It is recommended to set a `vt` value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call `pgmq.delete()` to completely remove the message from the queue or `pgmq.archive()` to move it to the archive table for the queue.

## ✨ Contributors

Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq-core"
version = "0.4.1"
version = "0.5.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension"
Expand Down
Loading