Skip to content

Commit

Permalink
Retry on lock_timeout errors (#353)
Browse files Browse the repository at this point in the history
Retry statements and transactions that fail due to `lock_timeout`
errors.

DDL operations and backfills are run in a session in which `SET
lock_timeout TO 'xms'` has been set (`x` defaults to `500` but can be
specified with the `--lock-timeout` parameter). This ensures that a
long-running query can't cause other queries to queue up behind a DDL
operation as it waits to acquire its lock.

The current behaviour if a DDL operation or backfill batch times out
when requesting a lock is to fail, forcing the user to retry the
migration operation (start, rollback, or complete).

This PR retries individual statements (like the DDL operations run by
migration operations) and transactions (used by backfills) if they fail
due to a `lock_timeout` error. The retry uses an exponential backoff
with jitter.

Fixes #171
  • Loading branch information
andrew-farries authored May 8, 2024
1 parent 4f0a715 commit 5c1aef2
Show file tree
Hide file tree
Showing 33 changed files with 365 additions and 166 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/xataio/pgroll
go 1.21

require (
github.com/cloudflare/backoff v0.0.0-20161212185259-647f3cdfc87a
github.com/google/go-cmp v0.6.0
github.com/lib/pq v1.10.9
github.com/oapi-codegen/nullable v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/backoff v0.0.0-20161212185259-647f3cdfc87a h1:8d1CEOF1xldesKds5tRG3tExBsMOgWYownMHNCsev54=
github.com/cloudflare/backoff v0.0.0-20161212185259-647f3cdfc87a/go.mod h1:rzgs2ZOiguV6/NpiDgADjRLPNyZlApIWxKpkT+X8SdY=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down
82 changes: 82 additions & 0 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0

package db

import (
"context"
"database/sql"
"errors"
"time"

"github.com/cloudflare/backoff"
"github.com/lib/pq"
)

const (
// lockNotAvailableErrorCode is the pq error code that marks an error as
// retryable: the retry loops in this file compare it against pq.Error.Code.
// "55P03" is the PostgreSQL SQLSTATE for lock_not_available.
lockNotAvailableErrorCode pq.ErrorCode = "55P03"
// maxBackoffDuration is passed as the first argument to backoff.New.
// NOTE(review): presumably the upper bound on a single wait between
// retries — confirm against the backoff package documentation.
maxBackoffDuration = 1 * time.Minute
// backoffInterval is passed as the second argument to backoff.New.
// NOTE(review): presumably the base interval the backoff grows from —
// confirm against the backoff package documentation.
backoffInterval = 1 * time.Second
)

// DB is the set of database operations used by callers that need statements
// and transactions to be executed through a common abstraction rather than a
// concrete *sql.DB.
type DB interface {
	// Close releases the resources held by the implementation.
	Close() error

	// ExecContext executes a single statement and returns its result.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

	// WithRetryableTransaction invokes f with a transaction. Implementations
	// may invoke f more than once, so f must be safe to re-run.
	WithRetryableTransaction(ctx context.Context, f func(context.Context, *sql.Tx) error) error
}

// RDB wraps a *sql.DB and retries queries using an exponential backoff (with
// jitter) on lock_timeout errors.
//
// The methods declared on *RDB in this file (ExecContext,
// WithRetryableTransaction and Close) match the method set of the DB
// interface.
type RDB struct {
// DB is the wrapped database handle; every method on RDB delegates to it.
DB *sql.DB
}

// ExecContext wraps sql.DB.ExecContext, retrying queries on lock_timeout errors.
//
// The statement is re-executed for as long as it fails with a pq error whose
// code is lockNotAvailableErrorCode, waiting for the next backoff duration
// between attempts. Any other error is returned to the caller unchanged.
//
// The wait between attempts honours ctx: if ctx is cancelled or its deadline
// passes while waiting, ExecContext returns ctx.Err() immediately instead of
// sleeping out the remainder of the backoff.
func (db *RDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	b := backoff.New(maxBackoffDuration, backoffInterval)

	for {
		res, err := db.DB.ExecContext(ctx, query, args...)
		if err == nil {
			return res, nil
		}

		// Only lock acquisition failures are retried.
		pqErr := &pq.Error{}
		if !errors.As(err, &pqErr) || pqErr.Code != lockNotAvailableErrorCode {
			return nil, err
		}

		// Wait for the backoff, but give up as soon as the caller does. A
		// stoppable timer is used so that it is released on early return.
		timer := time.NewTimer(b.Duration())
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		}
	}
}

// WithRetryableTransaction runs `f` in a transaction, retrying on lock_timeout errors.
//
// A fresh transaction is begun for every attempt. If f returns nil the
// transaction is committed and the commit error (if any) is returned. If f
// fails, the transaction is rolled back; the whole attempt is repeated only
// when the failure is a pq error whose code is lockNotAvailableErrorCode,
// otherwise the error from f is returned. Because f may run several times it
// must be safe to re-execute.
//
// If the rollback itself fails, both the error from f and the rollback error
// are returned (combined with errors.Join) so the original cause is not lost.
//
// The wait between attempts honours ctx: if ctx is cancelled or its deadline
// passes while waiting, ctx.Err() is returned immediately.
func (db *RDB) WithRetryableTransaction(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
	b := backoff.New(maxBackoffDuration, backoffInterval)

	for {
		tx, err := db.DB.BeginTx(ctx, nil)
		if err != nil {
			return err
		}

		err = f(ctx, tx)
		if err == nil {
			return tx.Commit()
		}

		if errRollback := tx.Rollback(); errRollback != nil {
			// Keep the error from f: it is the reason the rollback was attempted.
			return errors.Join(err, errRollback)
		}

		// Only lock acquisition failures are retried.
		pqErr := &pq.Error{}
		if !errors.As(err, &pqErr) || pqErr.Code != lockNotAvailableErrorCode {
			return err
		}

		// Wait for the backoff, but give up as soon as the caller does. A
		// stoppable timer is used so that it is released on early return.
		timer := time.NewTimer(b.Duration())
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
}

// Close closes the wrapped *sql.DB and returns the error it reports, if any.
func (db *RDB) Close() error {
return db.DB.Close()
}
121 changes: 121 additions & 0 deletions pkg/db/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// SPDX-License-Identifier: Apache-2.0

package db_test

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/xataio/pgroll/pkg/db"
"github.com/xataio/pgroll/pkg/testutils"
)

// TestMain hands control of this package's test run to
// testutils.SharedTestMain.
func TestMain(m *testing.M) {
testutils.SharedTestMain(m)
}

// TestExecContext checks that RDB.ExecContext succeeds against a table that is
// exclusively locked when the statement is first issued, i.e. that the
// statement is retried until the lock has been released.
func TestExecContext(t *testing.T) {
	t.Parallel()

	testutils.WithConnectionToContainer(t, func(conn *sql.DB, connStr string) {
		const (
			lockHeldFor   = 2 * time.Second
			lockTimeoutMs = 100
		)

		// Create the table and hold an exclusive lock on it for a while.
		setupTableLock(t, connStr, lockHeldFor)

		// Use a lock timeout far shorter than the time the lock is held.
		ensureLockTimeout(t, conn, lockTimeoutMs)

		// The insert can only succeed once the lock is gone, so it has to be
		// retried until then.
		retrying := &db.RDB{DB: conn}
		_, err := retrying.ExecContext(context.Background(), "INSERT INTO test(id) VALUES (1)")
		require.NoError(t, err)
	})
}

// TestWithRetryableTransaction checks that RDB.WithRetryableTransaction
// succeeds against a table that is exclusively locked when the transaction is
// first attempted, i.e. that the transaction is retried until the lock has
// been released.
func TestWithRetryableTransaction(t *testing.T) {
	t.Parallel()

	testutils.WithConnectionToContainer(t, func(conn *sql.DB, connStr string) {
		const (
			lockHeldFor   = 2 * time.Second
			lockTimeoutMs = 100
		)

		// Create the table and hold an exclusive lock on it for a while.
		setupTableLock(t, connStr, lockHeldFor)

		// Use a lock timeout far shorter than the time the lock is held.
		ensureLockTimeout(t, conn, lockTimeoutMs)

		// Reading from the table can only succeed once the lock is gone, so
		// the transaction has to be retried until then.
		readTable := func(ctx context.Context, tx *sql.Tx) error {
			return tx.QueryRowContext(ctx, "SELECT 1 FROM test").Err()
		}

		retrying := &db.RDB{DB: conn}
		require.NoError(t, retrying.WithRetryableTransaction(context.Background(), readTable))
	})
}

// setupTableLock:
// * connects to the database
// * creates a table in the database
// * starts a transaction that temporarily locks the table
//
// It returns once the lock has been acquired. The lock is held for d by a
// background goroutine and released when that goroutine commits its
// transaction. The extra database handle opened here is closed when the test
// finishes.
func setupTableLock(t *testing.T, connStr string, d time.Duration) {
	t.Helper()
	ctx := context.Background()

	// connect to the database
	conn2, err := sql.Open("postgres", connStr)
	require.NoError(t, err)
	// Release the handle at the end of the test; a close error is of no
	// interest to the test outcome.
	t.Cleanup(func() { _ = conn2.Close() })

	// create a table in the database
	_, err = conn2.ExecContext(ctx, "CREATE TABLE test (id INT PRIMARY KEY)")
	require.NoError(t, err)

	// start a transaction that takes a temporary lock on the table
	errCh := make(chan error)
	go func() {
		// begin a transaction
		tx, err := conn2.Begin()
		if err != nil {
			errCh <- err
			return
		}

		// lock the table
		_, err = tx.ExecContext(ctx, "LOCK TABLE test IN ACCESS EXCLUSIVE MODE")
		if err != nil {
			// Don't leave the transaction open; the lock error is what matters.
			_ = tx.Rollback()
			errCh <- err
			return
		}

		// signal that the lock is obtained
		errCh <- nil

		// temporarily hold the lock
		time.Sleep(d)

		// Commit the transaction to release the lock. The error is ignored
		// deliberately: the test may already have finished by now, in which
		// case it is no longer safe to report through t.
		_ = tx.Commit()
	}()

	// wait for the lock to be obtained
	err = <-errCh
	require.NoError(t, err)
}

// ensureLockTimeout sets lock_timeout to ms milliseconds through conn and then
// reads the setting back, failing the test if the reported value differs.
func ensureLockTimeout(t *testing.T, conn *sql.DB, ms int) {
	t.Helper()

	ctx := context.Background()
	want := fmt.Sprintf("%dms", ms)

	// Apply the setting.
	_, err := conn.ExecContext(ctx, fmt.Sprintf("SET lock_timeout = '%dms'", ms))
	require.NoError(t, err)

	// Read the setting back and compare it with what was requested.
	var got string
	row := conn.QueryRowContext(ctx, "SHOW lock_timeout")
	require.NoError(t, row.Scan(&got))
	require.Equal(t, want, got)
}
34 changes: 14 additions & 20 deletions pkg/migrations/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
"github.com/xataio/pgroll/pkg/schema"
)

