Skip to content

Commit

Permalink
Add crdbpgxv5 for supporting github.com/jackc/pgx/v5
Browse files Browse the repository at this point in the history
  • Loading branch information
MoinTom authored and rafiss committed Mar 2, 2023
1 parent c97ddc6 commit 52888c5
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 17 deletions.
4 changes: 3 additions & 1 deletion crdb/crdbpgx/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
`crdbpgx` is a wrapper around the logic for issuing SQL transactions which
performs retries (as required by CockroachDB) when using
[`github.com/jackc/pgx`](https://github.com/jackc/pgx) in standalone-library
mode. pgx versions below v4 are not supported.
mode. pgx versions below v4 are not supported.

Note: use `crdbpgxv5` for `pgx/v5` support

If you're using pgx just as a driver for the standard `database/sql` package,
use the parent `crdb` package instead.
9 changes: 9 additions & 0 deletions crdb/crdbpgxv5/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
`crdbpgxv5` is a wrapper around the logic for issuing SQL transactions which
performs retries (as required by CockroachDB) when using
[`github.com/jackc/pgx`](https://github.com/jackc/pgx) in standalone-library
mode. `crdbpgxv5` only supports `pgx/v5`.

Note: use `crdbpgx` for `pgx/v4` support

If you're using pgx just as a driver for the standard `database/sql` package,
use the parent `crdb` package instead.
65 changes: 65 additions & 0 deletions crdb/crdbpgxv5/pgx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package crdbpgx

import (
"context"
"github.com/jackc/pgx/v5"

"github.com/cockroachdb/cockroach-go/v2/crdb"
)

// ExecuteTx runs fn inside a transaction and retries it as needed. On
// non-retryable failures, the transaction is aborted and rolled back; on
// success, the transaction is committed.
//
// See crdb.ExecuteTx() for more information.
//
// conn can be a pgx.Conn or a pgxpool.Pool.
func ExecuteTx(
ctx context.Context, conn Conn, txOptions pgx.TxOptions, fn func(pgx.Tx) error,
) error {
tx, err := conn.BeginTx(ctx, txOptions)
if err != nil {
return err
}
return crdb.ExecuteInTx(ctx, pgxTxAdapter{tx}, func() error { return fn(tx) })
}

// Conn abstracts pgx transactions creators: pgx.Conn and pgxpool.Pool.
type Conn interface {
Begin(context.Context) (pgx.Tx, error)
BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error)
}

type pgxTxAdapter struct {
tx pgx.Tx
}

var _ crdb.Tx = pgxTxAdapter{}

func (tx pgxTxAdapter) Commit(ctx context.Context) error {
return tx.tx.Commit(ctx)
}

func (tx pgxTxAdapter) Rollback(ctx context.Context) error {
return tx.tx.Rollback(ctx)
}

// Exec is part of the crdb.Tx interface.
func (tx pgxTxAdapter) Exec(ctx context.Context, q string, args ...interface{}) error {
_, err := tx.tx.Exec(ctx, q, args...)
return err
}
100 changes: 100 additions & 0 deletions crdb/crdbpgxv5/pgx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package crdbpgx

import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"testing"

"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/cockroachdb/cockroach-go/v2/testserver"
)

// TestExecuteTx verifies transaction retry using the classic
// example of write skew in bank account balance transfers.
func TestExecuteTx(t *testing.T) {
ts, err := testserver.NewTestServer()
if err != nil {
t.Fatal(err)
}
ctx := context.Background()

pool, err := pgxpool.New(ctx, ts.PGURL().String())
if err != nil {
t.Fatal(err)
}

if err := crdb.ExecuteTxGenericTest(ctx, pgxWriteSkewTest{pool: pool}); err != nil {
t.Fatal(err)
}
}

type pgxWriteSkewTest struct {
pool *pgxpool.Pool
}

func (t pgxWriteSkewTest) Init(ctx context.Context) error {
initStmt := `
CREATE DATABASE d;
CREATE TABLE d.t (acct INT PRIMARY KEY, balance INT);
INSERT INTO d.t (acct, balance) VALUES (1, 100), (2, 100);
`
_, err := t.pool.Exec(ctx, initStmt)
return err
}

var _ crdb.WriteSkewTest = pgxWriteSkewTest{}

// ExecuteTx is part of the crdb.WriteSkewTest interface.
func (t pgxWriteSkewTest) ExecuteTx(ctx context.Context, fn func(tx interface{}) error) error {
return ExecuteTx(ctx, t.pool, pgx.TxOptions{}, func(tx pgx.Tx) error {
return fn(tx)
})
}

// GetBalances is part of the crdb.WriteSkewTest interface.
func (t pgxWriteSkewTest) GetBalances(ctx context.Context, txi interface{}) (int, int, error) {
tx := txi.(pgx.Tx)
var rows pgx.Rows
rows, err := tx.Query(ctx, `SELECT balance FROM d.t WHERE acct IN (1, 2);`)
if err != nil {
return 0, 0, err
}
defer rows.Close()
var bal1, bal2 int
balances := []*int{&bal1, &bal2}
i := 0
for ; rows.Next(); i++ {
if err = rows.Scan(balances[i]); err != nil {
return 0, 0, err
}
}
if i != 2 {
return 0, 0, fmt.Errorf("expected two balances; got %d", i)
}
return bal1, bal2, nil
}

// UpdateBalance is part of the crdb.WriteSkewInterface.
func (t pgxWriteSkewTest) UpdateBalance(
ctx context.Context, txi interface{}, acct, delta int,
) error {
tx := txi.(pgx.Tx)
_, err := tx.Exec(ctx, `UPDATE d.t SET balance=balance+$1 WHERE acct=$2;`, delta, acct)
return err
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ go 1.13
require (
github.com/gofrs/flock v0.8.1
github.com/jackc/pgx/v4 v4.16.1
github.com/jackc/pgx/v5 v5.2.0
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.6
github.com/pkg/errors v0.9.1 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167 // indirect
golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a // indirect
github.com/stretchr/testify v1.8.1
gorm.io/driver/postgres v1.3.5
gorm.io/gorm v1.23.5
)

Loading

0 comments on commit 52888c5

Please sign in to comment.