Skip to content

Commit

Permalink
Fix deadlock retrier (#117)
Browse files Browse the repository at this point in the history
* pglock: speed up retries on transaction deadlocks

* pglock: upgrade pgx version used for testing
  • Loading branch information
ucirello authored Oct 24, 2024
1 parent d5c1348 commit 1b6d0d1
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 216 deletions.
20 changes: 9 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *Client) AcquireContext(ctx context.Context, name string, opts ...LockOp
if err := ctx.Err(); err != nil {
return nil, ErrNotAcquired
}
err := c.retry(ctx, func() error { return c.tryAcquire(ctx, l) })
err := c.retry(func() error { return c.tryAcquire(ctx, l) })
switch {
case l.failIfLocked && errors.Is(err, ErrNotAcquired):
c.log.Debug("not acquired, exit")
Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *Client) Release(l *Lock) error {
func (c *Client) ReleaseContext(ctx context.Context, l *Lock) error {
l.heartbeatCancel()
l.heartbeatWG.Wait()
err := c.retry(ctx, func() error { return c.storeRelease(ctx, l) })
err := c.retry(func() error { return c.storeRelease(ctx, l) })
return err
}

Expand Down Expand Up @@ -377,7 +377,7 @@ func (c *Client) SendHeartbeat(ctx context.Context, l *Lock) error {
if l.isReleased {
return ErrLockAlreadyReleased
}
err := c.retry(ctx, func() error { return c.storeHeartbeat(ctx, l) })
err := c.retry(func() error { return c.storeHeartbeat(ctx, l) })
if err != nil {
l.isReleased = true
return fmt.Errorf("cannot send heartbeat (%v): %w", l.name, err)
Expand Down Expand Up @@ -444,7 +444,7 @@ func (c *Client) GetDataContext(ctx context.Context, name string) ([]byte, error
// holding it first.
func (c *Client) GetContext(ctx context.Context, name string) (*Lock, error) {
var l *Lock
err := c.retry(ctx, func() error {
err := c.retry(func() error {
var err error
l, err = c.getLock(ctx, name)
return err
Expand Down Expand Up @@ -486,19 +486,17 @@ func (c *Client) getNextRVN(ctx context.Context, db *sql.DB) (int64, error) {

const maxRetries = 1024

func (c *Client) retry(ctx context.Context, f func() error) error {
retryPeriod := c.heartbeatFrequency
if retryPeriod == 0 {
retryPeriod = c.leaseDuration
}
func (c *Client) retry(f func() error) error {
var err error
for i := 0; i < maxRetries; i++ {
err = f()
if failedPrecondition := (&FailedPreconditionError{}); err == nil || !errors.As(err, &failedPrecondition) {
break
}
c.log.Debug("bad transaction, retrying: %v", err)
waitFor(ctx, retryPeriod)
if isContextError(err) {
break
}
}
return err
}
Expand All @@ -511,7 +509,7 @@ func (c *Client) GetAllLocks() ([]*ReadOnlyLock, error) {
// GetAllLocksContext returns all known locks in a read-only fashion.
func (c *Client) GetAllLocksContext(ctx context.Context) ([]*ReadOnlyLock, error) {
var locks []*ReadOnlyLock
err := c.retry(ctx, func() error {
err := c.retry(func() error {
var err error
locks, err = c.getAllLocks(ctx)
return err
Expand Down
4 changes: 2 additions & 2 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRetry(t *testing.T) {
&FailedPreconditionError{errors.New("failed precondition")},
&OtherError{errors.New("other error")},
}
err := c.retry(context.Background(), func() error {
err := c.retry(func() error {
var err error
err, errs = errs[0], errs[1:]
return err
Expand All @@ -78,7 +78,7 @@ func TestRetry(t *testing.T) {
log: &flatLogger{log.New(io.Discard, "", 0)},
}
var retries int
err := c.retry(context.Background(), func() error {
err := c.retry(func() error {
retries++
return &FailedPreconditionError{errors.New("failed precondition")}
})
Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"time"

"cirello.io/pglock"
_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/lib/pq"
"golang.org/x/sync/errgroup"
)
Expand Down
10 changes: 3 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@ module cirello.io/pglock

require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/jackc/pgx/v4 v4.18.3
github.com/jackc/pgx/v5 v5.7.1
github.com/lib/pq v1.10.9
golang.org/x/sync v0.8.0
)

require (
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/pgtype v1.14.4 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/text v0.19.0 // indirect
)
Expand Down
Loading

0 comments on commit 1b6d0d1

Please sign in to comment.