Expand All @@ -18,7 +19,7 @@ import (
// 2. Get the first batch of rows from the table, ordered by the primary key.
// 3. Update each row in the batch, setting the value of the primary key column to itself.
// 4. Repeat steps 2 and 3 until no more rows are returned.
func Backfill(ctx context.Context, conn *sql.DB, table *schema.Table, cbs ...CallbackFn) error {
func Backfill(ctx context.Context, conn db.DB, table *schema.Table, cbs ...CallbackFn) error {
// get the backfill column
identityColumn := getIdentityColumn(table)
if identityColumn == nil {
Expand Down Expand Up @@ -85,27 +86,20 @@ type batcher struct {
batchSize int
}

// updateBatch updates the next batch of rows in the table.
func (b *batcher) updateBatch(ctx context.Context, conn *sql.DB) error {
// Start the transaction for this batch
tx, err := conn.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

// Build the query to update the next batch of rows
query := b.buildQuery()
func (b *batcher) updateBatch(ctx context.Context, conn db.DB) error {
return conn.WithRetryableTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
// Build the query to update the next batch of rows
query := b.buildQuery()

// Execute the query to update the next batch of rows and update the last PK
// value for the next batch
err = tx.QueryRowContext(ctx, query).Scan(&b.lastValue)
if err != nil {
return err
}
// Execute the query to update the next batch of rows and update the last PK
// value for the next batch
err := tx.QueryRowContext(ctx, query).Scan(&b.lastValue)
if err != nil {
return err
}

// Commit the transaction for this batch
return tx.Commit()
return nil
})
}

// buildQuery builds the query used to update the next batch of rows.
Expand Down
6 changes: 3 additions & 3 deletions pkg/migrations/comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ package migrations

import (
"context"
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
)

func addCommentToColumn(ctx context.Context, conn *sql.DB, tableName, columnName string, comment *string) error {
func addCommentToColumn(ctx context.Context, conn db.DB, tableName, columnName string, comment *string) error {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS %s`,
pq.QuoteIdentifier(tableName),
pq.QuoteIdentifier(columnName),
Expand All @@ -19,7 +19,7 @@ func addCommentToColumn(ctx context.Context, conn *sql.DB, tableName, columnName
return err
}

func addCommentToTable(ctx context.Context, conn *sql.DB, tableName string, comment *string) error {
func addCommentToTable(ctx context.Context, conn db.DB, tableName string, comment *string) error {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`COMMENT ON TABLE %s IS %s`,
pq.QuoteIdentifier(tableName),
commentToSQL(comment)))
Expand Down
6 changes: 3 additions & 3 deletions pkg/migrations/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package migrations

import (
"context"
"database/sql"
"fmt"
"slices"
"strings"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
"github.com/xataio/pgroll/pkg/schema"
)

type Duplicator struct {
conn *sql.DB
conn db.DB
table *schema.Table
column *schema.Column
asName string
Expand All @@ -24,7 +24,7 @@ type Duplicator struct {
}

// NewColumnDuplicator creates a new Duplicator for a column.
func NewColumnDuplicator(conn *sql.DB, table *schema.Table, column *schema.Column) *Duplicator {
func NewColumnDuplicator(conn db.DB, table *schema.Table, column *schema.Column) *Duplicator {
return &Duplicator{
conn: conn,
table: table,
Expand Down
8 changes: 4 additions & 4 deletions pkg/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ package migrations

import (
"context"
"database/sql"
"fmt"

_ "github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
"github.com/xataio/pgroll/pkg/schema"
)

Expand All @@ -18,16 +18,16 @@ type Operation interface {
// version in the database (through a view)
// update the given views to expose the new schema version
// Returns the table that requires backfilling, if any.
Start(ctx context.Context, conn *sql.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error)
Start(ctx context.Context, conn db.DB, stateSchema string, tr SQLTransformer, s *schema.Schema, cbs ...CallbackFn) (*schema.Table, error)

// Complete will update the database schema to match the current version
// after calling Start.
// This method should be called once the previous version is no longer used
Complete(ctx context.Context, conn *sql.DB, tr SQLTransformer, s *schema.Schema) error
Complete(ctx context.Context, conn db.DB, tr SQLTransformer, s *schema.Schema) error

// Rollback will revert the changes made by Start. It is not possible to
// rollback a completed migration.
Rollback(ctx context.Context, conn *sql.DB, tr SQLTransformer) error
Rollback(ctx context.Context, conn db.DB, tr SQLTransformer) error

// Validate returns a descriptive error if the operation cannot be applied to the given schema
Validate(ctx context.Context, s *schema.Schema) error
Expand Down
Loading

0 comments on commit 5c1aef2

Please sign in to comment.