From 0879d6732bd58413c68d2ab3c233d08386c9d602 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Tue, 31 Oct 2023 21:24:37 +0100 Subject: [PATCH 1/3] pgx: Transfer all queries in a DB batch in one request --- storage/postgres/client.go | 147 ++++++++++++++++++++----------------- 1 file changed, 78 insertions(+), 69 deletions(-) diff --git a/storage/postgres/client.go b/storage/postgres/client.go index 8a51dded6..8ab6c3d9b 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -5,7 +5,6 @@ package postgres import ( "context" "fmt" - "os" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -96,89 +95,99 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error return c.SendBatchWithOptions(ctx, batch, pgx.TxOptions{}) } -func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { - // NOTE: Sending txs with tx.SendBatch(batch.AsPgxBatch()) is more efficient as it happens - // in a single roundtrip to the server. - // However, it reports errors poorly: If _any_ query is syntactically - // malformed, called with the wrong number of args, or has a type conversion problem, - // pgx will report the _first_ query as failing. - // - // TODO: Remove the first branch if we verify that the performance gain is negligible. - if os.Getenv("PGX_FAST_BATCH") == "1" { //nolint:nestif - pgxBatch := batch.AsPgxBatch() - var batchResults pgx.BatchResults - var emptyTxOptions pgx.TxOptions - var tx pgx.Tx - var err error - - // Begin a transaction. 
- useExplicitTx := opts != emptyTxOptions - if useExplicitTx { - // set up our own tx with the specified options - tx, err = c.pool.BeginTx(ctx, opts) - if err != nil { - return fmt.Errorf("failed to begin tx: %w", err) - } - batchResults = c.pool.SendBatch(ctx, &pgxBatch) - } else { - // use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879 - batchResults = c.pool.SendBatch(ctx, &pgxBatch) +// Submits a new batch. Under the hood, uses `tx.SendBatch(batch.AsPgxBatch())`, +// which is more efficient as it happens in a single roundtrip to the server. +// However, it reports errors poorly: If _any_ query is syntactically +// malformed, called with the wrong number of args, or has a type conversion problem, +// pgx will report the _first_ query as failing. +func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { + pgxBatch := batch.AsPgxBatch() + var batchResults pgx.BatchResults + var emptyTxOptions pgx.TxOptions + var tx pgx.Tx + var err error + + // Begin a transaction. + useExplicitTx := opts != emptyTxOptions + if useExplicitTx { + // set up our own tx with the specified options + tx, err = c.pool.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("failed to begin tx: %w", err) } - defer common.CloseOrLog(batchResults, c.logger) - - // Exec indiviual queries in the batch. - for i := 0; i < pgxBatch.Len(); i++ { - if _, err := batchResults.Exec(); err != nil { - rollbackErr := "" - if useExplicitTx { - err2 := tx.Rollback(ctx) - if err2 != nil { - rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error()) - } + batchResults = c.pool.SendBatch(ctx, &pgxBatch) + } else { + // use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879 + batchResults = c.pool.SendBatch(ctx, &pgxBatch) + } + defer common.CloseOrLog(batchResults, c.logger) + + // Exec indiviual queries in the batch. 
+ for i := 0; i < pgxBatch.Len(); i++ { + if _, err := batchResults.Exec(); err != nil { + rollbackErr := "" + if useExplicitTx { + err2 := tx.Rollback(ctx) + if err2 != nil { + rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error()) } - return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr) } + return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr) } + } - // Commit the tx. - if useExplicitTx { - err := tx.Commit(ctx) - if err != nil { - return fmt.Errorf("failed to commit tx: %w", err) - } - } - } else { - // Begin a transaction. - tx, err := c.pool.BeginTx(ctx, opts) + // Commit the tx. + if useExplicitTx { + err := tx.Commit(ctx) if err != nil { - return fmt.Errorf("failed to begin tx: %w", err) + return fmt.Errorf("failed to commit tx: %w", err) } + } + return nil +} - // Exec indiviual queries in the batch. - for i, q := range batch.Queries() { - if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil { - rollbackErr := "" - err3 := tx.Rollback(ctx) - if err3 != nil { - rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error()) - } - return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr) +// Submits a new batch of queries, sending one query at a time. Compared with `sendBatchWithOptionsFast`, this +// gives slower performance but better error reporting. +func (c *Client) sendBatchWithOptionsSlow(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { + // Begin a transaction. + tx, err := c.pool.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("failed to begin tx: %w", err) + } + + // Exec individual queries in the batch. 
+ for i, q := range batch.Queries() { + if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil { + rollbackErr := "" + err3 := tx.Rollback(ctx) + if err3 != nil { + rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error()) } + return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr) } + } - // Commit the transaction. - err = tx.Commit(ctx) - if err != nil { - c.logger.Error("failed to submit tx", - "error", err, - "batch", batch.Queries(), - ) - return err - } + // Commit the transaction. + err = tx.Commit(ctx) + if err != nil { + c.logger.Error("failed to submit tx", + "error", err, + "batch", batch.Queries(), + ) + return err } return nil } +func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { + if err := c.sendBatchWithOptionsFast(ctx, batch, opts); err == nil { + // The fast path succeeded. This should happen most of the time. + return nil + } + // There was an error. The tx was reverted, so we can resubmit. This time, use the slow method for better error msgs. + return c.sendBatchWithOptionsSlow(ctx, batch, opts) +} + // Query submits a new read query to PostgreSQL. func (c *Client) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { rows, err := c.pool.Query(ctx, sql, args...) From 176a4ed9fde14222550d59a7254024867e192280 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Mon, 5 Feb 2024 16:34:43 -0800 Subject: [PATCH 2/3] clean up comments, logging --- storage/postgres/client.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/storage/postgres/client.go b/storage/postgres/client.go index 8ab6c3d9b..318b37b75 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -122,7 +122,7 @@ func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.Qu } defer common.CloseOrLog(batchResults, c.logger) - // Exec indiviual queries in the batch. 
+ // Read the results of individual queries in the batch. for i := 0; i < pgxBatch.Len(); i++ { if _, err := batchResults.Exec(); err != nil { rollbackErr := "" @@ -170,21 +170,26 @@ func (c *Client) sendBatchWithOptionsSlow(ctx context.Context, batch *storage.Qu // Commit the transaction. err = tx.Commit(ctx) if err != nil { - c.logger.Error("failed to submit tx", - "error", err, - "batch", batch.Queries(), - ) return err } return nil } func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { - if err := c.sendBatchWithOptionsFast(ctx, batch, opts); err == nil { + var err error + if err = c.sendBatchWithOptionsFast(ctx, batch, opts); err == nil { // The fast path succeeded. This should happen most of the time. return nil } - // There was an error. The tx was reverted, so we can resubmit. This time, use the slow method for better error msgs. + + // There was an error. The tx was reverted(*), so we can resubmit. + // (*) Theoretically it's possible that the tx was committed but the confirmation of commit + // didn't reach us due to networking issues. We don't handle this case. + // This time, use the slow method for better error msgs. 
+ c.logger.Warn("failed to submit tx using the fast path; falling back to slow path", + "error", err, + "batch", batch.Queries(), + ) return c.sendBatchWithOptionsSlow(ctx, batch, opts) } From 8b8e4539c6586753ebf43e3d44b37a93464b6a2c Mon Sep 17 00:00:00 2001 From: Mitja T Date: Mon, 5 Feb 2024 17:24:24 -0800 Subject: [PATCH 3/3] document implicit txs --- storage/postgres/client.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/storage/postgres/client.go b/storage/postgres/client.go index 318b37b75..5cec9be00 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -100,6 +100,15 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error // However, it reports errors poorly: If _any_ query is syntactically // malformed, called with the wrong number of args, or has a type conversion problem, // pgx will report the _first_ query as failing. +// +// For efficiency and simplicity, the method does not use explicit transactions +// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed +// atomically, because: +// 1) We use pgx in its default QueryExecMode. +// 2) This in turn makes pgx use postgresql in pipeline mode. +// 3) Postgresql pipeline mode implies transactions-like behavior: +// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS + func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { pgxBatch := batch.AsPgxBatch() var batchResults pgx.BatchResults @@ -108,7 +117,7 @@ func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.Qu var err error // Begin a transaction. - useExplicitTx := opts != emptyTxOptions + useExplicitTx := opts != emptyTxOptions // see function docstring for more info if useExplicitTx { // set up our own tx with the specified options tx, err = c.pool.BeginTx(ctx, opts)