Skip to content

Commit

Permalink
feat: add isolation level option for disabling internal retries (#327)
Browse files Browse the repository at this point in the history
Internal retries of aborted transactions is enabled by default for
database/sql connections for Spanner. However, when an application or
framework knows that it will retry the transaction using a retry-loop,
it is more efficient to disable internal retries.

Some frameworks, such as gorm, however make this hard to achieve, as
they only allow TxOptions to be used to configure the transaction, and
do not give access to the underlying driver or connection.

This change adds a custom isolation level that can be used to disable
internal retries of aborted transactions.
  • Loading branch information
olavloite authored Nov 22, 2024
1 parent b4803a6 commit 118a177
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
25 changes: 24 additions & 1 deletion driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,20 @@ func (c *conn) resetTransactionForRetry(ctx context.Context, errDuringCommit boo
return c.tx.resetForRetry(ctx)
}

type spannerIsolationLevel sql.IsolationLevel

const (
levelNone spannerIsolationLevel = iota
levelDisableRetryAborts
)

// WithDisableRetryAborts returns a specific Spanner isolation level that contains
// both the given standard isolation level and a custom Spanner isolation level that
// disables internal retries for aborted transactions for a single transaction.
func WithDisableRetryAborts(level sql.IsolationLevel) sql.IsolationLevel {
return sql.IsolationLevel(levelDisableRetryAborts)<<8 + level
}

func (c *conn) Begin() (driver.Tx, error) {
return c.BeginTx(context.Background(), driver.TxOptions{})
}
Expand All @@ -1231,6 +1245,15 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
if c.inBatch() {
return nil, status.Error(codes.FailedPrecondition, "This connection has an active batch. Run or abort the batch before starting a new transaction.")
}
disableRetryAborts := false
sil := opts.Isolation >> 8
opts.Isolation = opts.Isolation - sil
if sil > 0 {
switch spannerIsolationLevel(sil) {
case levelDisableRetryAborts:
disableRetryAborts = true
}
}

if opts.ReadOnly {
ro := c.client.ReadOnlyTransaction().WithTimestampBound(c.readOnlyStaleness)
Expand Down Expand Up @@ -1259,7 +1282,7 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
c.commitTs = commitTs
}
},
retryAborts: c.retryAborts,
retryAborts: c.retryAborts && !disableRetryAborts,
}
c.commitTs = nil
return c.tx, nil
Expand Down
72 changes: 72 additions & 0 deletions driver_with_mockserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2922,6 +2922,78 @@ func TestRunTransactionCommitError(t *testing.T) {
}
}

func TestTransactionWithLevelDisableRetryAborts(t *testing.T) {
t.Parallel()

ctx := context.Background()
db, server, teardown := setupTestDBConnection(t)
defer teardown()

tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: WithDisableRetryAborts(sql.LevelSerializable)})
if err != nil {
t.Fatal(err)
}
rows, err := tx.Query(testutil.SelectFooFromBar)
if err != nil {
t.Fatal(err)
}
defer rows.Close()
for want := int64(1); rows.Next(); want++ {
cols, err := rows.Columns()
if err != nil {
t.Fatal(err)
}
if !cmp.Equal(cols, []string{"FOO"}) {
t.Fatalf("cols mismatch\nGot: %v\nWant: %v", cols, []string{"FOO"})
}
var got int64
err = rows.Scan(&got)
if err != nil {
t.Fatal(err)
}
if got != want {
t.Fatalf("value mismatch\nGot: %v\nWant: %v", got, want)
}
}
if err := rows.Err(); err != nil {
t.Fatal(err)
}
// Simulate that the transaction was aborted.
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
Errors: []error{gstatus.Error(codes.Aborted, "Aborted")},
})
// Committing the transaction should fail, as we have disabled internal retries.
err = tx.Commit()
if err == nil {
t.Fatal("missing aborted error after commit")
}
code := spanner.ErrCode(err)
if w, g := code, codes.Aborted; w != g {
t.Fatalf("error code mismatch\n Got: %v\nWant: %v", g, w)
}

requests := drainRequestsFromServer(server.TestSpanner)
sqlRequests := requestsOfType(requests, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
if g, w := len(sqlRequests), 1; g != w {
t.Fatalf("ExecuteSqlRequests count mismatch\nGot: %v\nWant: %v", g, w)
}
req := sqlRequests[0].(*sppb.ExecuteSqlRequest)
if req.Transaction == nil {
t.Fatalf("missing transaction for ExecuteSqlRequest")
}
if req.Transaction.GetId() == nil {
t.Fatalf("missing id selector for ExecuteSqlRequest")
}
commitRequests := requestsOfType(requests, reflect.TypeOf(&sppb.CommitRequest{}))
if g, w := len(commitRequests), 1; g != w {
t.Fatalf("commit requests count mismatch\nGot: %v\nWant: %v", g, w)
}
commitReq := commitRequests[0].(*sppb.CommitRequest)
if c, e := commitReq.GetTransactionId(), req.Transaction.GetId(); !cmp.Equal(c, e) {
t.Fatalf("transaction id mismatch\nCommit: %c\nExecute: %v", c, e)
}
}

func numeric(v string) big.Rat {
res, _ := big.NewRat(1, 1).SetString(v)
return *res
Expand Down

0 comments on commit 118a177

Please sign in to comment.