diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go
index 1507480fcc91..002bd590c968 100644
--- a/pkg/storage/client_merge_test.go
+++ b/pkg/storage/client_merge_test.go
@@ -27,9 +27,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
-
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -54,6 +51,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
 )
 
 func adminMergeArgs(key roachpb.Key) *roachpb.AdminMergeRequest {
@@ -1920,7 +1919,7 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) {
 		}
 	}
 
-	// Verify that the abandoned ranges on store2 can only GC'd from left to
+	// Verify that the abandoned ranges on store2 can only be GC'd from left to
 	// right.
 	if err := store2.ManualReplicaGC(repls[2]); err != nil {
 		t.Fatal(err)
@@ -1962,6 +1961,73 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) {
 	}
 }
 
+// TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected verifies
+// that the replica GC queue will clean up an abandoned RHS replica whose
+// destroyStatus is destroyReasonMergePending. The RHS replica ends up in this
+// state when its merge watcher goroutine notices that the merge committed, and
+// thus marks it as destroyed with reason destroyReasonMergePending, but the
+// corresponding LHS is rebalanced off the store before it can apply the merge
+// trigger. The replica GC queue would previously refuse to GC the abandoned
+// RHS, as it interpreted destroyReasonMergePending to mean that the RHS replica
+// had already been garbage collected.
+func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	t.Skip("this test occasionally flakes if the RHS quiesces")
+
+	ctx := context.Background()
+	storeCfg := storage.TestStoreConfig(nil)
+	storeCfg.TestingKnobs.DisableReplicateQueue = true
+	mtc := &multiTestContext{storeConfig: &storeCfg}
+	mtc.Start(t, 3)
+	defer mtc.Stop()
+	store0, store2 := mtc.Store(0), mtc.Store(2)
+
+	mtc.replicateRange(roachpb.RangeID(1), 1, 2)
+	lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Make store2 the leaseholder for the RHS.
+	mtc.transferLease(ctx, rhsDesc.RangeID, 0, 2)
+
+	// Start dropping all Raft traffic to the LHS replica on store2 so that it
+	// won't be aware that there is a merge in progress.
+	mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{
+		rangeID:            lhsDesc.RangeID,
+		RaftMessageHandler: store2,
+	})
+
+	// Perform the merge. The LHS replica on store2 won't hear about this merge
+	// and thus won't subsume its RHS replica. The RHS replica's merge watcher
+	// goroutine will, however, notice the merge and mark the RHS replica as
+	// destroyed with reason destroyReasonMergePending.
+	args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
+	_, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	// Remove the merged range from store2. Its replicas of both the LHS and RHS
+	// are now eligible for GC.
+	mtc.unreplicateRange(lhsDesc.RangeID, 2)
+
+	// Note that we purposely do not call store.ManualReplicaGC here, as that
+	// calls replicaGCQueue.process directly, bypassing the logic in
+	// baseQueue.MaybeAdd and baseQueue.Add. We specifically want to test that
+	// queuing logic, which has been broken in the past.
+	testutils.SucceedsSoon(t, func() error {
+		if _, err := store2.GetReplica(lhsDesc.RangeID); err == nil {
+			return errors.New("lhs not destroyed")
+		}
+		if _, err := store2.GetReplica(rhsDesc.RangeID); err == nil {
+			return errors.New("rhs not destroyed")
+		}
+		return nil
+	})
+}
+
 func TestStoreRangeMergeDeadFollowerBeforeTxn(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go
index e7cf874a73e4..d88ed72be550 100644
--- a/pkg/storage/queue.go
+++ b/pkg/storage/queue.go
@@ -714,7 +714,7 @@ func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) err
 	}
 
 	if reason, err := repl.IsDestroyed(); err != nil {
-		if !bq.queueConfig.processDestroyedReplicas || reason != destroyReasonRemovalPending {
+		if !bq.queueConfig.processDestroyedReplicas || reason == destroyReasonRemoved {
 			if log.V(3) {
 				log.Infof(queueCtx, "replica destroyed (%s); skipping", err)
 			}
diff --git a/pkg/storage/replica_gc_queue.go b/pkg/storage/replica_gc_queue.go
index 9effb72612d9..5654ee3795e0 100644
--- a/pkg/storage/replica_gc_queue.go
+++ b/pkg/storage/replica_gc_queue.go
@@ -259,6 +259,11 @@ func (rgcq *replicaGCQueue) process(
 		if leftReplyDesc := rs[0]; !leftDesc.Equal(leftReplyDesc) {
 			log.VEventf(ctx, 1, "left neighbor %s not up-to-date with meta descriptor %s; cannot safely GC range yet",
 				leftDesc, leftReplyDesc)
+			// Chances are that the left replica needs to be GC'd. Since we don't
+			// have definitive proof, queue it with a low priority.
+			if _, err := rgcq.Add(leftRepl, replicaGCPriorityDefault); err != nil {
+				log.Errorf(ctx, "unable to add %s to replica GC queue: %s", leftRepl, err)
+			}
 			return nil
 		}
 	}