Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgmq-python: adding support for Transaction #268

Merged
merged 25 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5c72c0e
feat: adding transaction as decorator
tavallaie Jun 15, 2024
39c4ac6
feat: test for transactions
tavallaie Jun 15, 2024
2e7ba20
add logger
tavallaie Jun 15, 2024
18ccefc
chore: linting
tavallaie Jun 15, 2024
405dd12
feat: successful transaction operation
tavallaie Jun 16, 2024
b535b07
Merge branch 'main' into transaction
tavallaie Jun 16, 2024
fd15e60
chore: linting and formatting
tavallaie Jun 16, 2024
3682523
Merge branch 'transaction' of github.com:tavallaie/pgmq into transaction
tavallaie Jun 16, 2024
bae10b4
feat: adding better logger and optional for verbose
tavallaie Jun 16, 2024
2557382
feat: update readme for transaction
tavallaie Jun 16, 2024
5955c01
resolve conflict
tavallaie Sep 15, 2024
9d37d69
feat: support for transaction:
tavallaie Sep 15, 2024
ce339b3
Merge branch 'main' into transaction
ChuckHend Sep 19, 2024
e57141f
feat:remove perform_transaction
tavallaie Sep 20, 2024
d2554f2
Merge branch 'transaction' of github.com:tavallaie/pgmq into transaction
tavallaie Sep 20, 2024
5d3b9c2
feat: adding example for transaction
tavallaie Sep 20, 2024
34f11bb
feat: update readme for using transactions
tavallaie Sep 20, 2024
5e92223
chore: linting
tavallaie Sep 20, 2024
bba66f4
chore: remove unused tnx variable
tavallaie Sep 20, 2024
895a468
feat: update examples for non-db and non-pgmq
tavallaie Sep 20, 2024
a95d5cb
Merge branch 'main' into transaction
ChuckHend Sep 28, 2024
e3900f1
chore: remove extra space in README
tavallaie Sep 28, 2024
2d67949
feat: complete async example app
tavallaie Sep 28, 2024
f0f4c4a
Merge branch 'transaction' of github.com:tavallaie/pgmq into transaction
tavallaie Sep 28, 2024
c4e5b51
chore: fixing some python code indentation within README
tavallaie Sep 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 182 additions & 116 deletions tembo-pgmq-python/tembo_pgmq_python/queue.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import functools
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, List, Optional, Union

from psycopg.types.json import Jsonb
from psycopg_pool import ConnectionPool

logger = logging.getLogger(__name__)
# One debug log file per process start, timestamped to avoid clobbering.
log_filename = datetime.now().strftime("pgmq_debug_%Y%m%d_%H%M%S.log")

# Configure logging at the start of the script.
# NOTE(review): calling basicConfig at import time configures the *root*
# logger and creates a DEBUG log file as a side effect of merely importing
# this library module -- consider moving this behind an opt-in "verbose"
# flag so applications keep control of their logging setup.
logging.basicConfig(
    filename=log_filename,
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)


@dataclass
Expand All @@ -25,6 +37,36 @@ class QueueMetrics:
scrape_time: datetime


