diff --git a/storage/postgres/client.go b/storage/postgres/client.go index 8a51dded6..5cec9be00 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -5,7 +5,6 @@ package postgres import ( "context" "fmt" - "os" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -96,89 +95,113 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error return c.SendBatchWithOptions(ctx, batch, pgx.TxOptions{}) } -func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { - // NOTE: Sending txs with tx.SendBatch(batch.AsPgxBatch()) is more efficient as it happens - // in a single roundtrip to the server. - // However, it reports errors poorly: If _any_ query is syntactically - // malformed, called with the wrong number of args, or has a type conversion problem, - // pgx will report the _first_ query as failing. - // - // TODO: Remove the first branch if we verify that the performance gain is negligible. - if os.Getenv("PGX_FAST_BATCH") == "1" { //nolint:nestif - pgxBatch := batch.AsPgxBatch() - var batchResults pgx.BatchResults - var emptyTxOptions pgx.TxOptions - var tx pgx.Tx - var err error - - // Begin a transaction. - useExplicitTx := opts != emptyTxOptions - if useExplicitTx { - // set up our own tx with the specified options - tx, err = c.pool.BeginTx(ctx, opts) - if err != nil { - return fmt.Errorf("failed to begin tx: %w", err) - } - batchResults = c.pool.SendBatch(ctx, &pgxBatch) - } else { - // use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879 - batchResults = c.pool.SendBatch(ctx, &pgxBatch) +// Submits a new batch. Under the hood, uses `tx.SendBatch(batch.AsPgxBatch())`, +// which is more efficient as it happens in a single roundtrip to the server. +// However, it reports errors poorly: If _any_ query is syntactically +// malformed, called with the wrong number of args, or has a type conversion problem, +// pgx will report the _first_ query as failing. +// +// For efficiency and simplicity, the method does not use explicit transactions +// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed +// atomically, because: +// 1) We use pgx in its default QueryExecMode. +// 2) This in turn makes pgx use postgresql in pipeline mode. +// 3) Postgresql pipeline mode implies transactions-like behavior: +// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS + +func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { + pgxBatch := batch.AsPgxBatch() + var batchResults pgx.BatchResults + var emptyTxOptions pgx.TxOptions + var tx pgx.Tx + var err error + + // Begin a transaction. + useExplicitTx := opts != emptyTxOptions // see function docstring for more info + if useExplicitTx { + // set up our own tx with the specified options + tx, err = c.pool.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("failed to begin tx: %w", err) } - defer common.CloseOrLog(batchResults, c.logger) - - // Exec indiviual queries in the batch. - for i := 0; i < pgxBatch.Len(); i++ { - if _, err := batchResults.Exec(); err != nil { - rollbackErr := "" - if useExplicitTx { - err2 := tx.Rollback(ctx) - if err2 != nil { - rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error()) - } + batchResults = c.pool.SendBatch(ctx, &pgxBatch) + } else { + // use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879 + batchResults = c.pool.SendBatch(ctx, &pgxBatch) + } + defer common.CloseOrLog(batchResults, c.logger) + + // Read the results of indiviual queries in the batch. + for i := 0; i < pgxBatch.Len(); i++ { + if _, err := batchResults.Exec(); err != nil { + rollbackErr := "" + if useExplicitTx { + err2 := tx.Rollback(ctx) + if err2 != nil { + rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error()) } - return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr) } + return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr) } + } - // Commit the tx. - if useExplicitTx { - err := tx.Commit(ctx) - if err != nil { - return fmt.Errorf("failed to commit tx: %w", err) - } - } - } else { - // Begin a transaction. - tx, err := c.pool.BeginTx(ctx, opts) + // Commit the tx. + if useExplicitTx { + err := tx.Commit(ctx) if err != nil { - return fmt.Errorf("failed to begin tx: %w", err) + return fmt.Errorf("failed to commit tx: %w", err) } + } + return nil +} - // Exec indiviual queries in the batch. - for i, q := range batch.Queries() { - if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil { - rollbackErr := "" - err3 := tx.Rollback(ctx) - if err3 != nil { - rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error()) - } - return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr) +// Submits a new batch of queries, sending one query at a time. Compared with `sendBatchWithOptionsSlow`, this +// gives slower performance but better error reporting. +func (c *Client) sendBatchWithOptionsSlow(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { + // Begin a transaction. + tx, err := c.pool.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("failed to begin tx: %w", err) + } + + // Exec indiviual queries in the batch. + for i, q := range batch.Queries() { + if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil { + rollbackErr := "" + err3 := tx.Rollback(ctx) + if err3 != nil { + rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error()) } + return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr) } + } - // Commit the transaction. - err = tx.Commit(ctx) - if err != nil { - c.logger.Error("failed to submit tx", - "error", err, - "batch", batch.Queries(), - ) - return err - } + // Commit the transaction. + err = tx.Commit(ctx) + if err != nil { + return err } return nil } +func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { + var err error + if err = c.sendBatchWithOptionsFast(ctx, batch, opts); err == nil { + // The fast path succeeded. This should happen most of the time. + return nil + } + + // There was an error. The tx was reverted(*), so we can resubmit. + // (*) Theoretically it's possible that the tx was committed but the confirmation of commit + // didn't reach us due to networking issues. We don't handle this case. + // This time, use the slow method for better error msgs. + c.logger.Warn("failed to submit tx using the fast path; falling back to slow path", + "error", err, + "batch", batch.Queries(), + ) + return c.sendBatchWithOptionsSlow(ctx, batch, opts) +} + // Query submits a new read query to PostgreSQL. func (c *Client) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { rows, err := c.pool.Query(ctx, sql, args...)