Skip to content

Commit

Permalink
Merge pull request #8439 from heyitsanthony/stm-serialized-snapshot
Browse files Browse the repository at this point in the history
concurrency: retry snapshot serializable stm if writes since first header rev
  • Loading branch information
Anthony Romano authored Aug 23, 2017
2 parents 5c975fd + b206afc commit 8e7a0de
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
2 changes: 1 addition & 1 deletion clientv3/concurrency/example_stm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ExampleSTM_apply() {

// transfer amount
xfer := fromInt / 2
fromInt, toInt = fromInt-xfer, toInt-xfer
fromInt, toInt = fromInt-xfer, toInt+xfer

// writeback
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
Expand Down
5 changes: 3 additions & 2 deletions clientv3/concurrency/stm.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,12 @@ func (rs readSet) add(keys []string, txnresp *v3.TxnResponse) {
}
}

// first returns the store revision from the first fetch
func (rs readSet) first() int64 {
ret := int64(math.MaxInt64 - 1)
for _, resp := range rs {
if len(resp.Kvs) > 0 && resp.Kvs[0].ModRevision < ret {
ret = resp.Kvs[0].ModRevision
if rev := resp.Header.Revision; rev < ret {
ret = rev
}
}
return ret
Expand Down
36 changes: 35 additions & 1 deletion integration/v3_stm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package integration

import (
"context"
"fmt"
"math/rand"
"strconv"
"testing"

v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"golang.org/x/net/context"
"github.com/coreos/etcd/pkg/testutil"
)

// TestSTMConflict tests that conflicts are retried.
Expand Down Expand Up @@ -253,3 +254,36 @@ func TestSTMApplyOnConcurrentDeletion(t *testing.T) {
t.Fatalf("bad value. got %+v, expected 'bar2' value", resp)
}
}

func TestSTMSerializableSnapshotPut(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.Client(0)
// key with lower create/mod revision than keys being updated
_, err := cli.Put(context.TODO(), "a", "0")
testutil.AssertNil(t, err)

tries := 0
applyf := func(stm concurrency.STM) error {
if tries > 2 {
return fmt.Errorf("too many retries")
}
tries++
stm.Get("a")
stm.Put("b", "1")
return nil
}

iso := concurrency.WithIsolation(concurrency.SerializableSnapshot)
_, err = concurrency.NewSTM(cli, applyf, iso)
testutil.AssertNil(t, err)
_, err = concurrency.NewSTM(cli, applyf, iso)
testutil.AssertNil(t, err)

resp, err := cli.Get(context.TODO(), "b")
testutil.AssertNil(t, err)
if resp.Kvs[0].Version != 2 {
t.Fatalf("bad version. got %+v, expected version 2", resp)
}
}

0 comments on commit 8e7a0de

Please sign in to comment.