def transaction(func: Callable) -> Callable:
    """Decorator that runs the wrapped method within a database transaction.

    The wrapped method must accept a ``conn`` keyword argument; a pooled
    connection is always injected here. Callers may pass
    ``perform_transaction=True/False`` on any decorated call to override the
    instance-wide ``self.perform_transaction`` default; that keyword is
    consumed by the wrapper and never forwarded to the wrapped method.

    Raises:
        Whatever the wrapped method raises; the transaction is rolled back
        before the exception propagates.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Per-call override, falling back to the instance-wide default.
        perform_transaction = kwargs.pop("perform_transaction", self.perform_transaction)
        with self.pool.connection() as conn:
            if not perform_transaction:
                logger.debug(f"Non-transactional execution with conn: {conn}")
                return func(self, *args, conn=conn, **kwargs)

            logger.debug(f"Transaction started with conn: {conn}")
            try:
                # conn.transaction() commits on clean exit and rolls back
                # automatically when the body raises, so no manual
                # conn.rollback() is needed in the except branch below.
                with conn.transaction():
                    result = func(self, *args, conn=conn, **kwargs)
            except Exception as e:
                logger.error(f"Transaction failed with exception: {e}, rolling back.")
                raise
            logger.debug(f"Transaction completed with conn: {conn}")
            return result

    return wrapper


@dataclass
class PGMQueue:
"""Base class for interacting with a queue"""
Expand All @@ -39,17 +81,9 @@ class PGMQueue:
pool_size: int = 10
kwargs: dict = field(default_factory=dict)
pool: ConnectionPool = field(init=False)
perform_transaction: bool = False

def __post_init__(self) -> None:
self.host = self.host or "localhost"
self.port = self.port or "5432"
self.database = self.database or "postgres"
self.username = self.username or "postgres"
self.password = self.password or "postgres"

if not all([self.host, self.port, self.database, self.username, self.password]):
raise ValueError("Incomplete database connection information provided.")

conninfo = f"""
host={self.host}
port={self.port}
Expand All @@ -58,155 +92,174 @@ def __post_init__(self) -> None:
password={self.password}
"""
self.pool = ConnectionPool(conninfo, open=True, **self.kwargs)

with self.pool.connection() as conn:
conn.execute("create extension if not exists pgmq cascade;")

self._initialize_extensions()

def _initialize_extensions(self, conn=None) -> None:
    """Ensure the pgmq extension is installed in the target database."""
    query = "create extension if not exists pgmq cascade;"
    self._execute_query(query, conn=conn)

def _execute_query(self, query: str, params: Optional[Union[List, tuple]] = None, conn=None) -> None:
    """Run a statement, on the supplied connection or a fresh pooled one."""
    logger.debug(f"Executing query: {query} with params: {params} using conn: {conn}")
    if conn:
        conn.execute(query, params)
        return
    with self.pool.connection() as pooled:
        pooled.execute(query, params)

def _execute_query_with_result(self, query: str, params: Optional[Union[List, tuple]] = None, conn=None):
    """Run a statement and return all fetched rows, using the supplied
    connection when given, otherwise a fresh pooled one."""
    logger.debug(f"Executing query with result: {query} with params: {params} using conn: {conn}")
    if conn:
        return conn.execute(query, params).fetchall()
    with self.pool.connection() as pooled:
        return pooled.execute(query, params).fetchall()

@transaction
def create_partitioned_queue(
    self,
    queue: str,
    partition_interval: int = 10000,
    retention_interval: int = 100000,
    conn=None,
) -> None:
    """Create a new partitioned queue.

    Note: Partitions are created by pg_partman, which must be configured in
    postgresql.conf. Set `pg_partman_bgw.interval` to set the interval for
    partition creation and deletion. A value of 10 will create/delete
    partitions every 10 seconds; tune this to the queue's message volume.

    Args:
        queue: The name of the queue.
        partition_interval: The number of messages per partition. Defaults to 10,000.
        retention_interval: The number of messages to retain; messages beyond
            this number will be dropped. Defaults to 100,000.
        conn: Optional open connection injected by the @transaction decorator.
    """
    query = "select pgmq.create(%s, %s::text, %s::text);"
    params = [queue, partition_interval, retention_interval]
    self._execute_query(query, params, conn=conn)

@transaction
def create_queue(self, queue: str, unlogged: bool = False, conn=None) -> None:
    """Create a new queue.

    Args:
        queue: The name of the queue.
        unlogged: When True, create via pgmq.create_unlogged instead of pgmq.create.
        conn: Optional open connection injected by the @transaction decorator.
    """
    logger.debug(f"create_queue called with conn: {conn}")
    query = "select pgmq.create_unlogged(%s);" if unlogged else "select pgmq.create(%s);"
    self._execute_query(query, [queue], conn=conn)

def validate_queue_name(self, queue_name: str, conn=None) -> None:
    """Validate a queue name server-side via pgmq.validate_queue_name.

    Raises a database error for an invalid name; returns None otherwise.
    """
    query = "select pgmq.validate_queue_name(%s);"
    self._execute_query(query, [queue_name], conn=conn)

@transaction
def drop_queue(self, queue: str, partitioned: bool = False, conn=None) -> bool:
    """Drop a queue; returns the boolean result of pgmq.drop_queue."""
    logger.debug(f"drop_queue called with conn: {conn}")
    query = "select pgmq.drop_queue(%s, %s);"
    result = self._execute_query_with_result(query, [queue, partitioned], conn=conn)
    return result[0][0]

@transaction
def list_queues(self, conn=None) -> List[str]:
    """List the names of all queues."""
    logger.debug(f"list_queues called with conn: {conn}")
    query = "select queue_name from pgmq.list_queues();"
    rows = self._execute_query_with_result(query, conn=conn)
    return [row[0] for row in rows]

@transaction
def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int:
    """Send a message to a queue; returns the new message id.

    Args:
        queue: The name of the queue.
        message: JSON-serializable payload (wrapped in Jsonb).
        delay: Seconds before the message becomes visible. Defaults to 0.
        conn: Optional open connection injected by the @transaction decorator.
    """
    logger.debug(f"send called with conn: {conn}")
    query = "select * from pgmq.send(%s, %s, %s);"
    result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn)
    return result[0][0]

@transaction
def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]:
    """Send a batch of messages to a queue; returns the new message ids."""
    logger.debug(f"send_batch called with conn: {conn}")
    query = "select * from pgmq.send_batch(%s, %s, %s);"
    params = [queue, [Jsonb(message) for message in messages], delay]
    result = self._execute_query_with_result(query, params, conn=conn)
    return [message[0] for message in result]

@transaction
def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]:
    """Read (without deleting) one message from a queue.

    Returns the Message, or None when the queue has no visible message.
    """
    logger.debug(f"read called with conn: {conn}")
    query = "select * from pgmq.read(%s, %s, %s);"
    # `vt if vt is not None` (rather than `vt or self.vt`) so that an
    # explicit vt=0 is honored instead of silently using the default.
    rows = self._execute_query_with_result(
        query, [queue, vt if vt is not None else self.vt, 1], conn=conn
    )
    messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
    return messages[0] if messages else None

@transaction
def read_batch(self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None) -> Optional[List[Message]]:
    """Read (without deleting) up to `batch_size` messages from a queue."""
    logger.debug(f"read_batch called with conn: {conn}")
    query = "select * from pgmq.read(%s, %s, %s);"
    # `vt if vt is not None` so an explicit vt=0 is honored.
    rows = self._execute_query_with_result(
        query, [queue, vt if vt is not None else self.vt, batch_size], conn=conn
    )
    return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]

@transaction
def read_with_poll(
    self,
    queue: str,
    vt: Optional[int] = None,
    qty: int = 1,
    max_poll_seconds: int = 5,
    poll_interval_ms: int = 100,
    conn=None,
) -> Optional[List[Message]]:
    """Read messages from a queue, polling until some arrive or timeout.

    Args:
        queue: The name of the queue.
        vt: Visibility timeout; falls back to self.vt when None.
        qty: Maximum number of messages to return.
        max_poll_seconds: How long to keep polling for messages.
        poll_interval_ms: Delay between polls, in milliseconds.
        conn: Optional open connection injected by the @transaction decorator.
    """
    logger.debug(f"read_with_poll called with conn: {conn}")
    query = "select * from pgmq.read_with_poll(%s, %s, %s, %s, %s);"
    # `vt if vt is not None` so an explicit vt=0 is honored.
    params = [queue, vt if vt is not None else self.vt, qty, max_poll_seconds, poll_interval_ms]
    rows = self._execute_query_with_result(query, params, conn=conn)
    return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]

@transaction
def pop(self, queue: str, conn=None) -> Message:
    """Pop a message from a queue.

    Raises:
        IndexError: when pgmq.pop returns no rows (empty queue), since the
            first row is indexed unconditionally.
    """
    logger.debug(f"pop called with conn: {conn}")
    query = "select * from pgmq.pop(%s);"
    rows = self._execute_query_with_result(query, [queue], conn=conn)
    messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
    return messages[0]

@transaction
def delete(self, queue: str, msg_id: int, conn=None) -> bool:
    """Delete a message from a queue; returns pgmq.delete's boolean result."""
    logger.debug(f"delete called with conn: {conn}")
    query = "select pgmq.delete(%s, %s);"
    result = self._execute_query_with_result(query, [queue, msg_id], conn=conn)
    return result[0][0]

