kvserver: allow historical reads on subsumed ranges
Currently, we block all requests on the RHS of a merge after it is
subsumed. While we need to block all write & admin requests, we need
only block "recent" read requests. In particular, if a read request for
the RHS is old enough that the RHS's freeze timestamp falls outside the
request's uncertainty window, it is safe to let it through. This makes
range merges a little less disruptive to foreground traffic.

Release note: None
aayushshah15 committed Aug 5, 2020
1 parent 71e1113 commit 5b8252b
Showing 9 changed files with 289 additions and 36 deletions.
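
The gating rule from the commit message can be sketched in a few lines of standalone Go. This is an illustration only; the Timestamp type and canServeSubsumedRead helper below are invented for the example, not the replica's actual code. A read on the subsumed RHS is safe to serve only if the upper bound of its uncertainty window (read timestamp plus maximum clock offset) still falls below the freeze timestamp.

package main

import (
	"fmt"
	"time"
)

// Timestamp is a simplified, wall-time-only stand-in for an HLC timestamp.
type Timestamp struct {
	WallTime int64
}

// Less reports whether t is strictly below o.
func (t Timestamp) Less(o Timestamp) bool { return t.WallTime < o.WallTime }

// canServeSubsumedRead reports whether a read at readTS can safely be served
// by a subsumed RHS that stopped serving requests at freezeStart, given that
// other clocks in the cluster may lead ours by up to maxOffset. The read is
// safe only if the upper bound of its uncertainty window is below freezeStart.
func canServeSubsumedRead(readTS, freezeStart Timestamp, maxOffset time.Duration) bool {
	uncertaintyLimit := Timestamp{WallTime: readTS.WallTime + maxOffset.Nanoseconds()}
	return uncertaintyLimit.Less(freezeStart)
}

func main() {
	maxOffset := 100 * time.Millisecond
	freeze := Timestamp{WallTime: time.Second.Nanoseconds()}
	oldRead := Timestamp{WallTime: freeze.WallTime - 200*time.Millisecond.Nanoseconds()}
	recentRead := Timestamp{WallTime: freeze.WallTime - 50*time.Millisecond.Nanoseconds()}
	fmt.Println(canServeSubsumedRead(oldRead, freeze, maxOffset))    // true: let through
	fmt.Println(canServeSubsumedRead(recentRead, freeze, maxOffset)) // false: must block
}

This mirrors the test cases added in client_merge_test.go below: a read at freezeStart - maxOffset - 1ns is let through, while a read at freezeStart.Prev(), whose uncertainty window still covers freezeStart, must block and eventually fails once the merge commits.
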
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -173,6 +173,6 @@ func Subsume(
reply.FreezeStart = cArgs.EvalCtx.Clock().Now()

return result.Result{
Local: result.LocalResult{MaybeWatchForMerge: true},
Local: result.LocalResult{FreezeStart: reply.FreezeStart},
}, nil
}
49 changes: 29 additions & 20 deletions pkg/kv/kvserver/batcheval/result/result.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
@@ -63,8 +64,10 @@ type LocalResult struct {
MaybeAddToSplitQueue bool
// Call MaybeGossipNodeLiveness with the specified Span, if set.
MaybeGossipNodeLiveness *roachpb.Span
// Call maybeWatchForMerge.
MaybeWatchForMerge bool
// FreezeStart indicates the high water mark timestamp beyond which the range
// is guaranteed to not have served any requests. This value is only set when
// a range merge is in progress. If set, call maybeWatchForMerge.
FreezeStart hlc.Timestamp

// Metrics contains counters which are to be passed to the
// metrics subsystem.
@@ -84,7 +87,7 @@ func (lResult *LocalResult) IsZero() bool {
!lResult.MaybeGossipSystemConfig &&
!lResult.MaybeGossipSystemConfigIfHaveFailure &&
lResult.MaybeGossipNodeLiveness == nil &&
!lResult.MaybeWatchForMerge &&
lResult.FreezeStart.IsEmpty() &&
lResult.Metrics == nil
}

@@ -97,13 +100,13 @@ func (lResult *LocalResult) String() string {
"#updated txns: %d #end txns: %d, "+
"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
"MaybeGossipNodeLiveness:%s FreezeStart:%s",
lResult.Reply,
len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
len(lResult.UpdatedTxns), len(lResult.EndTxns),
lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
lResult.MaybeGossipNodeLiveness, lResult.FreezeStart)
}

// DetachEncounteredIntents returns (and removes) those encountered
@@ -187,32 +190,32 @@ func coalesceBool(lhs *bool, rhs *bool) {
func (p *Result) MergeAndDestroy(q Result) error {
if q.Replicated.State != nil {
if q.Replicated.State.RaftAppliedIndex != 0 {
return errors.New("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify RaftApplyIndex")
}
if q.Replicated.State.LeaseAppliedIndex != 0 {
return errors.New("must not specify RaftApplyIndex")
return errors.AssertionFailedf("must not specify RaftApplyIndex")
}
if p.Replicated.State == nil {
p.Replicated.State = &kvserverpb.ReplicaState{}
}
if p.Replicated.State.Desc == nil {
p.Replicated.State.Desc = q.Replicated.State.Desc
} else if q.Replicated.State.Desc != nil {
return errors.New("conflicting RangeDescriptor")
return errors.AssertionFailedf("conflicting RangeDescriptor")
}
q.Replicated.State.Desc = nil

if p.Replicated.State.Lease == nil {
p.Replicated.State.Lease = q.Replicated.State.Lease
} else if q.Replicated.State.Lease != nil {
return errors.New("conflicting Lease")
return errors.AssertionFailedf("conflicting Lease")
}
q.Replicated.State.Lease = nil

if p.Replicated.State.TruncatedState == nil {
p.Replicated.State.TruncatedState = q.Replicated.State.TruncatedState
} else if q.Replicated.State.TruncatedState != nil {
return errors.New("conflicting TruncatedState")
return errors.AssertionFailedf("conflicting TruncatedState")
}
q.Replicated.State.TruncatedState = nil

@@ -226,7 +229,7 @@ func (p *Result) MergeAndDestroy(q Result) error {
}

if q.Replicated.State.Stats != nil {
return errors.New("must not specify Stats")
return errors.AssertionFailedf("must not specify Stats")
}
if (*q.Replicated.State != kvserverpb.ReplicaState{}) {
log.Fatalf(context.TODO(), "unhandled EvalResult: %s",
@@ -238,42 +241,42 @@ func (p *Result) MergeAndDestroy(q Result) error {
if p.Replicated.Split == nil {
p.Replicated.Split = q.Replicated.Split
} else if q.Replicated.Split != nil {
return errors.New("conflicting Split")
return errors.AssertionFailedf("conflicting Split")
}
q.Replicated.Split = nil

if p.Replicated.Merge == nil {
p.Replicated.Merge = q.Replicated.Merge
} else if q.Replicated.Merge != nil {
return errors.New("conflicting Merge")
return errors.AssertionFailedf("conflicting Merge")
}
q.Replicated.Merge = nil

if p.Replicated.ChangeReplicas == nil {
p.Replicated.ChangeReplicas = q.Replicated.ChangeReplicas
} else if q.Replicated.ChangeReplicas != nil {
return errors.New("conflicting ChangeReplicas")
return errors.AssertionFailedf("conflicting ChangeReplicas")
}
q.Replicated.ChangeReplicas = nil

if p.Replicated.ComputeChecksum == nil {
p.Replicated.ComputeChecksum = q.Replicated.ComputeChecksum
} else if q.Replicated.ComputeChecksum != nil {
return errors.New("conflicting ComputeChecksum")
return errors.AssertionFailedf("conflicting ComputeChecksum")
}
q.Replicated.ComputeChecksum = nil

if p.Replicated.RaftLogDelta == 0 {
p.Replicated.RaftLogDelta = q.Replicated.RaftLogDelta
} else if q.Replicated.RaftLogDelta != 0 {
return errors.New("conflicting RaftLogDelta")
return errors.AssertionFailedf("conflicting RaftLogDelta")
}
q.Replicated.RaftLogDelta = 0

if p.Replicated.AddSSTable == nil {
p.Replicated.AddSSTable = q.Replicated.AddSSTable
} else if q.Replicated.AddSSTable != nil {
return errors.New("conflicting AddSSTable")
return errors.AssertionFailedf("conflicting AddSSTable")
}
q.Replicated.AddSSTable = nil

@@ -289,7 +292,7 @@ func (p *Result) MergeAndDestroy(q Result) error {
if p.Replicated.PrevLeaseProposal == nil {
p.Replicated.PrevLeaseProposal = q.Replicated.PrevLeaseProposal
} else if q.Replicated.PrevLeaseProposal != nil {
return errors.New("conflicting lease expiration")
return errors.AssertionFailedf("conflicting lease expiration")
}
q.Replicated.PrevLeaseProposal = nil

@@ -331,15 +334,21 @@ func (p *Result) MergeAndDestroy(q Result) error {
if p.Local.MaybeGossipNodeLiveness == nil {
p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness
} else if q.Local.MaybeGossipNodeLiveness != nil {
return errors.New("conflicting MaybeGossipNodeLiveness")
return errors.AssertionFailedf("conflicting MaybeGossipNodeLiveness")
}
q.Local.MaybeGossipNodeLiveness = nil

if p.Local.FreezeStart.IsEmpty() {
p.Local.FreezeStart = q.Local.FreezeStart
} else if !q.Local.FreezeStart.IsEmpty() {
return errors.AssertionFailedf("conflicting FreezeStart")
}
q.Local.FreezeStart = hlc.Timestamp{}

coalesceBool(&p.Local.GossipFirstRange, &q.Local.GossipFirstRange)
coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue)
coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge)

if p.Local.Metrics == nil {
p.Local.Metrics = q.Local.Metrics
169 changes: 169 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
@@ -3676,6 +3676,175 @@ func TestInvalidSubsumeRequest(t *testing.T) {
}
}

// TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range
// can only serve read-only traffic for timestamps that precede the subsumption
// time and do not contain the subsumption time in their uncertainty interval.
func TestHistoricalReadsAfterSubsume(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

maxOffset := 100 * time.Millisecond
send := func(store *kvserver.Store,
desc *roachpb.RangeDescriptor,
ts hlc.Timestamp,
args roachpb.Request) error {
txn := roachpb.MakeTransaction("test txn", desc.StartKey.AsRawKey(),
0, ts, maxOffset.Nanoseconds())
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: &txn}, args)
return pErr.GoError()
}
checkRangeNotFound := func(err error) error {
if err == nil {
return errors.Newf("expected RangeNotFoundError, got nil")
}
if _, ok := err.(*roachpb.RangeNotFoundError); !ok {
return err
}
return nil
}
preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp {
return hlc.Timestamp{
WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1,
Logical: ts.Logical,
}
}

type testCase struct {
name string
queryTsFunc func(freezeStart hlc.Timestamp) hlc.Timestamp
queryArgsFunc func(key roachpb.Key) roachpb.Request
shouldBlock bool
}

tests := []testCase{
// Ensure that a read query for a timestamp older than freezeStart-MaxOffset
// is let through.
{
name: "historical read",
queryTsFunc: preUncertaintyTs,
queryArgsFunc: func(key roachpb.Key) roachpb.Request {
return getArgs(key)
},
shouldBlock: false,
},
// Write queries for the same historical timestamp should block (and then
// eventually fail because the range no longer exists).
{
name: "historical write",
queryTsFunc: preUncertaintyTs,
queryArgsFunc: func(key roachpb.Key) roachpb.Request {
return putArgs(key, []byte(`test value`))
},
shouldBlock: true,
},
// Read queries that contain the subsumption time in their uncertainty interval
// should block and eventually fail.
{
name: "historical read with uncertainty",
queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp {
return freezeStart.Prev()
},
queryArgsFunc: func(key roachpb.Key) roachpb.Request {
return getArgs(key)
},
shouldBlock: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc :=
setupClusterWithSubsumedRange(ctx, t, maxOffset)
defer tc.Stopper().Stop(ctx)
errCh := make(chan error)
go func() {
errCh <- send(store, rhsDesc, test.queryTsFunc(freezeStart),
test.queryArgsFunc(rhsDesc.StartKey.AsRawKey()))
}()
if test.shouldBlock {
waitForBlocked()
// Requests that wait for the merge must fail with a RangeNotFoundError
// because the RHS should cease to exist once the merge completes.
cleanupFunc()
require.NoError(t, checkRangeNotFound(<-errCh))
} else {
require.NoError(t, <-errCh)
// We clean up *after* the non-blocking read request succeeds to prevent
// it from racing with the merge commit trigger.
cleanupFunc()
}
})
}
}

// setupClusterWithSubsumedRange returns a TestCluster with an ongoing merge
// transaction that has been suspended right before the merge trigger is
// evaluated. This leaves the right hand side range of the merge in its
// subsumed state. It is the responsibility of the caller to call
// `cleanupFunc` to unblock the merge and to Stop() the tc's Stopper when done.
func setupClusterWithSubsumedRange(
ctx context.Context, t *testing.T, testMaxOffset time.Duration,
) (
tc serverutils.TestClusterInterface,
store *kvserver.Store,
rhsDesc *roachpb.RangeDescriptor,
freezeStart hlc.Timestamp,
waitForBlocked func(),
cleanupFunc func(),
) {
state := mergeFilterState{
blockMergeTrigger: make(chan hlc.Timestamp),
finishMergeTxn: make(chan struct{}),
}
var blockedRequestCount int32
clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
MaxOffset: testMaxOffset,
TestingRequestFilter: state.suspendMergeTrigger,
TestingConcurrencyRetryFilter: func(ctx context.Context,
ba roachpb.BatchRequest,
pErr *roachpb.Error) {
if _, ok := pErr.GetDetail().(*roachpb.MergeInProgressError); ok {
atomic.AddInt32(&blockedRequestCount, 1)
}
},
},
},
},
}
tc = serverutils.StartTestCluster(t, 1, clusterArgs)
ts := tc.Server(0)
stores, _ := ts.GetStores().(*kvserver.Stores)
store, err := stores.GetStore(ts.GetFirstStoreID())
require.NoError(t, err)
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store)
require.NoError(t, err)
mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
errCh := make(chan error)
go func() {
_, err := kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
errCh <- err.GoError()
}()
freezeStart = <-state.blockMergeTrigger
cleanupFunc = func() {
// Let the merge commit.
close(state.finishMergeTxn)
require.NoError(t, <-errCh)
}
waitForBlocked = func() {
testutils.SucceedsSoon(t, func() error {
if actualBlocked := atomic.LoadInt32(&blockedRequestCount); actualBlocked != 1 {
return errors.Newf("expected 1 blocked request but found %d", actualBlocked)
}
return nil
})
}
return tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc
}

func BenchmarkStoreRangeMerge(b *testing.B) {
ctx := context.Background()
var mtc multiTestContext
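
The mergeFilterState helper and its suspendMergeTrigger method used by setupClusterWithSubsumedRange above are defined elsewhere in client_merge_test.go and are not part of this diff. The struct fields and method name appear in the test; the body below is a rough guess at how such a filter could work (assuming the merge commit trigger arrives on an EndTxn request and carries the RHS freeze timestamp), not the actual helper:

// Hypothetical sketch; the real implementation in client_merge_test.go may
// differ. Assumes the test file's existing imports (context, hlc, roachpb).
type mergeFilterState struct {
	// blockMergeTrigger receives the RHS freeze timestamp once the merge txn
	// reaches its commit trigger.
	blockMergeTrigger chan hlc.Timestamp
	// finishMergeTxn is closed by the test to let the suspended merge commit.
	finishMergeTxn chan struct{}
}

// suspendMergeTrigger is installed as a TestingRequestFilter. When the merge
// transaction's EndTxn (carrying a merge commit trigger) comes through, it
// publishes the freeze timestamp to the test and parks until finishMergeTxn
// is closed.
func (state *mergeFilterState) suspendMergeTrigger(
	ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
	for _, ru := range ba.Requests {
		et, ok := ru.GetInner().(*roachpb.EndTxnRequest)
		if !ok || !et.Commit {
			continue
		}
		if trigger := et.InternalCommitTrigger.GetMergeTrigger(); trigger != nil {
			// Report the freeze start, then block the merge until released.
			state.blockMergeTrigger <- trigger.FreezeStart
			<-state.finishMergeTxn
		}
	}
	return nil
}

With this shape, the test's cleanupFunc only needs to close finishMergeTxn to let the suspended merge transaction commit, which is what the cleanup closure above does.
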
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
@@ -80,6 +80,10 @@ func (f *FilterArgs) InRaftCmd() bool {
// processing or non-nil to terminate processing with the returned error.
type ReplicaRequestFilter func(context.Context, roachpb.BatchRequest) *roachpb.Error

// ReplicaConcurrencyRetryFilter can be used to examine a concurrency retry
// error before it is handled and its batch is re-evaluated.
type ReplicaConcurrencyRetryFilter func(context.Context, roachpb.BatchRequest, *roachpb.Error)

// ReplicaCommandFilter may be used in tests through the StoreTestingKnobs to
// intercept the handling of commands and artificially generate errors. Return
// nil to continue with regular processing or non-nil to terminate processing