kvserver: prevent follower reads during merge critical state
Prior to this commit, during a merge, the RHS leaseholder's store
could continue broadcasting (actionable) closed timestamp updates
even after the RHS had been subsumed. This allowed followers to
serve follower reads past the subsumption time of the RHS.
Additionally, after the merge, if the LHS had a lower closed timestamp
than the RHS, writes could be admitted to the keyspan previously owned
by the RHS at timestamps below the RHS's max closed timestamp.

This commit fixes this bug by requiring that the followers catch up
to a LeaseAppliedIndex that belongs to the entry succeeding the
Subsume request.
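
As an illustration of the mechanism (a minimal, hypothetical sketch with
simplified types, not the actual kvserver code): a follower may only act on a
closed timestamp entry for a range once it has applied at least that entry's
recorded min lease applied index (MLAI). By recording an MLAI that points past
the Subsume request, followers cannot serve reads off closed timestamps issued
during the merge's critical window until the merge outcome has replicated to
them.

package main

import "fmt"

// closedTSEntry models a closed timestamp update broadcast by a store: a
// timestamp plus, per range, the min lease applied index (MLAI) a follower
// must reach before that timestamp becomes actionable.
type closedTSEntry struct {
	closedWallTime int64
	mlai           map[int64]int64 // rangeID -> MLAI
}

// replicaState models the follower-side state relevant to the check.
type replicaState struct {
	rangeID           int64
	leaseAppliedIndex int64
}

// canServeFollowerRead reports whether the follower may serve a read at
// readWallTime given the closed timestamp entries it has received.
func canServeFollowerRead(r replicaState, readWallTime int64, entries []closedTSEntry) bool {
	for _, e := range entries {
		lai, ok := e.mlai[r.rangeID]
		if !ok || readWallTime > e.closedWallTime {
			continue
		}
		// The entry is only actionable once the follower has caught up to the
		// recorded lease applied index; with this fix, that index points past
		// the Subsume request for the RHS.
		if r.leaseAppliedIndex >= lai {
			return true
		}
	}
	return false
}

func main() {
	repl := replicaState{rangeID: 42, leaseAppliedIndex: 10}
	entries := []closedTSEntry{{closedWallTime: 100, mlai: map[int64]int64{42: 12}}}
	fmt.Println(canServeFollowerRead(repl, 99, entries)) // false: LAI 10 < MLAI 12
	repl.leaseAppliedIndex = 12
	fmt.Println(canServeFollowerRead(repl, 99, entries)) // true
}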

Fixes #44878

Release note (bug fix): Fixes a bug that allowed follower reads
during the critical state of a merge transaction.

kvserver: improve testing around range merges and closedts interaction

Regression test for #44878

This commit adds a unit test to ensure that, during a range merge,
closed timestamps emitted by the RHS leaseholder's store cannot be
acted on by the follower replicas until the merge commits.

Release note: None
aayushshah15 committed Jun 16, 2020
1 parent 9eea0be commit 1afc52b
Showing 46 changed files with 499 additions and 271 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/addressing_test.go
@@ -137,14 +137,14 @@ func TestUpdateRangeAddressing(t *testing.T) {
tcsf := kvcoord.NewTxnCoordSenderFactory(
kvcoord.TxnCoordSenderFactoryConfig{
AmbientCtx: actx,
Settings: store.cfg.Settings,
Clock: store.cfg.Clock,
Settings: store.Cfg.Settings,
Clock: store.Cfg.Clock,
Stopper: stopper,
Metrics: kvcoord.MakeTxnMetrics(time.Second),
},
store.TestSender(),
)
db := kv.NewDB(actx, tcsf, store.cfg.Clock)
db := kv.NewDB(actx, tcsf, store.Cfg.Clock)
ctx := context.Background()
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
if err := txn.Run(ctx, b); err != nil {
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -65,6 +66,10 @@ type LocalResult struct {
MaybeGossipNodeLiveness *roachpb.Span
// Call maybeWatchForMerge.
MaybeWatchForMerge bool
// ReleaseFunc, when it's not the empty function, stores the callback returned
// by minprop.Tracker.Track(). It is used to record the LeaseAppliedIndex for
// a SubsumeRequest.
ReleaseFunc closedts.ReleaseFunc

// Metrics contains counters which are to be passed to the
// metrics subsystem.
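
To picture how the new LocalResult.ReleaseFunc field is meant to be used, here
is a small, self-contained sketch. The names and signatures below (releaseFunc,
tracker.track, localResult) are hypothetical simplifications; the real callback
is returned by minprop.Tracker.Track and carries more context. The idea is that
evaluating a Subsume request stashes the callback, so that once the proposal's
LeaseAppliedIndex is known it can be reported back to the closed timestamp
tracker.

package main

import "fmt"

// releaseFunc stands in for the tracker callback: it is invoked once the
// range ID and lease applied index of the tracked proposal are known.
type releaseFunc func(rangeID, leaseAppliedIndex int64)

type tracker struct{}

// track hands out the callback; until it is called, the tracker would refuse
// to close out timestamps past the tracked proposal.
func (t *tracker) track() releaseFunc {
	return func(rangeID, lai int64) {
		fmt.Printf("range %d: followers must reach LAI >= %d before acting on newer closed timestamps\n", rangeID, lai)
	}
}

// localResult mirrors the role of LocalResult above: it carries the callback
// from evaluation to the point where the lease applied index is assigned.
type localResult struct {
	releaseFunc releaseFunc
}

func main() {
	tr := &tracker{}
	// During Subsume evaluation: capture the callback in the local result.
	res := localResult{releaseFunc: tr.track()}
	// Later, once the proposal has its lease applied index assigned:
	res.releaseFunc(42 /* rangeID */, 17 /* leaseAppliedIndex */)
}
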
244 changes: 216 additions & 28 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -25,11 +25,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -52,7 +54,7 @@ func TestClosedTimestampCanServe(t *testing.T) {
}

ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -112,7 +114,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
t.Skip("skipping under race")
}
ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -185,7 +187,7 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) {
t.Skip("https://github.com/cockroachdb/cockroach/issues/50091")

ctx := context.Background()
tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
tc, _, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)

@@ -266,7 +268,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
t.Skip("skipping under race")
}
ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration)
// Disable the automatic merging.
if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
t.Fatal(err)
@@ -347,7 +349,7 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
ctx := context.Background()
// Set up the target duration to be very long and rely on lease transfers to
// drive MaxClosed.
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, time.Hour)
tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, time.Hour)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -385,7 +387,7 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) {
}

ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -419,14 +421,14 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) {
}

ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration)
tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

// Verify that we can serve a follower read at a timestamp. Wait if necessary.
// Verify that we can serve a follower read at a timestamp. Wait if necessary
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
baRead := makeReadBatchRequestForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
@@ -448,9 +450,183 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) {
verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2)
}

func TestClosedTimestampsNotActionableDuringMergeCriticalState(t *testing.T) {
defer leaktest.AfterTest(t)()
if util.RaceEnabled {
// Limiting how long transactions can run does not work well with race
// unless we're extremely lenient, which drives up the test duration.
t.Skip("skipping under race")
}

t.Run("without lease transfer", func(t *testing.T) {
verifyClosedTSInactiveDuringMergeCriticalState(t)
})
}

func verifyClosedTSInactiveDuringMergeCriticalState(t *testing.T) {
ctx := context.Background()
mergeTriggerReached := make(chan struct{})
blockMergeTrigger := make(chan struct{})
knobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
TestingRequestFilter: func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
if et := req.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil {
// We block the LHS leaseholder from applying the merge trigger and
// then wait until the RHS follower replicas have completely caught
// up to their leaseholder.
mergeTriggerReached <- struct{}{}
<-blockMergeTrigger
}
}
return nil
},
},
}

tc, db0, desc, repls := setupClusterForClosedTSTestingWithCustomKnobs(ctx, t, testingTargetDuration,
knobs)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(3, $1)`, "foo"); err != nil {
t.Fatal(err)
}
// Start by ensuring that the values can be read from all replicas at ts.
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
baRead := makeReadBatchRequestForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2))
})
// Manually split the table to have easier access to descriptors.
tableID, err := getTableID(db0, "cttest", "kv")
if err != nil {
t.Fatalf("failed to lookup ids: %+v", err)
}

idxPrefix := keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1)
k, err := sqlbase.EncodeTableKey(idxPrefix, tree.NewDInt(2), encoding.Ascending)
if err != nil {
t.Fatalf("failed to encode split key: %+v", err)
}
tcImpl := tc.(*testcluster.TestCluster)
leftDesc, rightDesc := tcImpl.SplitRangeOrFatal(t, k)
if err := tcImpl.WaitForFullReplication(); err != nil {
t.Fatal(err)
}
rRepls := replsForRange(ctx, t, tc, rightDesc)
g, _ := errgroup.WithContext(context.Background())
defer func() {
// Unblock the leaseholder so it can finally commit the merge.
close(blockMergeTrigger)
if err := g.Wait(); err != nil {
t.Error(err)
}
}()

// Merge the ranges back together. The LHS leaseholder should block right
// before the merge trigger request is sent.
g.Go(func() error {
if _, err := tc.Server(0).MergeRanges(leftDesc.StartKey.AsRawKey()); err != nil {
return err
}
return nil
})

// We now have the partially merged critical state, fully replicated.
<-mergeTriggerReached
target := getCurrentLeaseholder(t, tc, rightDesc)
store := getTargetStoreOrFatal(t, tc, target)
deadline := timeutil.Now().Add(testingTargetDuration * 2)
getCurrentMaxClosed := func() ctpb.Entry {
var maxClosed ctpb.Entry
attempts := 0
for attempts == 0 || timeutil.Now().Before(deadline) {
attempts++
store.Cfg.ClosedTimestamp.Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) {
if _, ok := entry.MLAI[rightDesc.RangeID]; ok {
maxClosed = entry
return true
}
return false
})
if _, ok := maxClosed.MLAI[rightDesc.RangeID]; !ok {
// We ran out of closed timestamps to visit without finding one that
// corresponds to rightDesc. It is likely that no closed timestamps have
// been broadcast for rightDesc yet; try again.
continue
}
return maxClosed
}
t.Fatal("Could not find a non-empty closed timestamp update for RHS range.")
return ctpb.Entry{}
}

// We issue a read request with a timestamp that is one logical tick after
// the current max closed timestamp so we know that this timestamp cannot be
// "active" for follower reads until there is a future closed timestamp update
// that contains an MLAI for the RHS range.
postMergeClosedTS := getCurrentMaxClosed().ClosedTimestamp.Next()
baReadRHSAfterSubsume := makeReadBatchRequestForDesc(rightDesc, postMergeClosedTS)

// Poll the store until we see a closed timestamp update for the RHS range.
for {
maxClosed := getCurrentMaxClosed()
if _, ok := maxClosed.MLAI[rightDesc.RangeID]; ok &&
postMergeClosedTS.LessEq(maxClosed.ClosedTimestamp) {
break
}
}
rightReplFollowers := make([]*kvserver.Replica, 0, 2)
for _, repl := range rRepls {
if repl.StoreID() == target.StoreID && repl.NodeID() == target.NodeID {
continue
}
rightReplFollowers = append(rightReplFollowers, repl)
}
notLeaseholderErrs, err := countNotLeaseHolderErrors(baReadRHSAfterSubsume, rightReplFollowers)
if err != nil {
t.Fatal(err)
}
const expectedNLEs = 2
if notLeaseholderErrs != expectedNLEs {
t.Fatalf("Expected %d NotLeaseHolderErrors, but got %d", expectedNLEs, notLeaseholderErrs)
}
}

func getTargetStoreOrFatal(
t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget,
) (store *kvserver.Store) {
for i := 0; i < tc.NumServers(); i++ {
if server := tc.Server(i); server.NodeID() == target.NodeID &&
server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) {
store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID)
if err != nil {
t.Fatal(err)
}
return store
}
}
t.Fatalf("Could not find store for replication target %+v\n", target)
return nil
}

func verifyNotLeaseHolderErrors(
t *testing.T, ba roachpb.BatchRequest, repls []*kvserver.Replica, expectedNLEs int,
) {
notLeaseholderErrs, err := countNotLeaseHolderErrors(ba, repls)
if err != nil {
t.Fatal(err)
}
if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e {
t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a)
}
}

func countNotLeaseHolderErrors(ba roachpb.BatchRequest, repls []*kvserver.Replica) (int64, error) {
g, ctx := errgroup.WithContext(context.Background())
var notLeaseholderErrs int64
for i := range repls {
@@ -467,11 +643,9 @@ func verifyNotLeaseHolderErrors(
})
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e {
t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a)
return 0, err
}
return notLeaseholderErrs, nil
}

// Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
@@ -524,30 +698,25 @@ func pickRandomTarget(
}
}

// This function creates a test cluster that is prepared to exercise follower
// reads. The returned test cluster has follower reads enabled using the above
// targetDuration and closeFraction. In addition to the newly minted test
// cluster, this function returns a db handle to node 0, a range descriptor for
// the range used by the table `cttest.kv` and the replica objects corresponding
// to the replicas for the range. It is the caller's responsibility to Stop the
// Stopper on the returned test cluster when done.
func setupTestClusterForClosedTimestampTesting(
ctx context.Context, t *testing.T, targetDuration time.Duration,
// setupClusterForClosedTSTestingWithCustomKnobs creates a test cluster that is
// prepared to exercise follower reads. The returned test cluster has follower
// reads enabled using the given targetDuration and the closeFraction defined above. In
// addition to the newly minted test cluster, this function returns a db handle
// to node 0, a range descriptor for the range used by the table `cttest.kv` and
// the replica objects corresponding to the replicas for the range. It is the
// caller's responsibility to Stop the Stopper on the returned test cluster when
// done.
func setupClusterForClosedTSTestingWithCustomKnobs(
ctx context.Context, t *testing.T, targetDuration time.Duration, knobs base.TestingKnobs,
) (
tc serverutils.TestClusterInterface,
db0 *gosql.DB,
kvTableDesc roachpb.RangeDescriptor,
repls []*kvserver.Replica,
) {

tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
RangeFeedPushTxnsInterval: 10 * time.Millisecond,
RangeFeedPushTxnsAge: 20 * time.Millisecond,
},
},
Knobs: knobs,
},
})
db0 = tc.ServerConn(0)
@@ -619,6 +788,25 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING);
return tc, db0, desc, repls
}

// setupClusterForClosedTSTesting is like setupClusterForClosedTSTestingWithCustomKnobs,
// except that it passes in a set of default testing knobs.
func setupClusterForClosedTSTesting(
ctx context.Context, t *testing.T, targetDuration time.Duration,
) (
tc serverutils.TestClusterInterface,
db0 *gosql.DB,
kvTableDesc roachpb.RangeDescriptor,
repls []*kvserver.Replica,
) {
defaultTestingKnobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
RangeFeedPushTxnsInterval: 10 * time.Millisecond,
RangeFeedPushTxnsAge: 20 * time.Millisecond,
},
}
return setupClusterForClosedTSTestingWithCustomKnobs(ctx, t, targetDuration, defaultTestingKnobs)
}

type respFunc func(*roachpb.BatchResponse, *roachpb.Error) (shouldRetry bool, err error)

// respFuncs returns a respFunc which passes its arguments to each passed
