diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go
index b0f6d0246f8e..1d8f2daaf963 100644
--- a/pkg/kv/kvserver/batcheval/cmd_subsume.go
+++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -173,6 +173,6 @@ func Subsume(
 	reply.FreezeStart = cArgs.EvalCtx.Clock().Now()
 	return result.Result{
-		Local: result.LocalResult{MaybeWatchForMerge: true},
+		Local: result.LocalResult{FreezeStart: reply.FreezeStart},
 	}, nil
 }
diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go
index 0eb3b9f82226..fbb64b21d378 100644
--- a/pkg/kv/kvserver/batcheval/result/result.go
+++ b/pkg/kv/kvserver/batcheval/result/result.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 	"github.com/kr/pretty"
@@ -63,8 +64,10 @@ type LocalResult struct {
 	MaybeAddToSplitQueue bool
 	// Call MaybeGossipNodeLiveness with the specified Span, if set.
 	MaybeGossipNodeLiveness *roachpb.Span
-	// Call maybeWatchForMerge.
-	MaybeWatchForMerge bool
+	// FreezeStart indicates the high water mark timestamp beyond which the range
+	// is guaranteed to not have served any requests. This value is only set when
+	// a range merge is in progress. If set, call maybeWatchForMerge.
+	FreezeStart hlc.Timestamp
 
 	// Metrics contains counters which are to be passed to the
 	// metrics subsystem.
@@ -84,7 +87,7 @@ func (lResult *LocalResult) IsZero() bool {
 		!lResult.MaybeGossipSystemConfig &&
 		!lResult.MaybeGossipSystemConfigIfHaveFailure &&
 		lResult.MaybeGossipNodeLiveness == nil &&
-		!lResult.MaybeWatchForMerge &&
+		lResult.FreezeStart.IsEmpty() &&
 		lResult.Metrics == nil
 }
@@ -97,13 +100,13 @@ func (lResult *LocalResult) String() string {
 		"#updated txns: %d #end txns: %d, "+
 		"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
 		"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
-		"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
+		"MaybeGossipNodeLiveness:%s FreezeStart:%s",
 		lResult.Reply,
 		len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
 		len(lResult.UpdatedTxns), len(lResult.EndTxns),
 		lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
 		lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
-		lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
+		lResult.MaybeGossipNodeLiveness, lResult.FreezeStart)
 }
 
 // DetachEncounteredIntents returns (and removes) those encountered
@@ -187,10 +190,10 @@ func coalesceBool(lhs *bool, rhs *bool) {
 func (p *Result) MergeAndDestroy(q Result) error {
 	if q.Replicated.State != nil {
 		if q.Replicated.State.RaftAppliedIndex != 0 {
-			return errors.New("must not specify RaftApplyIndex")
+			return errors.AssertionFailedf("must not specify RaftApplyIndex")
 		}
 		if q.Replicated.State.LeaseAppliedIndex != 0 {
-			return errors.New("must not specify RaftApplyIndex")
+			return errors.AssertionFailedf("must not specify RaftApplyIndex")
 		}
 		if p.Replicated.State == nil {
 			p.Replicated.State = &kvserverpb.ReplicaState{}
 		}
@@ -198,21 +201,21 @@ func (p *Result) MergeAndDestroy(q Result) error {
 		if p.Replicated.State.Desc == nil {
 			p.Replicated.State.Desc = q.Replicated.State.Desc
 		} else if q.Replicated.State.Desc != nil {
-			return errors.New("conflicting RangeDescriptor")
+			return errors.AssertionFailedf("conflicting RangeDescriptor")
 		}
 		q.Replicated.State.Desc = nil
 
 		if p.Replicated.State.Lease == nil {
 			p.Replicated.State.Lease = q.Replicated.State.Lease
 		} else if q.Replicated.State.Lease != nil {
-			return errors.New("conflicting Lease")
+			return errors.AssertionFailedf("conflicting Lease")
 		}
 		q.Replicated.State.Lease = nil
 
 		if p.Replicated.State.TruncatedState == nil {
 			p.Replicated.State.TruncatedState = q.Replicated.State.TruncatedState
 		} else if q.Replicated.State.TruncatedState != nil {
-			return errors.New("conflicting TruncatedState")
+			return errors.AssertionFailedf("conflicting TruncatedState")
 		}
 		q.Replicated.State.TruncatedState = nil
 
@@ -226,7 +229,7 @@ func (p *Result) MergeAndDestroy(q Result) error {
 		}
 
 		if q.Replicated.State.Stats != nil {
-			return errors.New("must not specify Stats")
+			return errors.AssertionFailedf("must not specify Stats")
 		}
 		if (*q.Replicated.State != kvserverpb.ReplicaState{}) {
 			log.Fatalf(context.TODO(), "unhandled EvalResult: %s",
@@ -238,42 +241,42 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	if p.Replicated.Split == nil {
 		p.Replicated.Split = q.Replicated.Split
 	} else if q.Replicated.Split != nil {
-		return errors.New("conflicting Split")
+		return errors.AssertionFailedf("conflicting Split")
 	}
 	q.Replicated.Split = nil
 
 	if p.Replicated.Merge == nil {
 		p.Replicated.Merge = q.Replicated.Merge
 	} else if q.Replicated.Merge != nil {
-		return errors.New("conflicting Merge")
+		return errors.AssertionFailedf("conflicting Merge")
 	}
 	q.Replicated.Merge = nil
 
 	if p.Replicated.ChangeReplicas == nil {
 		p.Replicated.ChangeReplicas = q.Replicated.ChangeReplicas
 	} else if q.Replicated.ChangeReplicas != nil {
-		return errors.New("conflicting ChangeReplicas")
+		return errors.AssertionFailedf("conflicting ChangeReplicas")
 	}
 	q.Replicated.ChangeReplicas = nil
 
 	if p.Replicated.ComputeChecksum == nil {
 		p.Replicated.ComputeChecksum = q.Replicated.ComputeChecksum
 	} else if q.Replicated.ComputeChecksum != nil {
-		return errors.New("conflicting ComputeChecksum")
+		return errors.AssertionFailedf("conflicting ComputeChecksum")
 	}
 	q.Replicated.ComputeChecksum = nil
 
 	if p.Replicated.RaftLogDelta == 0 {
 		p.Replicated.RaftLogDelta = q.Replicated.RaftLogDelta
 	} else if q.Replicated.RaftLogDelta != 0 {
-		return errors.New("conflicting RaftLogDelta")
+		return errors.AssertionFailedf("conflicting RaftLogDelta")
 	}
 	q.Replicated.RaftLogDelta = 0
 
 	if p.Replicated.AddSSTable == nil {
 		p.Replicated.AddSSTable = q.Replicated.AddSSTable
 	} else if q.Replicated.AddSSTable != nil {
-		return errors.New("conflicting AddSSTable")
+		return errors.AssertionFailedf("conflicting AddSSTable")
 	}
 	q.Replicated.AddSSTable = nil
 
@@ -289,7 +292,7 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	if p.Replicated.PrevLeaseProposal == nil {
 		p.Replicated.PrevLeaseProposal = q.Replicated.PrevLeaseProposal
 	} else if q.Replicated.PrevLeaseProposal != nil {
-		return errors.New("conflicting lease expiration")
+		return errors.AssertionFailedf("conflicting lease expiration")
 	}
 	q.Replicated.PrevLeaseProposal = nil
 
@@ -331,15 +334,21 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	if p.Local.MaybeGossipNodeLiveness == nil {
 		p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness
 	} else if q.Local.MaybeGossipNodeLiveness != nil {
-		return errors.New("conflicting MaybeGossipNodeLiveness")
+		return errors.AssertionFailedf("conflicting MaybeGossipNodeLiveness")
 	}
 	q.Local.MaybeGossipNodeLiveness = nil
 
+	if p.Local.FreezeStart.IsEmpty() {
+		p.Local.FreezeStart = q.Local.FreezeStart
+	} else if !q.Local.FreezeStart.IsEmpty() {
+		return errors.AssertionFailedf("conflicting FreezeStart")
+	}
+	q.Local.FreezeStart = hlc.Timestamp{}
+
 	coalesceBool(&p.Local.GossipFirstRange, &q.Local.GossipFirstRange)
 	coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
 	coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
 	coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue)
-	coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge)
 
 	if p.Local.Metrics == nil {
 		p.Local.Metrics = q.Local.Metrics
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index 0fda10e841b4..0c31a3bd62b2 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -3676,6 +3676,175 @@ func TestInvalidSubsumeRequest(t *testing.T) {
 	}
 }
 
+// TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range
+// can only serve read-only traffic for timestamps that precede the subsumption
+// time, but don't contain the subsumption time in their uncertainty interval.
+func TestHistoricalReadsAfterSubsume(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+
+	maxOffset := 100 * time.Millisecond
+	send := func(store *kvserver.Store,
+		desc *roachpb.RangeDescriptor,
+		ts hlc.Timestamp,
+		args roachpb.Request) error {
+		txn := roachpb.MakeTransaction("test txn", desc.StartKey.AsRawKey(),
+			0, ts, maxOffset.Nanoseconds())
+		_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: &txn}, args)
+		return pErr.GoError()
+	}
+	checkRangeNotFound := func(err error) error {
+		if err == nil {
+			return errors.Newf("expected RangeNotFoundError, got nil")
+		}
+		if _, ok := err.(*roachpb.RangeNotFoundError); !ok {
+			return err
+		}
+		return nil
+	}
+	preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp {
+		return hlc.Timestamp{
+			WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1,
+			Logical:  ts.Logical,
+		}
+	}
+
+	type testCase struct {
+		name          string
+		queryTsFunc   func(freezeStart hlc.Timestamp) hlc.Timestamp
+		queryArgsFunc func(key roachpb.Key) roachpb.Request
+		shouldBlock   bool
+	}
+
+	tests := []testCase{
+		// Ensure that a read query for a timestamp older than freezeStart-MaxOffset
+		// is let through.
+		{
+			name:        "historical read",
+			queryTsFunc: preUncertaintyTs,
+			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
+				return getArgs(key)
+			},
+			shouldBlock: false,
+		},
+		// Write queries for the same historical timestamp should block (and then
+		// eventually fail because the range no longer exists).
+		{
+			name:        "historical write",
+			queryTsFunc: preUncertaintyTs,
+			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
+				return putArgs(key, []byte(`test value`))
+			},
+			shouldBlock: true,
+		},
+		// Read queries that contain the subsumption time in its uncertainty interval
+		// should block and eventually fail.
+		{
+			name: "historical read with uncertainty",
+			queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp {
+				return freezeStart.Prev()
+			},
+			queryArgsFunc: func(key roachpb.Key) roachpb.Request {
+				return getArgs(key)
+			},
+			shouldBlock: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc :=
+				setupClusterWithSubsumedRange(ctx, t, maxOffset)
+			defer tc.Stopper().Stop(ctx)
+			errCh := make(chan error)
+			go func() {
+				errCh <- send(store, rhsDesc, test.queryTsFunc(freezeStart),
+					test.queryArgsFunc(rhsDesc.StartKey.AsRawKey()))
+			}()
+			if test.shouldBlock {
+				waitForBlocked()
+				// Requests that wait for the merge must fail with a RangeNotFoundError
+				// because the RHS should cease to exist once the merge completes.
+				cleanupFunc()
+				require.NoError(t, checkRangeNotFound(<-errCh))
+			} else {
+				require.NoError(t, <-errCh)
+				// We cleanup *after* the non-blocking read request succeeds to prevent
+				// it from racing with the merge commit trigger.
+				cleanupFunc()
+			}
+		})
+	}
+}
+
+// setupClusterWithSubsumedRange returns a TestCluster during an ongoing merge
+// transaction, such that the merge has been suspended right before the merge
+// trigger is evaluated. This leaves the right hand side range of the merge in
+// its subsumed state. It is the responsibility of the caller to call
+// `cleanupFunc` to unblock the merge and Stop() the tc's Stopper when done.
+func setupClusterWithSubsumedRange(
+	ctx context.Context, t *testing.T, testMaxOffset time.Duration,
+) (
+	tc serverutils.TestClusterInterface,
+	store *kvserver.Store,
+	rhsDesc *roachpb.RangeDescriptor,
+	freezeStart hlc.Timestamp,
+	waitForBlocked func(),
+	cleanupFunc func(),
+) {
+	state := mergeFilterState{
+		blockMergeTrigger: make(chan hlc.Timestamp),
+		finishMergeTxn:    make(chan struct{}),
+	}
+	var blockedRequestCount int32
+	clusterArgs := base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					DisableMergeQueue:    true,
+					MaxOffset:            testMaxOffset,
+					TestingRequestFilter: state.suspendMergeTrigger,
+					TestingConcurrencyRetryFilter: func(ctx context.Context,
+						ba roachpb.BatchRequest,
+						pErr *roachpb.Error) {
+						if _, ok := pErr.GetDetail().(*roachpb.MergeInProgressError); ok {
+							atomic.AddInt32(&blockedRequestCount, 1)
+						}
+					},
+				},
+			},
+		},
+	}
+	tc = serverutils.StartTestCluster(t, 1, clusterArgs)
+	ts := tc.Server(0)
+	stores, _ := ts.GetStores().(*kvserver.Stores)
+	store, err := stores.GetStore(ts.GetFirstStoreID())
+	require.NoError(t, err)
+	lhsDesc, rhsDesc, err := createSplitRanges(ctx, store)
+	require.NoError(t, err)
+	mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
+	errCh := make(chan error)
+	go func() {
+		_, err := kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
+		errCh <- err.GoError()
+	}()
+	freezeStart = <-state.blockMergeTrigger
+	cleanupFunc = func() {
+		// Let the merge commit.
+		close(state.finishMergeTxn)
+		require.NoError(t, <-errCh)
+	}
+	waitForBlocked = func() {
+		testutils.SucceedsSoon(t, func() error {
+			if actualBlocked := atomic.LoadInt32(&blockedRequestCount); actualBlocked != 1 {
+				return errors.Newf("expected 1 blocked request but found %d", actualBlocked)
+			}
+			return nil
+		})
+	}
+	return tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc
+}
+
 func BenchmarkStoreRangeMerge(b *testing.B) {
 	ctx := context.Background()
 	var mtc multiTestContext
diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go
index 2867b5e72492..5f9e394d3bc7 100644
--- a/pkg/kv/kvserver/kvserverbase/base.go
+++ b/pkg/kv/kvserver/kvserverbase/base.go
@@ -80,6 +80,10 @@ func (f *FilterArgs) InRaftCmd() bool {
 // processing or non-nil to terminate processing with the returned error.
 type ReplicaRequestFilter func(context.Context, roachpb.BatchRequest) *roachpb.Error
 
+// ReplicaConcurrencyRetryFilter can be used to examine a concurrency retry
+// error before it is handled and its batch is re-evaluated.
+type ReplicaConcurrencyRetryFilter func(context.Context, roachpb.BatchRequest, *roachpb.Error)
+
 // ReplicaCommandFilter may be used in tests through the StoreTestingKnobs to
 // intercept the handling of commands and artificially generate errors. Return
 // nil to continue with regular processing or non-nil to terminate processing
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 8374d718196f..6d777baa065a 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -260,6 +260,11 @@ type Replica struct {
 		// requests should be held until the completion of the merge is signaled by
 		// the closing of the channel.
 		mergeComplete chan struct{}
+		// freezeStart indicates the subsumption time of this range when it is the
+		// right-hand range in an ongoing merge. This range will allow read-only
+		// traffic below this timestamp, while blocking everything else, until the
+		// merge completes.
+		freezeStart hlc.Timestamp
 		// The state of the Raft state machine.
 		state kvserverpb.ReplicaState
 		// Last index/term persisted to the raft log (not necessarily
@@ -899,6 +904,12 @@ func (r *Replica) mergeInProgressRLocked() bool {
 	return r.mu.mergeComplete != nil
 }
 
+func (r *Replica) getFreezeStart() hlc.Timestamp {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	return r.mu.freezeStart
+}
+
 // setLastReplicaDescriptors sets the the most recently seen replica
 // descriptors to those contained in the *RaftMessageRequest, acquiring r.mu
 // to do so.
@@ -1121,7 +1132,7 @@ func (r *Replica) checkExecutionCanProceed(
 		ba.EarliestActiveTimestamp(), st, ba.IsAdmin(),
 	); err != nil {
 		return err
-	} else if g.HoldingLatches() && st != nil {
+	} else if g.HoldingLatches() && st.State == kvserverpb.LeaseState_VALID {
 		// Only check for a pending merge if latches are held and the Range
 		// lease is held by this Replica. Without both of these conditions,
 		// checkForPendingMergeRLocked could return false negatives.
@@ -1196,9 +1207,47 @@ func (r *Replica) checkForPendingMergeRLocked(ba *roachpb.BatchRequest) error {
 	if ba.IsSingleSubsumeRequest() {
 		return nil
 	}
-	// The replica is being merged into its left-hand neighbor. This request
-	// cannot proceed until the merge completes, signaled by the closing of the
-	// channel.
+
+	// The range is being merged into its left-hand neighbor.
+	if ba.IsReadOnly() {
+		freezeStart := r.getFreezeStart()
+		ts := ba.Timestamp
+		if ba.Txn != nil {
+			ts.Forward(ba.Txn.MaxTimestamp)
+		}
+		if ts.Less(freezeStart) {
+			// When the max timestamp of a read request is less than the subsumption
+			// time recorded by this Range (freezeStart), we're guaranteed that none
+			// of the writes accepted by the leaseholder for the keyspan (which could
+			// be a part of the subsuming range if the merge succeeded, or part of
+			// this range if it didn't) for timestamps after the subsumption timestamp
+			// could have causally preceded the current request. Letting such requests
+			// go through does not violate any of the invariants guaranteed by
+			// Subsume().
+			//
+			// NB: It would be incorrect to serve this read request if freezeStart
+			// were in its uncertainty window. For the sake of contradiction, consider
+			// the following scenario, if such a request were allowed to proceed:
+			// 1. This range gets subsumed, `maybeWatchForMerge` is called and the
+			// `mergeCompleteCh` channel is set up.
+			// 2. A read request *that succeeds the subsumption in real time* comes in
+			// for a timestamp that contains `freezeStart` in its uncertainty interval
+			// before the `mergeCompleteCh` channel is removed. Let's say the read
+			// timestamp of this request is X (with X <= freezeStart), and let's
+			// denote its uncertainty interval by [X, Y).
+			// 3. By the time this request reaches `checkForPendingMergeRLocked`, the
+			// merge has committed so all subsequent requests are directed to the
+			// leaseholder of the (subsuming) left-hand range but this pre-merge range
+			// hasn't been destroyed yet.
+			// 4. If the (post-merge) left-hand side leaseholder had accepted any new
+			// writes with timestamps in the window [freezeStart, Y), we would
+			// potentially have a stale read, as any of the writes in this window could
+			// have causally preceded the aforementioned read.
+			return nil
+		}
+	}
+	// This request cannot proceed until the merge completes, signaled by the
+	// closing of the channel.
 	//
 	// It is very important that this check occur after we have acquired latches
 	// from the spanlatch manager. Only after we release these latches are we
@@ -1341,9 +1390,10 @@ func (ec *endCmds) done(
 }
 
 // maybeWatchForMerge checks whether a merge of this replica into its left
-// neighbor is in its critical phase and, if so, arranges to block all requests
-// until the merge completes.
-func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
+// neighbor is in its critical phase and, if so, arranges to block all requests,
+// except for read-only requests that are older than `freezeStart`, until the
+// merge completes.
+func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timestamp) error {
 	desc := r.Desc()
 	descKey := keys.RangeDescriptorKey(desc.StartKey)
 	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
@@ -1375,6 +1425,12 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
 		r.mu.Unlock()
 		return nil
 	}
+	// Note that if the merge txn retries for any reason (for example, if the
+	// left-hand side range undergoes a lease transfer before the merge
+	// completes), the right-hand side range will get re-subsumed. This will
+	// lead to `freezeStart` being overwritten with the new subsumption time.
+	// This is fine.
+	r.mu.freezeStart = freezeStart
 	r.mu.mergeComplete = mergeCompleteCh
 	// The RHS of a merge is not permitted to quiesce while a mergeComplete
 	// channel is installed. (If the RHS is quiescent when the merge commits, any
@@ -1494,6 +1550,7 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
 		// error. If the merge aborted, the requests will be handled normally.
 		r.mu.mergeComplete = nil
 		close(mergeCompleteCh)
+		r.mu.freezeStart.Reset()
 		r.mu.Unlock()
 		r.raftMu.Unlock()
 	})
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 319f25872259..9628d0e4befe 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -338,7 +338,13 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 	// progress, as only the old leaseholder would have been explicitly notified
 	// of the merge. If there is a merge in progress, maybeWatchForMerge will
 	// arrange to block all traffic to this replica unless the merge aborts.
-	if err := r.maybeWatchForMerge(ctx); err != nil {
+	// NB: If the subsumed range changes leaseholders after subsumption,
+	// `freezeStart` will be zero and we will effectively be blocking all read
+	// requests.
+	// TODO(aayush): In the future, if we permit co-operative lease transfers
+	// when a range is subsumed, it should be relatively straightforward to
+	// allow historical reads on the subsumed RHS after such lease transfers.
+	if err := r.maybeWatchForMerge(ctx, hlc.Timestamp{} /* freezeStart */); err != nil {
 		// We were unable to determine whether a merge was in progress. We cannot
 		// safely proceed.
 		log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
@@ -610,8 +616,8 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
 	if lResult.EndTxns != nil {
 		log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
 	}
-	if lResult.MaybeWatchForMerge {
-		log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
+	if !lResult.FreezeStart.IsEmpty() {
+		log.Fatalf(ctx, "LocalEvalResult.FreezeStart should have been handled and reset: %s", lResult.FreezeStart)
 	}
 
 	if lResult.AcquiredLocks != nil {
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 0527f2cc5136..3a8ff19c16d2 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/kr/pretty"
 )
@@ -165,14 +166,14 @@ func (r *Replica) handleReadOnlyLocalEvalResult(
 		lResult.AcquiredLocks = nil
 	}
 
-	if lResult.MaybeWatchForMerge {
-		// A merge is (likely) about to be carried out, and this replica needs
-		// to block all traffic until the merge either commits or aborts. See
+	if !lResult.FreezeStart.IsEmpty() {
+		// A merge is (likely) about to be carried out, and this replica needs to
+		// block all non-read traffic until the merge either commits or aborts. See
 		// docs/tech-notes/range-merges.md.
-		if err := r.maybeWatchForMerge(ctx); err != nil {
+		if err := r.maybeWatchForMerge(ctx, lResult.FreezeStart); err != nil {
 			return roachpb.NewError(err)
 		}
-		lResult.MaybeWatchForMerge = false
+		lResult.FreezeStart = hlc.Timestamp{}
 	}
 
 	if !lResult.IsZero() {
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 3341dbcb71ae..69dd78d9d90c 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -312,6 +312,9 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 		// error. It must have also handed back ownership of the concurrency
 		// guard without having already released the guard's latches.
 		g.AssertLatches()
+		if filter := r.store.cfg.TestingKnobs.TestingConcurrencyRetryFilter; filter != nil {
+			filter(ctx, *ba, pErr)
+		}
 		switch t := pErr.GetDetail().(type) {
 		case *roachpb.WriteIntentError:
 			// Drop latches, but retain lock wait-queues.
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index d735ca362abb..326222c132dd 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -43,6 +43,10 @@ type StoreTestingKnobs struct {
 	// be evaluated.
 	TestingLatchFilter kvserverbase.ReplicaRequestFilter
 
+	// TestingConcurrencyRetryFilter is called before a concurrency retry error is
+	// handled and the batch is retried.
+	TestingConcurrencyRetryFilter kvserverbase.ReplicaConcurrencyRetryFilter
+
 	// TestingProposalFilter is called before proposing each command.
 	TestingProposalFilter kvserverbase.ReplicaProposalFilter
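
For readers skimming the patch, the gate added to checkForPendingMergeRLocked reduces to a single comparison: a read may be served by a subsumed right-hand range only if the upper bound of its uncertainty interval (its read timestamp, forwarded by the transaction's max timestamp) is strictly below freezeStart; everything else waits on mergeCompleteCh. The sketch below models that predicate with a simplified standalone timestamp type rather than CockroachDB's hlc package; the type and the helper name canServeOnSubsumedRHS are illustrative assumptions, not code from the patch.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for hlc.Timestamp: wall time plus a
// logical tiebreaker. It only models the comparison performed by the patch.
type Timestamp struct {
	WallTime int64
	Logical  int32
}

func (t Timestamp) Less(o Timestamp) bool {
	return t.WallTime < o.WallTime || (t.WallTime == o.WallTime && t.Logical < o.Logical)
}

// Forward moves t ahead to o if o is later, mirroring how the patch forwards
// the batch timestamp by the transaction's max timestamp.
func (t *Timestamp) Forward(o Timestamp) {
	if t.Less(o) {
		*t = o
	}
}

// canServeOnSubsumedRHS is a hypothetical helper expressing the patch's rule:
// only read-only requests whose entire uncertainty interval closes before the
// subsumption time (freezeStart) are let through on a subsumed RHS.
func canServeOnSubsumedRHS(readTS, maxTS, freezeStart Timestamp, isReadOnly bool) bool {
	if !isReadOnly {
		return false // writes always wait for the merge to commit or abort
	}
	ts := readTS
	ts.Forward(maxTS) // account for uncertainty
	return ts.Less(freezeStart)
}

func main() {
	freezeStart := Timestamp{WallTime: 1_000}
	// Historical read whose uncertainty interval closes before subsumption: allowed.
	fmt.Println(canServeOnSubsumedRHS(Timestamp{WallTime: 400}, Timestamp{WallTime: 900}, freezeStart, true))
	// Read whose uncertainty interval straddles freezeStart: must block.
	fmt.Println(canServeOnSubsumedRHS(Timestamp{WallTime: 990}, Timestamp{WallTime: 1_090}, freezeStart, true))
	// Write at a historical timestamp: must block.
	fmt.Println(canServeOnSubsumedRHS(Timestamp{WallTime: 400}, Timestamp{WallTime: 900}, freezeStart, false))
}
```

This mirrors the three cases exercised by TestHistoricalReadsAfterSubsume: the historical read succeeds, while the uncertain read and the historical write block until the merge commits and then fail with RangeNotFoundError.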