pgmq-sqlalchemy

A more flexible Python client for the PGMQ Postgres extension, built on the SQLAlchemy ORM. It supports both async and sync engines and sessionmakers, or can be built from a DSN.

Features

  • Supports async and sync engines and sessionmakers, or can be built from a DSN.
  • Automatically creates the pgmq (or pg_partman) extension on the database if it does not already exist.
  • Supports all Postgres DBAPIs supported by SQLAlchemy.

    e.g. psycopg, psycopg2, asyncpg.
    See SQLAlchemy PostgreSQL Dialects.
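
Because the driver is named in the DSN itself (the `dialect+driver://` form used by SQLAlchemy), it can be useful to check which driver a DSN points at before handing it to the client. The following is a minimal sketch using only the standard library; the helper names `dsn_driver` and `looks_async` are hypothetical and this is not how pgmq-sqlalchemy itself decides between sync and async mode. The set of async driver names is an assumption based on SQLAlchemy's PostgreSQL dialect names.

```python
from typing import Optional
from urllib.parse import urlsplit

# Assumption: driver names that SQLAlchemy exposes as async-only PostgreSQL dialects.
ASYNC_DRIVERS = {"asyncpg", "psycopg_async"}


def dsn_driver(dsn: str) -> Optional[str]:
    """Return the driver part of a 'dialect+driver://' DSN, or None if it is omitted."""
    scheme = urlsplit(dsn).scheme  # e.g. 'postgresql+asyncpg'
    _, sep, driver = scheme.partition("+")
    return driver if sep else None


def looks_async(dsn: str) -> bool:
    """Return True when the DSN names a driver from ASYNC_DRIVERS."""
    return dsn_driver(dsn) in ASYNC_DRIVERS
```

A DSN without an explicit driver, such as `postgresql://...`, yields `None` here; SQLAlchemy then falls back to its default driver for the dialect.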

Installation

Install with pip:

pip install pgmq-sqlalchemy

Install with additional DBAPI packages:

pip install "pgmq-sqlalchemy[asyncpg]"
pip install "pgmq-sqlalchemy[psycopg2-binary]"
# pip install "pgmq-sqlalchemy[postgres-python-driver]"

Getting Started

Postgres Setup

Prerequisite: Postgres with the PGMQ extension installed.
For a quick setup:

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest

For more information, see PGMQ
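
After starting the container, the database may take a few seconds before it accepts connections. A small sketch of a wait loop, assuming the port mapping from the command above (`localhost:5432`); `wait_for_port` is a hypothetical helper, and an open port only shows that something is listening, not that Postgres has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. connection refused) on failure
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling `wait_for_port("localhost", 5432)` before creating the queue client avoids a connection error on a freshly started container.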

Usage

Note

Check pgmq-sqlalchemy Document for more examples and detailed usage.

For dispatcher.py:

from typing import List
from pgmq_sqlalchemy import PGMQueue

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)
pgmq.create_queue('my_queue')

msg = {'key': 'value', 'key2': 'value2'}
msg_id: int = pgmq.send('my_queue', msg)

# a list of messages can also be sent in one call
msg_ids: List[int] = pgmq.send_batch('my_queue', [msg, msg])
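
When a dispatcher has many messages to send, it may be preferable to split them into fixed-size groups and call `send_batch` once per group. The sketch below shows only the splitting step; `chunked` is a hypothetical helper, not part of pgmq-sqlalchemy, and the right batch size depends on the workload.

```python
from typing import Iterator, List, Sequence


def chunked(messages: Sequence[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of `messages`, each with at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(messages), size):
        yield list(messages[start:start + size])
```

Each yielded list can then be passed to `pgmq.send_batch('my_queue', chunk)` as in the example above.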

For consumer.py:

from typing import List
from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import Message

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)

# read a single message
msg: Message = pgmq.read('my_queue')

# read a batch of messages (up to 10)
msgs: List[Message] = pgmq.read_batch('my_queue', 10)
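
A consumer usually reads in a loop, handles each message, and then removes it from the queue. The following is a rough sketch of that pattern written against a small protocol rather than the real client, so it can be tried without a database. It assumes that the queue object offers `read_batch(queue_name, batch_size)` as shown above and a `delete(queue_name, msg_id)` method, and that each message exposes `msg_id` and `message` attributes; check these names against the pgmq-sqlalchemy documentation before relying on them. `drain` and `QueueLike` are hypothetical names.

```python
from typing import Any, Callable, List, Optional, Protocol


class QueueLike(Protocol):
    """Assumed subset of the client interface used by this sketch."""

    def read_batch(self, queue_name: str, batch_size: int) -> Optional[List[Any]]: ...

    def delete(self, queue_name: str, msg_id: int) -> Any: ...


def drain(
    queue: QueueLike,
    queue_name: str,
    handler: Callable[[Any], None],
    batch_size: int = 10,
) -> int:
    """Read batches until the queue returns nothing; return the number handled."""
    processed = 0
    while True:
        msgs = queue.read_batch(queue_name, batch_size)
        if not msgs:  # covers both None and an empty list
            return processed
        for msg in msgs:
            handler(msg.message)  # assumed attribute: the message payload
            queue.delete(queue_name, msg.msg_id)  # delete only after handling
            processed += 1
```

Deleting after the handler returns means a message whose handler raises stays in the queue and becomes readable again once its visibility timeout expires.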

For monitor.py:

from pgmq_sqlalchemy import PGMQueue
from pgmq_sqlalchemy.schema import QueueMetrics

postgres_dsn = 'postgresql://postgres:postgres@localhost:5432/postgres'

pgmq = PGMQueue(dsn=postgres_dsn)

# get queue metrics
metrics: QueueMetrics = pgmq.metrics('my_queue')
print(metrics.queue_length)
print(metrics.total_messages)
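
A monitor typically compares these values against a threshold instead of only printing them. A minimal sketch, assuming only the `queue_length` and `total_messages` attributes shown above; `is_backlogged` and `summarize` are hypothetical helpers, and the threshold is an arbitrary illustration.

```python
from typing import Any


def is_backlogged(metrics: Any, max_queue_length: int) -> bool:
    """Return True when more messages are waiting than the allowed maximum."""
    return metrics.queue_length > max_queue_length


def summarize(metrics: Any) -> str:
    """Format the two counters from the metrics object as one log line."""
    return f"queue_length={metrics.queue_length} total_messages={metrics.total_messages}"
```

For example, `is_backlogged(pgmq.metrics('my_queue'), 1000)` could drive an alert or trigger additional consumers.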

Issues / Contributing / Development

Feel free to open an issue or pull request!
See the Development section of the online documentation or CONTRIBUTING.md for more information.

TODO

  • Add a time-based partition option and validation to the create_partitioned_queue method.
  • Read (single/batch) from the archive table (read_archive method).
  • Detach the archive table (detach_archive method).
  • Add a set_vt utility method.