Skip to content

Commit

Permalink
Merge pull request #556 from oasisprotocol/mitjat/fast-pgx-batches
Browse files Browse the repository at this point in the history
pgx: Transfer entire DB batch in one request
  • Loading branch information
mitjat authored Feb 6, 2024
2 parents cd158cb + 8b8e453 commit c433cdf
Showing 1 changed file with 92 additions and 69 deletions.
161 changes: 92 additions & 69 deletions storage/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package postgres
import (
"context"
"fmt"
"os"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -96,89 +95,113 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error
return c.SendBatchWithOptions(ctx, batch, pgx.TxOptions{})
}

func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
// NOTE: Sending txs with tx.SendBatch(batch.AsPgxBatch()) is more efficient as it happens
// in a single roundtrip to the server.
// However, it reports errors poorly: If _any_ query is syntactically
// malformed, called with the wrong number of args, or has a type conversion problem,
// pgx will report the _first_ query as failing.
//
// TODO: Remove the first branch if we verify that the performance gain is negligible.
if os.Getenv("PGX_FAST_BATCH") == "1" { //nolint:nestif
pgxBatch := batch.AsPgxBatch()
var batchResults pgx.BatchResults
var emptyTxOptions pgx.TxOptions
var tx pgx.Tx
var err error

// Begin a transaction.
useExplicitTx := opts != emptyTxOptions
if useExplicitTx {
// set up our own tx with the specified options
tx, err = c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
} else {
// use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
// Submits a new batch. Under the hood, uses `tx.SendBatch(batch.AsPgxBatch())`,
// which is more efficient as it happens in a single roundtrip to the server.
// However, it reports errors poorly: If _any_ query is syntactically
// malformed, called with the wrong number of args, or has a type conversion problem,
// pgx will report the _first_ query as failing.
//
// For efficiency and simplicity, the method does not use explicit transactions
// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed
// atomically, because:
// 1) We use pgx in its default QueryExecMode.
// 2) This in turn makes pgx use PostgreSQL in pipeline mode.
// 3) PostgreSQL pipeline mode implies transaction-like behavior:
// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS

func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
pgxBatch := batch.AsPgxBatch()
var batchResults pgx.BatchResults
var emptyTxOptions pgx.TxOptions
var tx pgx.Tx
var err error

// Begin a transaction.
useExplicitTx := opts != emptyTxOptions // see function docstring for more info
if useExplicitTx {
// set up our own tx with the specified options
tx, err = c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}
defer common.CloseOrLog(batchResults, c.logger)

// Exec indiviual queries in the batch.
for i := 0; i < pgxBatch.Len(); i++ {
if _, err := batchResults.Exec(); err != nil {
rollbackErr := ""
if useExplicitTx {
err2 := tx.Rollback(ctx)
if err2 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error())
}
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
} else {
// use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
}
defer common.CloseOrLog(batchResults, c.logger)

// Read the results of individual queries in the batch.
for i := 0; i < pgxBatch.Len(); i++ {
if _, err := batchResults.Exec(); err != nil {
rollbackErr := ""
if useExplicitTx {
err2 := tx.Rollback(ctx)
if err2 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error())
}
return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr)
}
return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr)
}
}

// Commit the tx.
if useExplicitTx {
err := tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
}
} else {
// Begin a transaction.
tx, err := c.pool.BeginTx(ctx, opts)
// Commit the tx.
if useExplicitTx {
err := tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
return fmt.Errorf("failed to commit tx: %w", err)
}
}
return nil
}

// Exec indiviual queries in the batch.
for i, q := range batch.Queries() {
if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil {
rollbackErr := ""
err3 := tx.Rollback(ctx)
if err3 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error())
}
return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr)
// Submits a new batch of queries, sending one query at a time. Compared with `sendBatchWithOptionsFast`, this
// gives slower performance but better error reporting.
func (c *Client) sendBatchWithOptionsSlow(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
// Begin a transaction.
tx, err := c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}

// Exec individual queries in the batch.
for i, q := range batch.Queries() {
if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil {
rollbackErr := ""
err3 := tx.Rollback(ctx)
if err3 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error())
}
return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr)
}
}

// Commit the transaction.
err = tx.Commit(ctx)
if err != nil {
c.logger.Error("failed to submit tx",
"error", err,
"batch", batch.Queries(),
)
return err
}
// Commit the transaction.
err = tx.Commit(ctx)
if err != nil {
return err
}
return nil
}

// SendBatchWithOptions submits a batch of queries using the given tx options.
//
// The batch is first sent via sendBatchWithOptionsFast. If that returns any
// error, a warning with the error and the batch's queries is logged and the
// same batch is resubmitted via sendBatchWithOptionsSlow, whose result is
// returned to the caller.
func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
	fastErr := c.sendBatchWithOptionsFast(ctx, batch, opts)
	if fastErr == nil {
		// Common case: the fast path went through and there is nothing more to do.
		return nil
	}

	// The fast path failed, so its tx is assumed to have been reverted and the
	// batch can be submitted again. Caveat: in theory the tx could have been
	// committed with the commit confirmation lost to networking issues; that
	// case is not handled here.
	// The retry goes through the slow path because it produces better error msgs.
	c.logger.Warn("failed to submit tx using the fast path; falling back to slow path",
		"error", fastErr,
		"batch", batch.Queries(),
	)
	return c.sendBatchWithOptionsSlow(ctx, batch, opts)
}

// Query submits a new read query to PostgreSQL.
func (c *Client) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
rows, err := c.pool.Query(ctx, sql, args...)
Expand Down

0 comments on commit c433cdf

Please sign in to comment.