@transaction
def delete_batch(self, queue: str, msg_ids: List[int], conn=None) -> List[int]:
    """Delete multiple messages from a queue; returns the ids actually deleted."""
    logger.debug(f"delete_batch called with conn: {conn}")
    query = "select * from pgmq.delete(%s, %s);"
    result = self._execute_query_with_result(query, [queue, msg_ids], conn=conn)
    return [x[0] for x in result]

@transaction
def archive(self, queue: str, msg_id: int, conn=None) -> bool:
    """Archive a message from a queue; returns pgmq.archive's boolean result."""
    logger.debug(f"archive called with conn: {conn}")
    query = "select pgmq.archive(%s, %s);"
    result = self._execute_query_with_result(query, [queue, msg_id], conn=conn)
    return result[0][0]

@transaction
def archive_batch(self, queue: str, msg_ids: List[int], conn=None) -> List[int]:
    """Archive multiple messages from a queue; returns the ids actually archived."""
    logger.debug(f"archive_batch called with conn: {conn}")
    query = "select * from pgmq.archive(%s, %s);"
    result = self._execute_query_with_result(query, [queue, msg_ids], conn=conn)
    return [x[0] for x in result]

@transaction
def purge(self, queue: str, conn=None) -> int:
    """Purge a queue; returns pgmq.purge_queue's result (first row, first column)."""
    logger.debug(f"purge called with conn: {conn}")
    query = "select pgmq.purge_queue(%s);"
    result = self._execute_query_with_result(query, [queue], conn=conn)
    return result[0][0]

