Add transactional enqueuing #53

Open
nathankawalec opened this issue Aug 29, 2024 · 10 comments · May be fixed by #63

Comments

@nathankawalec

It would be very beneficial to be able to enqueue messages in a transaction, similar to what riverqueue offers (https://pkg.go.dev/github.com/riverqueue/river#Client.InsertTx). This would eliminate the errors that can arise if data is written to the db and a crash occurs before the message is queued, or vice versa.
This could be done either by returning a pointer to a tx or by accepting a tx as an input parameter when queuing a message. If this is a feature you could add, it would be very appreciated :)

@craigpastro
Owner

Hi @nathankawalec! There is SendBatch, but perhaps this doesn't work for your use case? If so, then perhaps a SendTx makes sense. I'm happy to review a PR, otherwise, I will attempt an implementation when I have some spare time. Thanks!

@nathankawalec
Author

Sorry for the late response. Upon further review, I think all that is required is something like:
NewFromTx() *PGMQ { ... }

The only issue I can think of is that a pgxpool transaction doesn't implement the Close or Ping methods, so the end user would have to wrap the transaction object in order to implement those methods (as no-ops).

I believe the Close method called on rows in SendBatchWithDelay, ReadBatch, ArchiveBatch, and DeleteBatch can safely be called inside a transaction that still has more work to do, but I may be mistaken.

I think ideally it would be possible to use each method inside a transaction.

@nathankawalec
Author

Oops, didn't mean to close.

@nathankawalec nathankawalec reopened this Sep 25, 2024
@SoftExpert

My 2 cents here: with commit 1e953ef it became clear that transactions are fully supported by pgmq.
Perhaps the internal code of the Go client will become a bit more complicated, in order to call the correct pgmq APIs ...

@craigpastro
Owner

I finally got around to implementing this in #63. I dropped the PGMQ struct and added an argument to all functions that only needs to satisfy the DB interface:

type DB interface {
	Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, query string, args ...any) pgx.Row
}

In particular, pgx.Tx satisfies this interface.

Example usage:

// Suppose pool is an instance of *pgxpool.Pool and a PGMQ queue named "queue" exists

tx, err := pool.Begin(ctx)
// handle error

id, err := pgmq.Send(ctx, tx, "queue", json.RawMessage(`{"foo": "bar"}`))
// handle err

// do other transaction stuff
err = tx.Commit(ctx)

WDYT?

@craigpastro craigpastro linked a pull request Jan 4, 2025 that will close this issue
@SoftExpert

If I understand correctly, the DB connection object will be maintained outside of the pgmq-go library and only passed as a parameter to each exposed method, right?
Thus, the application becomes entirely responsible for initializing, testing and maintaining the connection to the DB.

Is this correct ?

@craigpastro
Owner

Correct. I think it makes sense because you may be doing many other things with the connection; that is the advantage of using a queue in Postgres. Of course, we could provide convenience functions like CreatePgxPool, etc., if those are valuable.

@SoftExpert

SoftExpert commented Jan 6, 2025

There will be use cases where the transactions are not required / needed.
What happens with the methods that are not transaction aware ?

Since the library needs a DB-compliant connection to work with anyway, it would probably be easier to accept a pointer to the active DB object at initialization and use that reference from then on, which avoids passing the DB object as a parameter every time.

What do you think ?

@craigpastro
Owner

Oh yeah, definitely. I aimed for a general API that would support all/most use cases. So, if you don't need transactions and just want to use PGMQ, then the simplest way would be

pool, err := pgmq.NewPgxPool(ctx, "postgres://postgres:password@localhost:5432/postgres")
err = pgmq.CreatePGMQExtension(ctx, pool)
// could combine these two functions into pgmq.NewPGMQ I suppose

err = pgmq.CreateQueue(ctx, pool, "my_queue")
id, err := pgmq.Send(ctx, pool, "my_queue", msg)

It is a bit annoying to pass the DB object in every case, but it was the least objectionable solution that I could think of and facilitates interweaving with other DB operations and working with transactions.

I thought about accepting the pointer to the active DB object and defining methods on that, but then how would one work with transactions? I thought of adding extra methods that work on transactions, or perhaps adding an extra argument that takes an optional transaction and uses it if supplied, but neither of these seemed that nice to me.

@SoftExpert

If you accepted the pointer to the DB object, the transaction-aware methods could either:

  1. declare their own transaction object and return it to the calling code, or
  2. accept a reference to an external transaction object, for more complex handling.

In both cases, the calling code would be responsible for committing or rolling back the transaction object.

I would see something like:

  • pgmq.NewPgxPool <- creates and returns a PgxPool object
  • pgmq.NewFromPgxPool <- receives a pointer to an existing PgxPool object

Both methods could have a boolean flag to ensure the extension is present / created

  • pgmq.Send <- no Tx is necessary
  • pgmq.SendTx <- a Tx object is created and returned
  • pgmq.SendWithTx <- an existing Tx object is passed on as parameter

The API looks more consistent like this, in my opinion ...