def metrics(self, queue: str) -> QueueMetrics:
with self.pool.connection() as conn:
result = conn.execute("SELECT * FROM pgmq.metrics(%s);", [queue]).fetchone()
@transaction
def metrics(self, queue: str, conn=None) -> QueueMetrics:
"""Get metrics for a specific queue."""
logger.debug(f"metrics called with conn: {conn}")
query = "SELECT * FROM pgmq.metrics(%s);"
result = self._execute_query_with_result(query, [queue], conn=conn)[0]
return QueueMetrics(
queue_name=result[0],
queue_length=result[1],
Expand All @@ -216,9 +269,12 @@ def metrics(self, queue: str) -> QueueMetrics:
scrape_time=result[5],
)

def metrics_all(self) -> List[QueueMetrics]:
with self.pool.connection() as conn:
results = conn.execute("SELECT * FROM pgmq.metrics_all();").fetchall()
@transaction
def metrics_all(self, conn=None) -> List[QueueMetrics]:
"""Get metrics for all queues."""
logger.debug(f"metrics_all called with conn: {conn}")
query = "SELECT * FROM pgmq.metrics_all();"
results = self._execute_query_with_result(query, conn=conn)
return [
QueueMetrics(
queue_name=row[0],
Expand All @@ -231,13 +287,23 @@ def metrics_all(self) -> List[QueueMetrics]:
for row in results
]

@transaction
def set_vt(self, queue: str, msg_id: int, vt: int, conn=None) -> Message:
    """Set the visibility timeout for a specific message.

    Returns the updated Message row from pgmq.set_vt.
    """
    logger.debug(f"set_vt called with conn: {conn}")
    query = "select * from pgmq.set_vt(%s, %s, %s);"
    result = self._execute_query_with_result(query, [queue, msg_id, vt], conn=conn)[0]
    return Message(
        msg_id=result[0],
        read_ct=result[1],
        enqueued_at=result[2],
        vt=result[3],
        message=result[4],
    )

@transaction
def detach_archive(self, queue: str, conn=None) -> None:
    """Detach an archive from a queue."""
    logger.debug(f"detach_archive called with conn: {conn}")
    query = "select pgmq.detach_archive(%s);"
    self._execute_query(query, [queue], conn=conn)
Loading
Loading