From 1afc52bc258e7a6aa0fe5a2de54557d2752e7c6c Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Tue, 16 Jun 2020 01:40:42 -0400
Subject: [PATCH] kvserver: prevent follower reads during merge critical state
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Prior to this commit, during a merge, the RHS leaseholder’s store could continue broadcasting (actionable) closed timestamp updates even after it had been subsumed. This allowed followers to serve follower reads past the subsumption time of the RHS. Additionally, after the merge, if the LHS had a lower closed timestamp than the RHS, it could allow writes to the keyspan owned by the RHS at timestamps lower than the RHS’s max closed timestamp.

This commit fixes this bug by requiring that the followers catch up to a LeaseAppliedIndex that belongs to the entry succeeding the Subsume request.

Fixes #44878

Release note (bug fix): Fixes a bug that allowed follower reads during the critical state of a merge transaction.

kvserver: improve testing around range merges and closedts interaction

Regression test for #44878

This commit adds a unit test to ensure that, during a range merge, closed timestamps that are emitted by the RHS leaseholder’s store cannot be activated by the follower replicas until the merge commits.

Release note: None
---
 pkg/kv/kvserver/addressing_test.go | 6 +-
 pkg/kv/kvserver/batcheval/result/result.go | 5 +
 pkg/kv/kvserver/closed_timestamp_test.go | 244 ++++++++++++++++--
 pkg/kv/kvserver/consistency_queue.go | 8 +-
 pkg/kv/kvserver/helpers_test.go | 8 +-
 pkg/kv/kvserver/log.go | 8 +-
 pkg/kv/kvserver/queue.go | 2 +-
 pkg/kv/kvserver/queue_concurrency_test.go | 2 +-
 pkg/kv/kvserver/queue_helpers_testutil.go | 2 +-
 pkg/kv/kvserver/queue_test.go | 8 +-
 pkg/kv/kvserver/raft_log_queue.go | 4 +-
 pkg/kv/kvserver/raft_snapshot_queue.go | 2 +-
 pkg/kv/kvserver/replica.go | 32 ++-
 pkg/kv/kvserver/replica_application_result.go | 4 +-
 .../replica_application_state_machine.go | 4 +-
 pkg/kv/kvserver/replica_backpressure.go | 4 +-
 pkg/kv/kvserver/replica_closedts.go | 2 +-
 pkg/kv/kvserver/replica_command.go | 21 +-
 pkg/kv/kvserver/replica_consistency.go | 6 +-
 pkg/kv/kvserver/replica_follower_read.go | 8 +-
 pkg/kv/kvserver/replica_gc_queue.go | 4 +-
 pkg/kv/kvserver/replica_gossip.go | 4 +-
 pkg/kv/kvserver/replica_init.go | 14 +-
 pkg/kv/kvserver/replica_metrics.go | 2 +-
 pkg/kv/kvserver/replica_proposal.go | 6 +-
 pkg/kv/kvserver/replica_proposal_quota.go | 6 +-
 pkg/kv/kvserver/replica_raft.go | 12 +-
 pkg/kv/kvserver/replica_raftstorage.go | 2 +-
 pkg/kv/kvserver/replica_range_lease.go | 16 +-
 pkg/kv/kvserver/replica_rangefeed.go | 6 +-
 pkg/kv/kvserver/replica_rangefeed_test.go | 4 +-
 pkg/kv/kvserver/replica_read.go | 22 +-
 pkg/kv/kvserver/replica_send.go | 12 +-
 pkg/kv/kvserver/replica_sideload_test.go | 8 +-
 pkg/kv/kvserver/replica_split_load.go | 4 +-
 pkg/kv/kvserver/replica_test.go | 10 +-
 pkg/kv/kvserver/replica_write.go | 6 +-
 pkg/kv/kvserver/replicate_queue.go | 6 +-
 pkg/kv/kvserver/split_delay_helper.go | 6 +-
 pkg/kv/kvserver/store.go | 118 ++++-----
 pkg/kv/kvserver/store_raft.go | 18 +-
 pkg/kv/kvserver/store_rebalancer_test.go | 4 +-
 pkg/kv/kvserver/store_send.go | 18 +-
 pkg/kv/kvserver/store_snapshot.go | 8 +-
 pkg/kv/kvserver/store_test.go | 70 ++---
 pkg/kv/kvserver/ts_maintenance_queue.go | 4 +-
 46 files changed, 499 insertions(+), 271 deletions(-)

diff --git a/pkg/kv/kvserver/addressing_test.go b/pkg/kv/kvserver/addressing_test.go
index 9979a43784b3..fa3c27e56c34 100644
---
a/pkg/kv/kvserver/addressing_test.go +++ b/pkg/kv/kvserver/addressing_test.go @@ -137,14 +137,14 @@ func TestUpdateRangeAddressing(t *testing.T) { tcsf := kvcoord.NewTxnCoordSenderFactory( kvcoord.TxnCoordSenderFactoryConfig{ AmbientCtx: actx, - Settings: store.cfg.Settings, - Clock: store.cfg.Clock, + Settings: store.Cfg.Settings, + Clock: store.Cfg.Clock, Stopper: stopper, Metrics: kvcoord.MakeTxnMetrics(time.Second), }, store.TestSender(), ) - db := kv.NewDB(actx, tcsf, store.cfg.Clock) + db := kv.NewDB(actx, tcsf, store.Cfg.Clock) ctx := context.Background() txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) if err := txn.Run(ctx, b); err != nil { diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 0eb3b9f82226..d474f0d99c0a 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -14,6 +14,7 @@ import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -65,6 +66,10 @@ type LocalResult struct { MaybeGossipNodeLiveness *roachpb.Span // Call maybeWatchForMerge. MaybeWatchForMerge bool + // ReleaseFunc, when it's not the empty function, stores the callback returned + // by minprop.Tracker.Track(). It is used to record the LeaseAppliedIndex for + // a SubsumeRequest. + ReleaseFunc closedts.ReleaseFunc // Metrics contains counters which are to be passed to the // metrics subsystem. diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 8f5fe7ee9928..1edb6bbd2edc 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -25,11 +25,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -52,7 +54,7 @@ func TestClosedTimestampCanServe(t *testing.T) { } ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -112,7 +114,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { t.Skip("skipping under race") } ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -185,7 +187,7 @@ func TestClosedTimestampCanServeWithConflictingIntent(t *testing.T) { 
t.Skip("https://github.com/cockroachdb/cockroach/issues/50091") ctx := context.Background() - tc, _, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, _, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) @@ -266,7 +268,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { t.Skip("skipping under race") } ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) // Disable the automatic merging. if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { t.Fatal(err) @@ -347,7 +349,7 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) { ctx := context.Background() // Set up the target duration to be very long and rely on lease transfers to // drive MaxClosed. - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, time.Hour) + tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, time.Hour) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -385,7 +387,7 @@ func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) { } ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -419,14 +421,14 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { } ctx := context.Background() - tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db0, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { t.Fatal(err) } - // Verify that we can serve a follower read at a timestamp. Wait if necessary. + // Verify that we can serve a follower read at a timestamp. Wait if necessary ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} baRead := makeReadBatchRequestForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { @@ -448,9 +450,183 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { verifyNotLeaseHolderErrors(t, baQueryTxn, repls, 2) } +func TestClosedTimestampsNotActionableDuringMergeCriticalState(t *testing.T) { + defer leaktest.AfterTest(t)() + if util.RaceEnabled { + // Limiting how long transactions can run does not work well with race + // unless we're extremely lenient, which drives up the test duration. 
+ t.Skip("skipping under race") + } + + t.Run("without lease transfer", func(t *testing.T) { + verifyClosedTSInactiveDuringMergeCriticalState(t) + }) +} + +func verifyClosedTSInactiveDuringMergeCriticalState(t *testing.T) { + ctx := context.Background() + mergeTriggerReached := make(chan struct{}) + blockMergeTrigger := make(chan struct{}) + knobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + TestingRequestFilter: func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + for _, req := range ba.Requests { + if et := req.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { + // We block the LHS leaseholder from applying the merge trigger and + // then wait until the RHS follower replicas have completely caught + // up to their leaseholder. + mergeTriggerReached <- struct{}{} + <-blockMergeTrigger + } + } + return nil + }, + }, + } + + tc, db0, desc, repls := setupClusterForClosedTSTestingWithCustomKnobs(ctx, t, testingTargetDuration, + knobs) + defer tc.Stopper().Stop(ctx) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(3, $1)`, "foo"); err != nil { + t.Fatal(err) + } + // Start by ensuring that the values can be read from all replicas at ts. + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + baRead := makeReadBatchRequestForDesc(desc, ts) + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(2)) + }) + // Manually split the table to have easier access to descriptors. + tableID, err := getTableID(db0, "cttest", "kv") + if err != nil { + t.Fatalf("failed to lookup ids: %+v", err) + } + + idxPrefix := keys.SystemSQLCodec.IndexPrefix(uint32(tableID), 1) + k, err := sqlbase.EncodeTableKey(idxPrefix, tree.NewDInt(2), encoding.Ascending) + if err != nil { + t.Fatalf("failed to encode split key: %+v", err) + } + tcImpl := tc.(*testcluster.TestCluster) + leftDesc, rightDesc := tcImpl.SplitRangeOrFatal(t, k) + if err := tcImpl.WaitForFullReplication(); err != nil { + t.Fatal(err) + } + rRepls := replsForRange(ctx, t, tc, rightDesc) + g, _ := errgroup.WithContext(context.Background()) + defer func() { + // Unblock the leaseholder so it can finally commit the merge. + close(blockMergeTrigger) + if err := g.Wait(); err != nil { + t.Error(err) + } + }() + + // Merge the ranges back together. The LHS leaseholder should block right + // before the merge trigger request is sent. + g.Go(func() error { + if _, err := tc.Server(0).MergeRanges(leftDesc.StartKey.AsRawKey()); err != nil { + return err + } + return nil + }) + + // We now have the partially merged critical state, fully replicated. + <-mergeTriggerReached + target := getCurrentLeaseholder(t, tc, rightDesc) + store := getTargetStoreOrFatal(t, tc, target) + deadline := timeutil.Now().Add(testingTargetDuration * 2) + getCurrentMaxClosed := func() ctpb.Entry { + var maxClosed ctpb.Entry + attempts := 0 + for attempts == 0 || timeutil.Now().Before(deadline) { + attempts++ + store.Cfg.ClosedTimestamp.Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) { + if _, ok := entry.MLAI[rightDesc.RangeID]; ok { + maxClosed = entry + return true + } + return false + }) + if _, ok := maxClosed.MLAI[rightDesc.RangeID]; !ok { + // We ran out of closed timestamps to visit without finding one that + // corresponds to rightDesc. 
It is likely that no closed timestamps have + // been broadcast for rightDesc yet, try again. + continue + } + return maxClosed + } + t.Fatal("Could not find a non-empty closed timestamp update for RHS range.") + return ctpb.Entry{} + } + + // We issue a read request with a timestamp that is one logical tick after + // the current max closed timestamp so we know that this timestamp cannot be + // "active" for follower reads until there is a future closed timestamp update + // that contains an MLAI for the RHS range. + postMergeClosedTS := getCurrentMaxClosed().ClosedTimestamp.Next() + baReadRHSAfterSubsume := makeReadBatchRequestForDesc(rightDesc, postMergeClosedTS) + + // Poll the store until we see a closed timestamp update for the RHS range. + for { + maxClosed := getCurrentMaxClosed() + if _, ok := maxClosed.MLAI[rightDesc.RangeID]; ok && + postMergeClosedTS.LessEq(maxClosed.ClosedTimestamp) { + break + } + } + rightReplFollowers := make([]*kvserver.Replica, 0, 2) + for _, repl := range rRepls { + if repl.StoreID() == target.StoreID && repl.NodeID() == target.NodeID { + continue + } + rightReplFollowers = append(rightReplFollowers, repl) + } + notLeaseholderErrs, err := countNotLeaseHolderErrors(baReadRHSAfterSubsume, rightReplFollowers) + if err != nil { + t.Fatal(err) + } + const expectedNLEs = 2 + if notLeaseholderErrs != expectedNLEs { + t.Fatalf("Expected %d NotLeaseHolderErrors, but got %d", expectedNLEs, notLeaseholderErrs) + } +} + +func getTargetStoreOrFatal( + t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, +) (store *kvserver.Store) { + for i := 0; i < tc.NumServers(); i++ { + if server := tc.Server(i); server.NodeID() == target.NodeID && + server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) { + store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) + if err != nil { + t.Fatal(err) + } + return store + } + } + t.Fatalf("Could not find store for replication target %+v\n", target) + return nil +} + func verifyNotLeaseHolderErrors( t *testing.T, ba roachpb.BatchRequest, repls []*kvserver.Replica, expectedNLEs int, ) { + notLeaseholderErrs, err := countNotLeaseHolderErrors(ba, repls) + if err != nil { + t.Fatal(err) + } + if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e { + t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a) + } +} + +func countNotLeaseHolderErrors(ba roachpb.BatchRequest, repls []*kvserver.Replica) (int64, error) { g, ctx := errgroup.WithContext(context.Background()) var notLeaseholderErrs int64 for i := range repls { @@ -467,11 +643,9 @@ func verifyNotLeaseHolderErrors( }) } if err := g.Wait(); err != nil { - t.Fatal(err) - } - if a, e := notLeaseholderErrs, int64(expectedNLEs); a != e { - t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a) + return 0, err } + return notLeaseholderErrs, nil } // Every 0.1s=100ms, try close out a timestamp ~300ms in the past. @@ -524,30 +698,25 @@ func pickRandomTarget( } } -// This function creates a test cluster that is prepared to exercise follower -// reads. The returned test cluster has follower reads enabled using the above -// targetDuration and closeFraction. In addition to the newly minted test -// cluster, this function returns a db handle to node 0, a range descriptor for -// the range used by the table `cttest.kv` and the replica objects corresponding -// to the replicas for the range. It is the caller's responsibility to Stop the -// Stopper on the returned test cluster when done. 
-func setupTestClusterForClosedTimestampTesting( - ctx context.Context, t *testing.T, targetDuration time.Duration, +// setupClusterForClosedTsTestingWithCustomKnobs creates a test cluster that is +// prepared to exercise follower reads. The returned test cluster has follower +// reads enabled using the given targetDuration and above closeFraction. In +// addition to the newly minted test cluster, this function returns a db handle +// to node 0, a range descriptor for the range used by the table `cttest.kv` and +// the replica objects corresponding to the replicas for the range. It is the +// caller's responsibility to Stop the Stopper on the returned test cluster when +// done. +func setupClusterForClosedTSTestingWithCustomKnobs( + ctx context.Context, t *testing.T, targetDuration time.Duration, knobs base.TestingKnobs, ) ( tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor, repls []*kvserver.Replica, ) { - tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{ - RangeFeedPushTxnsInterval: 10 * time.Millisecond, - RangeFeedPushTxnsAge: 20 * time.Millisecond, - }, - }, + Knobs: knobs, }, }) db0 = tc.ServerConn(0) @@ -619,6 +788,25 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING); return tc, db0, desc, repls } +// setupClusterForClosedTsTesting is like setupClusterForClosedTsTestingWithCustomKnobs +// except the default testing knobs that are passed in. +func setupClusterForClosedTSTesting( + ctx context.Context, t *testing.T, targetDuration time.Duration, +) ( + tc serverutils.TestClusterInterface, + db0 *gosql.DB, + kvTableDesc roachpb.RangeDescriptor, + repls []*kvserver.Replica, +) { + defaultTestingKnobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + RangeFeedPushTxnsInterval: 10 * time.Millisecond, + RangeFeedPushTxnsAge: 20 * time.Millisecond, + }, + } + return setupClusterForClosedTSTestingWithCustomKnobs(ctx, t, targetDuration, defaultTestingKnobs) +} + type respFunc func(*roachpb.BatchResponse, *roachpb.Error) (shouldRetry bool, err error) // respFuncs returns a respFunc which is passes its arguments to each passed diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 40e0bb3e3328..94d68e93751c 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -83,7 +83,7 @@ func (q *consistencyQueue) shouldQueue( } shouldQ, priority := true, float64(0) - if !repl.store.cfg.TestingKnobs.DisableLastProcessedCheck { + if !repl.store.Cfg.TestingKnobs.DisableLastProcessedCheck { lpTS, err := repl.getQueueLastProcessed(ctx, q.name) if err != nil { return false, 0 @@ -93,9 +93,9 @@ func (q *consistencyQueue) shouldQueue( } } // Check if all replicas are live. Some tests run without a NodeLiveness configured. 
- if repl.store.cfg.NodeLiveness != nil { + if repl.store.Cfg.NodeLiveness != nil { for _, rep := range repl.Desc().Replicas().All() { - if live, err := repl.store.cfg.NodeLiveness.IsLive(rep.NodeID); err != nil { + if live, err := repl.store.Cfg.NodeLiveness.IsLive(rep.NodeID); err != nil { log.VErrEventf(ctx, 3, "node %d liveness failed: %s", rep.NodeID, err) return false, 0 } else if !live { @@ -149,7 +149,7 @@ func (q *consistencyQueue) process( } return nil } - if fn := repl.store.cfg.TestingKnobs.ConsistencyTestingKnobs.ConsistencyQueueResultHook; fn != nil { + if fn := repl.store.Cfg.TestingKnobs.ConsistencyTestingKnobs.ConsistencyQueueResultHook; fn != nil { fn(resp) } return nil diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index e02fd98d1e16..d009a6dec320 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -46,7 +46,7 @@ import ( ) func (s *Store) Transport() *RaftTransport { - return s.cfg.Transport + return s.Cfg.Transport } func (s *Store) FindTargetAndTransferLease( @@ -192,13 +192,13 @@ func (s *Store) ReservationCount() int { // ClearClosedTimestampStorage clears the closed timestamp storage of all // knowledge about closed timestamps. func (s *Store) ClearClosedTimestampStorage() { - s.cfg.ClosedTimestamp.Storage.Clear() + s.Cfg.ClosedTimestamp.Storage.Clear() } // RequestClosedTimestamp instructs the closed timestamp client to request the // relevant node to publish its MLAI for the provided range. func (s *Store) RequestClosedTimestamp(nodeID roachpb.NodeID, rangeID roachpb.RangeID) { - s.cfg.ClosedTimestamp.Clients.Request(nodeID, rangeID) + s.Cfg.ClosedTimestamp.Clients.Request(nodeID, rangeID) } // AssertInvariants verifies that the store's bookkeping is self-consistent. It @@ -208,7 +208,7 @@ func (s *Store) AssertInvariants() { s.mu.RLock() defer s.mu.RUnlock() s.mu.replicas.Range(func(_ int64, p unsafe.Pointer) bool { - ctx := s.cfg.AmbientCtx.AnnotateCtx(context.Background()) + ctx := s.Cfg.AmbientCtx.AnnotateCtx(context.Background()) repl := (*Replica)(p) // We would normally need to hold repl.raftMu. Otherwise we can observe an // initialized replica that is not in s.replicasByKey, e.g., if we race with diff --git a/pkg/kv/kvserver/log.go b/pkg/kv/kvserver/log.go index a1d5d4f064cc..27505fe868ed 100644 --- a/pkg/kv/kvserver/log.go +++ b/pkg/kv/kvserver/log.go @@ -79,7 +79,7 @@ func (s *Store) insertRangeLogEvent( s.metrics.RangeRemoves.Inc(1) } - rows, err := s.cfg.SQLExecutor.ExecEx(ctx, "log-range-event", txn, + rows, err := s.Cfg.SQLExecutor.ExecEx(ctx, "log-range-event", txn, sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, insertEventTableStmt, args...) 
if err != nil { @@ -100,7 +100,7 @@ func (s *Store) insertRangeLogEvent( func (s *Store) logSplit( ctx context.Context, txn *kv.Txn, updatedDesc, newDesc roachpb.RangeDescriptor, ) error { - if !s.cfg.LogRangeEvents { + if !s.Cfg.LogRangeEvents { return nil } return s.insertRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{ @@ -124,7 +124,7 @@ func (s *Store) logSplit( func (s *Store) logMerge( ctx context.Context, txn *kv.Txn, updatedLHSDesc, rhsDesc roachpb.RangeDescriptor, ) error { - if !s.cfg.LogRangeEvents { + if !s.Cfg.LogRangeEvents { return nil } return s.insertRangeLogEvent(ctx, txn, kvserverpb.RangeLogEvent{ @@ -153,7 +153,7 @@ func (s *Store) logChange( reason kvserverpb.RangeLogEventReason, details string, ) error { - if !s.cfg.LogRangeEvents { + if !s.Cfg.LogRangeEvents { return nil } diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index d6d3331ce9ab..99f9cfa610dd 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -443,7 +443,7 @@ func newBaseQueue( cfg.addOrMaybeAddSemSize = 20 } - ambient := store.cfg.AmbientCtx + ambient := store.Cfg.AmbientCtx ambient.AddLogTag(name, nil) if !cfg.acceptsUnsplitRanges && !cfg.needsSystemConfig { diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go index f1f421f0ec16..8dc6cd034904 100644 --- a/pkg/kv/kvserver/queue_concurrency_test.go +++ b/pkg/kv/kvserver/queue_concurrency_test.go @@ -68,7 +68,7 @@ func TestBaseQueueConcurrent(t *testing.T) { // we'd set up an interface against the *Store as well, similar to // replicaInQueue, but this isn't an ideal world. Deal with it. store := &Store{ - cfg: StoreConfig{ + Cfg: StoreConfig{ Clock: hlc.NewClock(hlc.UnixNano, time.Second), AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, DefaultZoneConfig: zonepb.DefaultZoneConfigRef(), diff --git a/pkg/kv/kvserver/queue_helpers_testutil.go b/pkg/kv/kvserver/queue_helpers_testutil.go index 31cf64f7154f..b28ee9e14357 100644 --- a/pkg/kv/kvserver/queue_helpers_testutil.go +++ b/pkg/kv/kvserver/queue_helpers_testutil.go @@ -35,7 +35,7 @@ func forceScanAndProcess(s *Store, q *baseQueue) error { } newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { - q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now()) + q.maybeAdd(context.Background(), repl, s.Cfg.Clock.Now()) return true }) diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index 118db866c180..e202d4682d13 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -511,9 +511,9 @@ func TestNeedsSystemConfig(t *testing.T) { // Use a gossip instance that won't have the system config available in it. // bqNeedsSysCfg will not add the replica or process it without a system config. rpcContext := rpc.NewContext(rpc.ContextOptions{ - AmbientCtx: tc.store.cfg.AmbientCtx, + AmbientCtx: tc.store.Cfg.AmbientCtx, Config: &base.Config{Insecure: true}, - Clock: tc.store.cfg.Clock, + Clock: tc.store.Cfg.Clock, Stopper: stopper, Settings: cluster.MakeTestingClusterSettings(), }) @@ -620,13 +620,13 @@ func TestAcceptsUnsplitRanges(t *testing.T) { }, } - bq := makeTestBaseQueue("test", testQueue, s, s.cfg.Gossip, queueConfig{maxSize: 2}) + bq := makeTestBaseQueue("test", testQueue, s, s.Cfg.Gossip, queueConfig{maxSize: 2}) bq.Start(stopper) // Check our config. 
var sysCfg *config.SystemConfig testutils.SucceedsSoon(t, func() error { - sysCfg = s.cfg.Gossip.GetSystemConfig() + sysCfg = s.Cfg.Gossip.GetSystemConfig() if sysCfg == nil { return errors.New("system config not yet present") } diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index d735d71980ec..bb30efdb6d1e 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -167,7 +167,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err // RangeMaxBytes). This captures the heuristic that at some point, it's more // efficient to catch up via a snapshot than via applying a long tail of log // entries. - targetSize := r.store.cfg.RaftLogTruncationThreshold + targetSize := r.store.Cfg.RaftLogTruncationThreshold if targetSize > *r.mu.zone.RangeMaxBytes { targetSize = *r.mu.zone.RangeMaxBytes } @@ -206,7 +206,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err ctx, raftStatus.Progress, r.descRLocked().Replicas().All(), func(replicaID roachpb.ReplicaID) bool { return r.mu.lastUpdateTimes.isFollowerActiveSince( - ctx, replicaID, now, r.store.cfg.RangeLeaseActiveDuration()) + ctx, replicaID, now, r.store.Cfg.RangeLeaseActiveDuration()) }, ) log.Eventf(ctx, "raft status after lastUpdateTimes check: %+v", raftStatus.Progress) diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 2911dab29e89..790e3dc9ace8 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -114,7 +114,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( // the replicate queue. Either way, no point in also sending it a snapshot of // type RAFT. if repDesc.GetType() == roachpb.LEARNER { - if fn := repl.store.cfg.TestingKnobs.ReplicaSkipLearnerSnapshot; fn != nil && fn() { + if fn := repl.store.Cfg.TestingKnobs.ReplicaSkipLearnerSnapshot; fn != nil && fn() { return nil } snapType = SnapshotRequest_LEARNER diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index f37a677192a2..ae221a824f40 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" @@ -654,8 +655,8 @@ func (r *Replica) SetZoneConfig(zone *zonepb.ZoneConfig) { // larger than the default value. When the store starts up it sets the // zone for the replica to this default value; later on it overwrites it // with a new instance even if the value is the same as the default. - r.mu.zone != r.store.cfg.DefaultZoneConfig && - r.mu.zone != r.store.cfg.DefaultSystemZoneConfig { + r.mu.zone != r.store.Cfg.DefaultZoneConfig && + r.mu.zone != r.store.Cfg.DefaultSystemZoneConfig { r.mu.largestPreviousMaxRangeSizeBytes = *r.mu.zone.RangeMaxBytes } else if r.mu.largestPreviousMaxRangeSizeBytes > 0 && @@ -717,7 +718,7 @@ func (r *Replica) GetNodeLocality() roachpb.Locality { // ClusterSettings returns the node's ClusterSettings. func (r *Replica) ClusterSettings() *cluster.Settings { - return r.store.cfg.Settings + return r.store.Cfg.Settings } // StoreID returns the Replica's StoreID. 
@@ -727,7 +728,7 @@ func (r *Replica) StoreID() roachpb.StoreID { // EvalKnobs returns the EvalContext's Knobs. func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs { - return r.store.cfg.TestingKnobs.EvalKnobs + return r.store.Cfg.TestingKnobs.EvalKnobs } // Clock returns the hlc clock shared by this replica. @@ -1037,7 +1038,7 @@ func (r *Replica) State() kvserverpb.RangeInfo { allReplicas := desc.Replicas().All() for i := range allReplicas { replDesc := &allReplicas[i] - r.store.cfg.ClosedTimestamp.Storage.VisitDescending(replDesc.NodeID, func(e ctpb.Entry) (done bool) { + r.store.Cfg.ClosedTimestamp.Storage.VisitDescending(replDesc.NodeID, func(e ctpb.Entry) (done bool) { mlai, found := e.MLAI[r.RangeID] if !found { return false // not done @@ -1326,9 +1327,10 @@ func (ec *endCmds) done( // maybeWatchForMerge checks whether a merge of this replica into its left // neighbor is in its critical phase and, if so, arranges to block all requests // until the merge completes. -func (r *Replica) maybeWatchForMerge(ctx context.Context) error { +func (r *Replica) maybeWatchForMerge(ctx context.Context, untrack closedts.ReleaseFunc) error { desc := r.Desc() descKey := keys.RangeDescriptorKey(desc.StartKey) + defer untrack(ctx, 0, 0, 0) // covers all error returns below _, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(), storage.MVCCGetOptions{Inconsistent: true}) if err != nil { @@ -1365,6 +1367,20 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error { // range in case it managed to quiesce between when the Subsume request // arrived and now, which is rare but entirely legal. r.unquiesceLocked() + // We prevent replicas from being able to serve follower reads on timestamps + // that fall between the subsumption time (FreezeStart) and the timestamp at + // which the merge transaction commits or aborts (i.e. when a merge is in its + // critical state), by requiring follower replicas to catch up to the + // *succeeding* LeaseAppliedIndex. + // + // NB: The above statement relies on the invariant that the LAI that + // follows a Subsume request will be applied only after the merge aborts, as + // the RHS replicas will get destroyed if the merge successfully commits. This + // invariant is upheld because the only Raft proposals allowed after a range + // has been subsumed are lease requests, which do not bump the LAI. + // + // See https://github.com/cockroachdb/cockroach/issues/44878 for discussion. + untrack(ctx, ctpb.Epoch(r.mu.state.Lease.Epoch), r.RangeID, ctpb.LAI(r.mu.state.LeaseAppliedIndex+1)) r.mu.Unlock() taskCtx := r.AnnotateCtx(context.Background()) @@ -1605,14 +1621,14 @@ func EnableLeaseHistory(maxEntries int) func() { func (r *Replica) GetExternalStorage( ctx context.Context, dest roachpb.ExternalStorage, ) (cloud.ExternalStorage, error) { - return r.store.cfg.ExternalStorage(ctx, dest) + return r.store.Cfg.ExternalStorage(ctx, dest) } // GetExternalStorageFromURI returns an ExternalStorage object, based on the given URI. 
func (r *Replica) GetExternalStorageFromURI( ctx context.Context, uri string, ) (cloud.ExternalStorage, error) { - return r.store.cfg.ExternalStorageFromURI(ctx, uri) + return r.store.Cfg.ExternalStorageFromURI(ctx, uri) } func (r *Replica) markSystemConfigGossipSuccess() { diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 95ad607c0e02..2c52f7117cf4 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -111,7 +111,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { } var pErr *roachpb.Error - if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil { + if filter := r.store.Cfg.TestingKnobs.TestingPostApplyFilter; filter != nil { var newPropRetry int newPropRetry, pErr = filter(kvserverbase.ApplyFilterArgs{ CmdID: cmd.idKey, @@ -203,7 +203,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( return nil } - minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) + minTS, untrack := r.store.Cfg.ClosedTimestamp.Tracker.Track(ctx) defer untrack(ctx, 0, 0, 0) // covers all error paths below // NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp. if p.Request.Timestamp.Less(minTS) { diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 67a586123739..1f38fdede4a6 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -133,7 +133,7 @@ func (r *Replica) shouldApplyCommand( cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr( ctx, cmd.idKey, &cmd.raftCmd, cmd.IsLocal(), replicaState, ) - if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil { + if filter := r.store.Cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil { var newPropRetry int newPropRetry, cmd.forcedErr = filter(kvserverbase.ApplyFilterArgs{ CmdID: cmd.idKey, @@ -588,7 +588,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( if res.AddSSTable != nil { copied := addSSTablePreApply( ctx, - b.r.store.cfg.Settings, + b.r.store.Cfg.Settings, b.r.store.engine, b.r.raftMu.sideloaded, cmd.ent.Term, diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index 7cd8e83f0c94..bb0b40f5299a 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -113,7 +113,7 @@ func canBackpressureBatch(ba *roachpb.BatchRequest) bool { // larger than that by more than backpressureByteTolerance (see that comment for // further explanation). func (r *Replica) shouldBackpressureWrites() bool { - mult := backpressureRangeSizeMultiplier.Get(&r.store.cfg.Settings.SV) + mult := backpressureRangeSizeMultiplier.Get(&r.store.Cfg.Settings.SV) if mult == 0 { // Disabled. 
return false @@ -125,7 +125,7 @@ func (r *Replica) shouldBackpressureWrites() bool { if !exceeded { return false } - if bytesOver > backpressureByteTolerance.Get(&r.store.cfg.Settings.SV) { + if bytesOver > backpressureByteTolerance.Get(&r.store.Cfg.Settings.SV) { return false } return true diff --git a/pkg/kv/kvserver/replica_closedts.go b/pkg/kv/kvserver/replica_closedts.go index f34a8c63bb35..c47d09ec7ecd 100644 --- a/pkg/kv/kvserver/replica_closedts.go +++ b/pkg/kv/kvserver/replica_closedts.go @@ -33,7 +33,7 @@ func (r *Replica) EmitMLAI() { // of the current LAI to trigger a re-broadcast of this range's LAI. if isLeaseholder && epoch > 0 { ctx := r.AnnotateCtx(context.Background()) - _, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) + _, untrack := r.store.Cfg.ClosedTimestamp.Tracker.Track(ctx) untrack(ctx, ctpb.Epoch(epoch), r.RangeID, ctpb.LAI(lai)) } } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 10aabe75219b..dff031621fb2 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -227,7 +227,6 @@ func splitTxnAttempt( } b := txn.NewBatch() - // Write range descriptor for right hand side of the split. rightDescKey := keys.RangeDescriptorKey(rightDesc.StartKey) if err := updateRangeDescriptor(b, rightDescKey, nil, rightDesc); err != nil { @@ -611,7 +610,7 @@ func (r *Replica) AdminMerge( // replica of the RHS too early. The comment on // TestStoreRangeMergeUninitializedLHSFollower explains the situation in full. if err := waitForReplicasInit( - ctx, r.store.cfg.NodeDialer, origLeftDesc.RangeID, origLeftDesc.Replicas().All(), + ctx, r.store.Cfg.NodeDialer, origLeftDesc.RangeID, origLeftDesc.Replicas().All(), ); err != nil { return errors.Wrap(err, "waiting for all left-hand replicas to initialize") } @@ -734,7 +733,7 @@ func (r *Replica) AdminMerge( rhsSnapshotRes := br.(*roachpb.SubsumeResponse) err = waitForApplication( - ctx, r.store.cfg.NodeDialer, rightDesc.RangeID, mergeReplicas, + ctx, r.store.Cfg.NodeDialer, rightDesc.RangeID, mergeReplicas, rhsSnapshotRes.LeaseAppliedIndex) if err != nil { return errors.Wrap(err, "waiting for all right-hand replicas to catch up") @@ -1033,7 +1032,7 @@ func (r *Replica) changeReplicasImpl( if _, err := maybeLeaveAtomicChangeReplicas(ctx, r.store, r.Desc()); err != nil { return nil, err } - if fn := r.store.cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() { + if fn := r.store.Cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() { return nil, err } // Don't leave a learner replica lying around if we didn't succeed in @@ -1274,7 +1273,7 @@ func (r *Replica) atomicReplicationChange( return nil, errors.Errorf("programming error: cannot promote replica of type %s", rDesc.Type) } - if fn := r.store.cfg.TestingKnobs.ReplicaSkipLearnerSnapshot; fn != nil && fn() { + if fn := r.store.Cfg.TestingKnobs.ReplicaSkipLearnerSnapshot; fn != nil && fn() { continue } @@ -1299,7 +1298,7 @@ func (r *Replica) atomicReplicationChange( } if adds := chgs.Additions(); len(adds) > 0 { - if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil && fn(adds) { + if fn := r.store.Cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil && fn(adds) { return desc, nil } } @@ -1319,7 +1318,7 @@ func (r *Replica) atomicReplicationChange( return nil, err } - if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterJointConfig; fn != nil && fn() { + if fn := r.store.Cfg.TestingKnobs.ReplicaAddStopAfterJointConfig; fn != nil && fn() { 
return desc, nil } @@ -1877,9 +1876,9 @@ func (r *Replica) sendSnapshot( sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } - if err := r.store.cfg.Transport.SendSnapshot( + if err := r.store.Cfg.Transport.SendSnapshot( ctx, - &r.store.cfg.RaftConfig, + &r.store.Cfg.RaftConfig, r.store.allocator.storePool, req, snap, @@ -2106,7 +2105,7 @@ func (s *Store) AdminRelocateRange( // Done. return ctx.Err() } - if fn := s.cfg.TestingKnobs.BeforeRelocateOne; fn != nil { + if fn := s.Cfg.TestingKnobs.BeforeRelocateOne; fn != nil { fn(ops, leaseTarget, err) } @@ -2151,7 +2150,7 @@ func (s *Store) relocateOne( `range %s had non-voter replicas: %v`, desc, desc.Replicas()) } - sysCfg := s.cfg.Gossip.GetSystemConfig() + sysCfg := s.Cfg.Gossip.GetSystemConfig() if sysCfg == nil { return nil, nil, fmt.Errorf("no system config available, unable to perform RelocateRange") } diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 71e3f9075db9..774c15c574f0 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -149,7 +149,7 @@ func (r *Replica) CheckConsistency( curSnap := results[shaToIdxs[sha][0]].Response.Snapshot if sha != minoritySHA && minoritySnap != nil && curSnap != nil { diff := diffRange(curSnap, minoritySnap) - if report := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.BadChecksumReportDiff; report != nil { + if report := r.store.Cfg.TestingKnobs.ConsistencyTestingKnobs.BadChecksumReportDiff; report != nil { report(*r.store.Ident, diff) } _, _ = fmt.Fprintf(&buf, "====== diff(%x, [minority]) ======\n", sha) @@ -228,7 +228,7 @@ func (r *Replica) CheckConsistency( }); err != nil { log.Infof(ctx, "while retrieving cluster bootstrap version: %s", err) // Intentionally continue with the assumption that it's the current version. - v = r.store.cfg.Settings.Version.ActiveVersion(ctx).Version + v = r.store.Cfg.Settings.Version.ActiveVersion(ctx).Version } // For clusters that ever ran <19.1, we're not so sure that the stats are // consistent. 
Verify this only for clusters that started out on 19.1 or @@ -301,7 +301,7 @@ type ConsistencyCheckResult struct { func (r *Replica) collectChecksumFromReplica( ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte, ) (CollectChecksumResponse, error) { - conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass) + conn, err := r.store.Cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass) if err != nil { return CollectChecksumResponse{}, errors.Wrapf(err, "could not dial node ID %d", replica.NodeID) diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 45e84b8cb8e6..60c738cd48db 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -44,7 +44,7 @@ func (r *Replica) canServeFollowerRead( lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch && (!ba.IsLocking() && ba.IsAllTransactional()) && // followerreadsccl.batchCanBeEvaluatedOnFollower (ba.Txn == nil || !ba.Txn.IsLocking()) && // followerreadsccl.txnCanPerformFollowerRead - FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) { + FollowerReadsEnabled.Get(&r.store.Cfg.Settings.SV) { // There's no known reason that a non-VOTER_FULL replica couldn't serve follower // reads (or RangeFeed), but as of the time of writing, these are expected @@ -70,7 +70,7 @@ func (r *Replica) canServeFollowerRead( if !canServeFollowerRead { // We can't actually serve the read based on the closed timestamp. // Signal the clients that we want an update so that future requests can succeed. - r.store.cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID) + r.store.Cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID) if false { // NB: this can't go behind V(x) because the log message created by the @@ -78,7 +78,7 @@ func (r *Replica) canServeFollowerRead( // using logspy. log.Warningf(ctx, "can't serve follower read for %s at epo %d, storage is %s", ba.Timestamp, lErr.Lease.Epoch, - r.store.cfg.ClosedTimestamp.Storage.(*ctstorage.MultiStorage).StringForNodes(lErr.LeaseHolder.NodeID), + r.store.Cfg.ClosedTimestamp.Storage.(*ctstorage.MultiStorage).StringForNodes(lErr.LeaseHolder.NodeID), ) } } @@ -118,7 +118,7 @@ func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) { if lease.Expiration != nil { return hlc.Timestamp{}, false } - maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed( + maxClosed := r.store.Cfg.ClosedTimestamp.Provider.MaxClosed( lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai)) maxClosed.Forward(lease.Start) maxClosed.Forward(initialMaxClosed) diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index f645c1bd6d71..dd8f0728cd94 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -163,8 +163,8 @@ func (rgcq *replicaGCQueue) shouldQueue( // has probably already been removed from its raft group but doesn't know it. // Without this, node decommissioning can stall on such dormant ranges. // Make sure NodeLiveness isn't nil because it can be in tests/benchmarks. 
- if repl.store.cfg.NodeLiveness != nil { - if liveness, err := repl.store.cfg.NodeLiveness.Self(); err == nil && liveness.Decommissioning { + if repl.store.Cfg.NodeLiveness != nil { + if liveness, err := repl.store.Cfg.NodeLiveness.Self(); err == nil && liveness.Decommissioning { return true, replicaGCPriorityDefault } } diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index 77d4f3e636bd..c6f43fec5283 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -38,7 +38,7 @@ func (r *Replica) gossipFirstRange(ctx context.Context) { } if err := r.store.Gossip().AddInfo( gossip.KeySentinel, r.store.ClusterID().GetBytes(), - r.store.cfg.SentinelGossipTTL()); err != nil { + r.store.Cfg.SentinelGossipTTL()); err != nil { log.Errorf(ctx, "failed to gossip sentinel: %+v", err) } if log.V(1) { @@ -284,7 +284,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *roachpb.Error { log.Errorf(ctx, "failed to gossip cluster ID: %+v", err) } - if r.store.cfg.TestingKnobs.DisablePeriodicGossips { + if r.store.Cfg.TestingKnobs.DisablePeriodicGossips { return nil } diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 4baf883fcfba..d80f2f232e11 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -65,7 +65,7 @@ func newUnloadedReplica( log.Fatalf(context.TODO(), "cannot construct a replica for range %d with a 0 replica ID", desc.RangeID) } r := &Replica{ - AmbientContext: store.cfg.AmbientCtx, + AmbientContext: store.Cfg.AmbientCtx, RangeID: desc.RangeID, store: store, abortSpan: abortspan.New(desc.RangeID), @@ -86,10 +86,10 @@ func newUnloadedReplica( r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true - r.mu.zone = store.cfg.DefaultZoneConfig + r.mu.zone = store.Cfg.DefaultZoneConfig r.mu.replicaID = replicaID split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 { - return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) + return float64(SplitByLoadQPSThreshold.Get(&store.Cfg.Settings.SV)) }) r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} @@ -98,8 +98,8 @@ func newUnloadedReplica( if leaseHistoryMaxEntries > 0 { r.leaseHistory = newLeaseHistory() } - if store.cfg.StorePool != nil { - r.leaseholderStats = newReplicaStats(store.Clock(), store.cfg.StorePool.getNodeLocalityString) + if store.Cfg.StorePool != nil { + r.leaseholderStats = newReplicaStats(store.Clock(), store.Cfg.StorePool.getNodeLocalityString) } // Pass nil for the localityOracle because we intentionally don't track the // origin locality of write load. @@ -174,7 +174,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) // any). This is so that, after a restart, we don't propose under old leases. // If the replica is being created through a split, this value will be // overridden. - if !r.store.cfg.TestingKnobs.DontPreventUseOfOldLeaseOnStart { + if !r.store.Cfg.TestingKnobs.DontPreventUseOfOldLeaseOnStart { // Only do this if there was a previous lease. This shouldn't be important // to do but consider that the first lease which is obtained is back-dated // to a zero start timestamp (and this de-flakes some tests). 
If we set the @@ -190,7 +190,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) ssBase := r.Engine().GetAuxiliaryDir() if r.raftMu.sideloaded, err = newDiskSideloadStorage( - r.store.cfg.Settings, + r.store.Cfg.Settings, desc.RangeID, replicaID, ssBase, diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 21cb2f33a81e..68ccbdbb5d6c 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -70,7 +70,7 @@ func (r *Replica) Metrics( return calcReplicaMetrics( ctx, now, - &r.store.cfg.RaftConfig, + &r.store.Cfg.RaftConfig, zone, livenessMap, clusterNodes, diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 105c329e09a8..5347b998f9a8 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + ctpb2 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -283,7 +284,7 @@ A file preventing this node from restarting was placed at: log.Warningf(ctx, "%v", err) } - if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil { + if p := r.store.Cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil { p(*r.store.Ident) } else { time.Sleep(10 * time.Second) @@ -338,7 +339,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe // progress, as only the old leaseholder would have been explicitly notified // of the merge. If there is a merge in progress, maybeWatchForMerge will // arrange to block all traffic to this replica unless the merge aborts. - if err := r.maybeWatchForMerge(ctx); err != nil { + noopReleaseFunc := func(_ context.Context, _ ctpb2.Epoch, _ roachpb.RangeID, _ ctpb2.LAI) {} + if err := r.maybeWatchForMerge(ctx, noopReleaseFunc); err != nil { // We were unable to determine whether a merge was in progress. We cannot // safely proceed. log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s", diff --git a/pkg/kv/kvserver/replica_proposal_quota.go b/pkg/kv/kvserver/replica_proposal_quota.go index 7e60ec80836d..c61e38a074c4 100644 --- a/pkg/kv/kvserver/replica_proposal_quota.go +++ b/pkg/kv/kvserver/replica_proposal_quota.go @@ -101,7 +101,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // through the code paths where we acquire quota from the pool. To // offset this we reset the quota pool whenever leadership changes // hands. - r.mu.proposalQuota = quotapool.NewIntPool(r.rangeStr.String(), uint64(r.store.cfg.RaftProposalQuota)) + r.mu.proposalQuota = quotapool.NewIntPool(r.rangeStr.String(), uint64(r.store.Cfg.RaftProposalQuota)) r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time) r.mu.lastUpdateTimes.updateOnBecomeLeader(r.mu.state.Desc.Replicas().All(), timeutil.Now()) } else if r.mu.proposalQuota != nil { @@ -150,13 +150,13 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // will stall), whereas for quiescing the downside is lower. 
if !r.mu.lastUpdateTimes.isFollowerActiveSince( - ctx, rep.ReplicaID, now, r.store.cfg.RangeLeaseActiveDuration(), + ctx, rep.ReplicaID, now, r.store.Cfg.RangeLeaseActiveDuration(), ) { return } // Only consider followers that that have "healthy" RPC connections. - if err := r.store.cfg.NodeDialer.ConnHealth(rep.NodeID, r.connectionClass.get()); err != nil { + if err := r.store.Cfg.NodeDialer.ConnHealth(rep.NodeID, r.connectionClass.get()); err != nil { return } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 219ffdd49373..290021e2f628 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -152,7 +152,7 @@ func (r *Replica) evalAndPropose( // commands can evaluate but then be blocked on quota, which has worse memory // behavior. quotaSize := uint64(proposal.command.Size()) - if maxSize := uint64(MaxCommandSize.Get(&r.store.cfg.Settings.SV)); quotaSize > maxSize { + if maxSize := uint64(MaxCommandSize.Get(&r.store.Cfg.Settings.SV)); quotaSize > maxSize { return nil, nil, 0, roachpb.NewError(errors.Errorf( "command is too large: %d bytes (max: %d)", quotaSize, maxSize, )) @@ -702,7 +702,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // were not persisted to disk, it wouldn't be a problem because raft does not // infer the that entries are persisted on the node that sends a snapshot. commitStart := timeutil.Now() - if err := batch.Commit(rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil { + if err := batch.Commit(rd.MustSync && !disableSyncRaftLog.Get(&r.store.Cfg.Settings.SV)); err != nil { const expl = "while committing batch" return stats, expl, errors.Wrap(err, expl) } @@ -918,7 +918,7 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) { r.mu.ticks++ r.mu.internalRaftGroup.Tick() - refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks + refreshAtDelta := r.store.Cfg.RaftElectionTimeoutTicks if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 { refreshAtDelta = knob } @@ -1214,10 +1214,10 @@ func (r *Replica) sendRaftMessageRequest(ctx context.Context, req *RaftMessageRe if log.V(4) { log.Infof(ctx, "sending raft request %+v", req) } - ok := r.store.cfg.Transport.SendAsync(req, r.connectionClass.get()) + ok := r.store.Cfg.Transport.SendAsync(req, r.connectionClass.get()) // TODO(peter): Looping over all of the outgoing Raft message queues to // update this stat on every send is a bit expensive. - r.store.metrics.RaftEnqueuedPending.Update(r.store.cfg.Transport.queuedMessageCount()) + r.store.metrics.RaftEnqueuedPending.Update(r.store.Cfg.Transport.queuedMessageCount()) return ok } @@ -1372,7 +1372,7 @@ func (r *Replica) withRaftGroupLocked( raft.Storage((*replicaRaftStorage)(r)), uint64(r.mu.replicaID), r.mu.state.RaftAppliedIndex, - r.store.cfg, + r.store.Cfg, &raftLogger{ctx: ctx}, )) if err != nil { diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 324fe2c7f5e2..f9e7ed4467b0 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -915,7 +915,7 @@ func (r *Replica) applySnapshot( stats.subsumedReplicas = timeutil.Now() // Ingest all SSTs atomically. 
- if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil { + if fn := r.store.Cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil { if err := fn(inSnap, snapType, inSnap.SSTStorageScratch.SSTs()); err != nil { return err } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 1e23b690efd3..1d814f9fdd91 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -221,10 +221,10 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( if p.repl.requiresExpiringLeaseRLocked() { reqLease.Expiration = &hlc.Timestamp{} - *reqLease.Expiration = status.Timestamp.Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0) + *reqLease.Expiration = status.Timestamp.Add(int64(p.repl.store.Cfg.RangeLeaseActiveDuration()), 0) } else { // Get the liveness for the next lease holder and set the epoch in the lease request. - liveness, err := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID) + liveness, err := p.repl.store.Cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID) if err != nil { llHandle.resolve(roachpb.NewError(&roachpb.LeaseRejectedError{ Existing: status.Lease, @@ -335,7 +335,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // If this replica is previous & next lease holder, manually heartbeat to become live. if status.Lease.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID { - if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil { + if err = p.repl.store.Cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil { log.Errorf(ctx, "%v", err) } } else if status.Liveness.Epoch == status.Lease.Epoch { @@ -343,13 +343,13 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // However, we only do so in the event that the next leaseholder is // considered live at this time. If not, there's no sense in // incrementing the expired leaseholder's epoch. - if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil { + if live, liveErr := p.repl.store.Cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil { err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = %v)", status.Liveness.NodeID, nextLeaseHolder.NodeID, liveErr) if log.V(1) { log.Infof(ctx, "%v", err) } - } else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil { + } else if err = p.repl.store.Cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil { // If we get ErrEpochAlreadyIncremented, someone else beat // us to it. This proves that the target node is truly // dead *now*, but it doesn't prove that it was dead at @@ -544,7 +544,7 @@ func (r *Replica) leaseStatus( expiration = lease.GetExpiration() } else { var err error - status.Liveness, err = r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) + status.Liveness, err = r.store.Cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) if err != nil || status.Liveness.Epoch < lease.Epoch { // If lease validity can't be determined (e.g. gossip is down // and liveness info isn't available for owner), we can neither @@ -586,7 +586,7 @@ func (r *Replica) leaseStatus( // including the node liveness table must use expiration leases to avoid // circular dependencies on the node liveness table. 
func (r *Replica) requiresExpiringLeaseRLocked() bool { - return r.store.cfg.NodeLiveness == nil || !r.store.cfg.EnableEpochRangeLeases || + return r.store.Cfg.NodeLiveness == nil || !r.store.Cfg.EnableEpochRangeLeases || r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax)) } @@ -943,7 +943,7 @@ func (r *Replica) redirectOnOrAcquireLease( // already an extension pending. _, requestPending := r.mu.pendingLeaseRequest.RequestPending() if !requestPending && r.requiresExpiringLeaseRLocked() { - renewal := status.Lease.Expiration.Add(-r.store.cfg.RangeLeaseRenewalDuration().Nanoseconds(), 0) + renewal := status.Lease.Expiration.Add(-r.store.Cfg.RangeLeaseRenewalDuration().Nanoseconds(), 0) if renewal.LessEq(timestamp) { if log.V(2) { log.Infof(ctx, "extending lease %s at %s", status.Lease, timestamp) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index a5c212512b25..013dca4ab5ea 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -130,7 +130,7 @@ func (i iteratorWithCloser) Close() { func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { - if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { + if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.Cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } @@ -600,7 +600,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(ctx context.Context) { // If the closed timestamp is sufficiently stale, signal that we want an // update to the leaseholder so that it will eventually begin to progress // again. - slowClosedTSThresh := 5 * closedts.TargetDuration.Get(&r.store.cfg.Settings.SV) + slowClosedTSThresh := 5 * closedts.TargetDuration.Get(&r.store.Cfg.Settings.SV) if d := timeutil.Since(closedTS.GoTime()); d > slowClosedTSThresh { m := r.store.metrics.RangeFeedMetrics if m.RangeFeedSlowClosedTimestampLogN.ShouldLog() { @@ -680,6 +680,6 @@ func (r *Replica) ensureClosedTimestampStarted(ctx context.Context) *roachpb.Err // Request fixes any issues where we've missed a closed timestamp update or // where we're not connected to receive them from this node in the first // place. 
- r.store.cfg.ClosedTimestamp.Clients.Request(leaseholderNodeID, r.RangeID) + r.store.Cfg.ClosedTimestamp.Clients.Request(leaseholderNodeID, r.RangeID) return nil } diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 5c159fc0707f..cc70d43deff5 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -720,7 +720,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - tc, db, _, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db, _, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) @@ -831,7 +831,7 @@ func TestReplicaRangefeedNudgeSlowClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - tc, db, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t, testingTargetDuration) + tc, db, desc, repls := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration) defer tc.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 6d04e0d884ac..29c2ff6db0bd 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -15,6 +15,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -122,12 +124,12 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { log.Event(ctx, "executing read-only batch") + untrack := r.maybeTrackProposal(ctx, ba) for retries := 0; ; retries++ { if retries > 0 { log.VEventf(ctx, 2, "server-side retry of batch") } br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, true /* readOnly */) - // If we can retry, set a higher batch timestamp and continue. // Allow one retry only. if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) { @@ -142,11 +144,27 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( EncounteredIntents: res.Local.DetachEncounteredIntents(), Metrics: res.Local.Metrics, } + untrack(ctx, 0, 0, 0) return nil, res, pErr } + res.Local.ReleaseFunc = untrack return br, res, nil } +func (r *Replica) maybeTrackProposal( + ctx context.Context, ba *roachpb.BatchRequest, +) closedts.ReleaseFunc { + if ba.IsSingleSubsumeRequest() { + // We start tracking SubsumeRequests as part of our guarantee to never + // broadcast a closed timestamp entry between when we evaluate the Subsume + // request and the time that we run maybeWatchForMerge for the first time. + // See comment in maybeWatchForMerge() for details. 
+ _, untrack := r.store.Cfg.ClosedTimestamp.Tracker.Track(ctx) + return untrack + } + return func(_ context.Context, _ ctpb.Epoch, _ roachpb.RangeID, _ ctpb.LAI) {} +} + func (r *Replica) handleReadOnlyLocalEvalResult( ctx context.Context, ba *roachpb.BatchRequest, lResult result.LocalResult, ) *roachpb.Error { @@ -169,7 +187,7 @@ func (r *Replica) handleReadOnlyLocalEvalResult( // A merge is (likely) about to be carried out, and this replica needs // to block all traffic until the merge either commits or aborts. See // docs/tech-notes/range-merges.md. - if err := r.maybeWatchForMerge(ctx); err != nil { + if err := r.maybeWatchForMerge(ctx, lResult.ReleaseFunc); err != nil { return roachpb.NewError(err) } lResult.MaybeWatchForMerge = false diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 0360a6f3456f..404d2bb6e499 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -80,7 +80,7 @@ func (r *Replica) sendWithRangeID( return nil, roachpb.NewError(err) } - if filter := r.store.cfg.TestingKnobs.TestingRequestFilter; filter != nil { + if filter := r.store.Cfg.TestingKnobs.TestingRequestFilter; filter != nil { if pErr := filter(ctx, *ba); pErr != nil { return nil, pErr } @@ -110,7 +110,7 @@ func (r *Replica) sendWithRangeID( if pErr != nil { log.Eventf(ctx, "replica.Send got error: %s", pErr) } else { - if filter := r.store.cfg.TestingKnobs.TestingResponseFilter; filter != nil { + if filter := r.store.Cfg.TestingKnobs.TestingResponseFilter; filter != nil { pErr = filter(ctx, *ba, br) } } @@ -238,7 +238,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return br, nil } - if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { + if filter := r.store.Cfg.TestingKnobs.TestingLatchFilter; filter != nil { if pErr := filter(ctx, *ba); pErr != nil { return nil, pErr } @@ -334,7 +334,7 @@ func (r *Replica) handleWriteIntentError( pErr *roachpb.Error, t *roachpb.WriteIntentError, ) (*concurrency.Guard, *roachpb.Error) { - if r.store.cfg.TestingKnobs.DontPushOnWriteIntentError { + if r.store.Cfg.TestingKnobs.DontPushOnWriteIntentError { return g, pErr } // g's latches will be dropped, but it retains its spot in lock wait-queues. @@ -352,7 +352,7 @@ func (r *Replica) handleTransactionPushError( // into the txnWaitQueue in order to await further updates to the unpushed // txn's status. We check ShouldPushImmediately to avoid retrying // non-queueable PushTxnRequests (see #18191). 
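The replica_read.go changes above are the crux of the read-path fix: a batch consisting of a single Subsume request is registered with the store's closed timestamp tracker before evaluation, the returned callback is invoked with zero values if evaluation fails, and otherwise it is stashed in LocalResult.ReleaseFunc so that maybeWatchForMerge can resolve it later. The sketch below restates that discipline in one place; it is illustrative only, the helper name is hypothetical, and it assumes the Track/ReleaseFunc signatures visible in this diff rather than quoting the implementation.

// Sketch, not part of the patch. Assumes only the signatures shown above:
// Tracker.Track returns a release callback of type closedts.ReleaseFunc,
// and calling it with zero values releases the tracked evaluation without
// recording anything.
func trackSubsumeEvaluation(
	ctx context.Context,
	r *Replica,
	evaluate func(context.Context) (*roachpb.BatchResponse, *roachpb.Error),
) (*roachpb.BatchResponse, closedts.ReleaseFunc, *roachpb.Error) {
	// Register with the tracker before evaluating so that the store cannot
	// broadcast an actionable closed timestamp covering this evaluation
	// until the callback below has been resolved.
	_, untrack := r.store.Cfg.ClosedTimestamp.Tracker.Track(ctx)

	br, pErr := evaluate(ctx)
	if pErr != nil {
		// Evaluation failed: release immediately; zero values record nothing.
		untrack(ctx, 0, 0, 0)
		return nil, nil, pErr
	}
	// Evaluation succeeded: keep the callback alive (via LocalResult.ReleaseFunc
	// in the patch) so that maybeWatchForMerge can record the LeaseAppliedIndex
	// followers must reach before newer closed timestamps become actionable.
	return br, untrack, nil
}

Note that for batches that are not a lone Subsume request, maybeTrackProposal returns a non-nil no-op ReleaseFunc rather than nil, which lets the caller invoke untrack unconditionally on the error path without a nil check.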
- dontRetry := r.store.cfg.TestingKnobs.DontRetryPushTxnFailures + dontRetry := r.store.Cfg.TestingKnobs.DontRetryPushTxnFailures if !dontRetry && ba.IsSinglePushTxnRequest() { pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest) dontRetry = txnwait.ShouldPushImmediately(pushReq) @@ -371,7 +371,7 @@ func (r *Replica) handleIndeterminateCommitError( pErr *roachpb.Error, t *roachpb.IndeterminateCommitError, ) *roachpb.Error { - if r.store.cfg.TestingKnobs.DontRecoverIndeterminateCommits { + if r.store.Cfg.TestingKnobs.DontRecoverIndeterminateCommits { return pErr } // On an indeterminate commit error, attempt to recover and finalize the diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index ec09e840aced..e3185979bbe2 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -819,8 +819,8 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { mockSender := &mockSender{} if err := sendSnapshot( ctx, - &tc.store.cfg.RaftConfig, - tc.store.cfg.Settings, + &tc.store.Cfg.RaftConfig, + tc.store.Cfg.Settings, mockSender, &fakeStorePool{}, SnapshotRequest_Header{State: os.State, Priority: SnapshotRequest_RECOVERY}, @@ -941,8 +941,8 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { mockSender := &mockSender{} err = sendSnapshot( ctx, - &tc.store.cfg.RaftConfig, - tc.store.cfg.Settings, + &tc.store.Cfg.RaftConfig, + tc.store.Cfg.Settings, mockSender, &fakeStorePool{}, SnapshotRequest_Header{State: failingOS.State, Priority: SnapshotRequest_RECOVERY}, diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index a0b1ef0b017d..174c2e1236e7 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -43,14 +43,14 @@ var SplitByLoadMergeDelay = settings.RegisterNonNegativeDurationSetting( // SplitByLoadQPSThreshold returns the QPS request rate for a given replica. func (r *Replica) SplitByLoadQPSThreshold() float64 { - return float64(SplitByLoadQPSThreshold.Get(&r.store.cfg.Settings.SV)) + return float64(SplitByLoadQPSThreshold.Get(&r.store.Cfg.Settings.SV)) } // SplitByLoadEnabled returns whether load based splitting is enabled. // Although this is a method of *Replica, the configuration is really global, // shared across all stores. func (r *Replica) SplitByLoadEnabled() bool { - return SplitByLoadEnabled.Get(&r.store.cfg.Settings.SV) && + return SplitByLoadEnabled.Get(&r.store.Cfg.Settings.SV) && !r.store.TestingKnobs().DisableLoadBasedSplitting } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index e41215396540..f5f0bf8a7335 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -190,7 +190,7 @@ type testContext struct { } func (tc *testContext) Clock() *hlc.Clock { - return tc.store.cfg.Clock + return tc.store.Cfg.Clock } // Start initializes the test context with a single range covering the @@ -985,7 +985,7 @@ func TestReplicaLease(t *testing.T) { // Test that leases with invalid times are rejected. // Start leases at a point that avoids overlapping with the existing lease. 
- leaseDuration := tc.store.cfg.RangeLeaseActiveDuration() + leaseDuration := tc.store.Cfg.RangeLeaseActiveDuration() start := hlc.Timestamp{WallTime: (time.Second + leaseDuration).Nanoseconds(), Logical: 0} for _, lease := range []roachpb.Lease{ {Start: start, Expiration: &hlc.Timestamp{}}, @@ -7792,7 +7792,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } r := tc.repl - electionTicks := tc.store.cfg.RaftElectionTimeoutTicks + electionTicks := tc.store.Cfg.RaftElectionTimeoutTicks { // The verifications of the reproposal counts below rely on r.mu.ticks // starting with a value of 0 (modulo electionTicks). Move the replica into @@ -8960,7 +8960,7 @@ func TestCommandTooLarge(t *testing.T) { defer stopper.Stop(context.Background()) tc.Start(t, stopper) - st := tc.store.cfg.Settings + st := tc.store.Cfg.Settings st.Manual.Store(true) MaxCommandSize.Override(&st.SV, 1024) @@ -10198,7 +10198,7 @@ func TestReplicaNotifyLockTableOn1PC(t *testing.T) { tc.Start(t, stopper) // Disable txn liveness pushes. See below for why. - st := tc.store.cfg.Settings + st := tc.store.Cfg.Settings st.Manual.Store(true) concurrency.LockTableLivenessPushDelay.Override(&st.SV, 24*time.Hour) diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 534f0b5f4418..4e4588ac3dbd 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -82,7 +82,7 @@ func (r *Replica) executeWriteBatch( return nil, g, roachpb.NewError(err) } - minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) + minTS, untrack := r.store.Cfg.ClosedTimestamp.Tracker.Track(ctx) defer untrack(ctx, 0, 0, 0) // covers all error returns below // Examine the timestamp cache for preceding commands which require this @@ -206,7 +206,7 @@ func (r *Replica) executeWriteBatch( r.store.metrics.SlowRaftRequests.Inc(1) log.Errorf(ctx, "range unavailable: %v", - rangeUnavailableMessage(r.Desc(), r.store.cfg.NodeLiveness.GetIsLiveMap(), + rangeUnavailableMessage(r.Desc(), r.store.Cfg.NodeLiveness.GetIsLiveMap(), r.RaftStatus(), ba, timeutil.Since(startPropTime))) case <-ctxDone: // If our context was canceled, return an AmbiguousResultError, @@ -572,7 +572,7 @@ func (r *Replica) evaluateWriteBatchWrapper( func (r *Replica) newBatchedEngine(spans *spanset.SpanSet) (storage.Batch, *storage.OpLoggerBatch) { batch := r.store.Engine().NewBatch() var opLogger *storage.OpLoggerBatch - if r.isSystemRange() || RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { + if r.isSystemRange() || RangefeedEnabled.Get(&r.store.Cfg.Settings.SV) { // TODO(nvanbenschoten): once we get rid of the RangefeedEnabled // cluster setting we'll need a way to turn this on when any // replica (not just the leaseholder) wants it and off when no diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index a2b48678b866..f23341005440 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -199,7 +199,7 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep updateFn() }) } - if nl := store.cfg.NodeLiveness; nl != nil { // node liveness is nil for some unittests + if nl := store.Cfg.NodeLiveness; nl != nil { // node liveness is nil for some unittests nl.RegisterCallback(func(_ roachpb.NodeID) { updateFn() }) @@ -443,7 +443,7 @@ func (rq *replicateQueue) addOrReplace( // a replica. 
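For comparison, the executeWriteBatch hunk in replica_write.go above shows the discipline the write path already follows: it calls Tracker.Track before evaluation and defers a zero-value release so that no error return can leave the proposal tracked. The Subsume read path cannot use that blanket defer, because a successful Subsume must keep the callback alive until maybeWatchForMerge runs. The fragment below is a minimal sketch of that contrast under stated assumptions: the function name and the success-path call are hypothetical (the diff only shows the error-coverage defer), and it assumes the ctpb and roachpb types referenced elsewhere in this patch.

// Sketch, not part of the patch.
func evalWriteWithTracking(
	ctx context.Context,
	r *Replica,
	eval func() (ctpb.Epoch, roachpb.RangeID, ctpb.LAI, error),
) error {
	_, untrack := r.store.Cfg.ClosedTimestamp.Tracker.Track(ctx)
	epoch, rangeID, lai, err := eval()
	if err != nil {
		// Mirrors the deferred zero-value release above: an error must not
		// leave the proposal tracked.
		untrack(ctx, 0, 0, 0)
		return err
	}
	// Hypothetical success path: record the values the proposal was assigned
	// so the closed timestamp subsystem knows how far followers must catch up.
	untrack(ctx, epoch, rangeID, lai)
	return nil
}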
removeIdx = -1 } - st := rq.store.cfg.Settings + st := rq.store.Cfg.Settings if !st.Version.IsActive(ctx, clusterversion.VersionAtomicChangeReplicas) { // If we can't swap yet, don't. removeIdx = -1 @@ -1009,7 +1009,7 @@ func (rq *replicateQueue) changeReplicas( func (rq *replicateQueue) canTransferLease() bool { if lastLeaseTransfer := rq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil { - minInterval := minLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV) + minInterval := minLeaseTransferInterval.Get(&rq.store.Cfg.Settings.SV) return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval } return true diff --git a/pkg/kv/kvserver/split_delay_helper.go b/pkg/kv/kvserver/split_delay_helper.go index 8be806a68335..4450128b2bcd 100644 --- a/pkg/kv/kvserver/split_delay_helper.go +++ b/pkg/kv/kvserver/split_delay_helper.go @@ -39,7 +39,7 @@ func (sdh *splitDelayHelper) RaftStatus(ctx context.Context) (roachpb.RangeID, * ctx, raftStatus.Progress, r.descRLocked().Replicas().All(), func(replicaID roachpb.ReplicaID) bool { return r.mu.lastUpdateTimes.isFollowerActiveSince( - ctx, replicaID, timeutil.Now(), r.store.cfg.RangeLeaseActiveDuration()) + ctx, replicaID, timeutil.Now(), r.store.Cfg.RangeLeaseActiveDuration()) }, ) } @@ -75,7 +75,7 @@ func (sdh *splitDelayHelper) NumAttempts() int { // delay introduced here needs to make sure that the snapshot queue // processes at a higher rate than splits happen, so the number of attempts // will typically be much higher than what's suggested by maybeDropMsgApp. - return (*Replica)(sdh).store.cfg.RaftDelaySplitToSuppressSnapshotTicks + return (*Replica)(sdh).store.Cfg.RaftDelaySplitToSuppressSnapshotTicks } func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration { @@ -83,7 +83,7 @@ func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration { r := (*Replica)(sdh) select { - case <-time.After(r.store.cfg.RaftTickInterval): + case <-time.After(r.store.Cfg.RaftTickInterval): case <-ctx.Done(): } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 3d52101df69a..a3216b730441 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -383,7 +383,7 @@ func (rs *storeReplicaVisitor) EstimatedCount() int { // to one physical device. type Store struct { Ident *roachpb.StoreIdent // pointer to catch access before Start() is called - cfg StoreConfig + Cfg StoreConfig db *kv.DB engine storage.Engine // The underlying key-value store compactor *compactor.Compactor // Schedules compaction of the engine @@ -790,7 +790,7 @@ func NewStore( log.Fatalf(ctx, "invalid store configuration: %+v", &cfg) } s := &Store{ - cfg: cfg, + Cfg: cfg, db: cfg.DB, // TODO(tschottdorf): remove redundancy. engine: eng, nodeDesc: nodeDesc, @@ -837,7 +837,7 @@ func NewStore( s.metrics.registry.AddMetricStruct(s.txnWaitMetrics) s.compactor = compactor.NewCompactor( - s.cfg.Settings, + s.Cfg.Settings, s.engine, func() (roachpb.StoreCapacity, error) { return s.Capacity(false /* useCached */) @@ -903,20 +903,20 @@ func NewStore( int(concurrentRangefeedItersLimit.Get(&cfg.Settings.SV))) }) - if s.cfg.Gossip != nil { + if s.Cfg.Gossip != nil { // Add range scanner and configure with queues. 
s.scanner = newReplicaScanner( - s.cfg.AmbientCtx, s.cfg.Clock, cfg.ScanInterval, + s.Cfg.AmbientCtx, s.Cfg.Clock, cfg.ScanInterval, cfg.ScanMinIdleTime, cfg.ScanMaxIdleTime, newStoreReplicaVisitor(s), ) - s.gcQueue = newGCQueue(s, s.cfg.Gossip) - s.mergeQueue = newMergeQueue(s, s.db, s.cfg.Gossip) - s.splitQueue = newSplitQueue(s, s.db, s.cfg.Gossip) - s.replicateQueue = newReplicateQueue(s, s.cfg.Gossip, s.allocator) - s.replicaGCQueue = newReplicaGCQueue(s, s.db, s.cfg.Gossip) - s.raftLogQueue = newRaftLogQueue(s, s.db, s.cfg.Gossip) - s.raftSnapshotQueue = newRaftSnapshotQueue(s, s.cfg.Gossip) - s.consistencyQueue = newConsistencyQueue(s, s.cfg.Gossip) + s.gcQueue = newGCQueue(s, s.Cfg.Gossip) + s.mergeQueue = newMergeQueue(s, s.db, s.Cfg.Gossip) + s.splitQueue = newSplitQueue(s, s.db, s.Cfg.Gossip) + s.replicateQueue = newReplicateQueue(s, s.Cfg.Gossip, s.allocator) + s.replicaGCQueue = newReplicaGCQueue(s, s.db, s.Cfg.Gossip) + s.raftLogQueue = newRaftLogQueue(s, s.db, s.Cfg.Gossip) + s.raftSnapshotQueue = newRaftSnapshotQueue(s, s.Cfg.Gossip) + s.consistencyQueue = newConsistencyQueue(s, s.Cfg.Gossip) // NOTE: If more queue types are added, please also add them to the list of // queues on the EnqueueRange debug page as defined in // pkg/ui/src/views/reports/containers/enqueueRange/index.tsx @@ -924,9 +924,9 @@ func NewStore( s.gcQueue, s.mergeQueue, s.splitQueue, s.replicateQueue, s.replicaGCQueue, s.raftLogQueue, s.raftSnapshotQueue, s.consistencyQueue) - if s.cfg.TimeSeriesDataStore != nil { + if s.Cfg.TimeSeriesDataStore != nil { s.tsMaintenanceQueue = newTimeSeriesMaintenanceQueue( - s, s.db, s.cfg.Gossip, s.cfg.TimeSeriesDataStore, + s, s.db, s.Cfg.Gossip, s.Cfg.TimeSeriesDataStore, ) s.scanner.AddQueues(s.tsMaintenanceQueue) } @@ -973,12 +973,12 @@ func (s *Store) String() string { // ClusterSettings returns the node's ClusterSettings. func (s *Store) ClusterSettings() *cluster.Settings { - return s.cfg.Settings + return s.Cfg.Settings } // AnnotateCtx is a convenience wrapper; see AmbientContext. func (s *Store) AnnotateCtx(ctx context.Context) context.Context { - return s.cfg.AmbientCtx.AnnotateCtx(ctx) + return s.Cfg.AmbientCtx.AnnotateCtx(ctx) } // SetDraining (when called with 'true') causes incoming lease transfers to be @@ -1180,7 +1180,7 @@ func (s *Store) SetDraining(drain bool, reporter func(int, string)) { // We've seen all the replicas once. Now we're going to iterate // until they're all gone, up to the configured timeout. - transferTimeout := raftLeadershipTransferWait.Get(&s.cfg.Settings.SV) + transferTimeout := raftLeadershipTransferWait.Get(&s.Cfg.Settings.SV) if err := contextutil.RunWithTimeout(ctx, "wait for raft leadership transfer", transferTimeout, func(ctx context.Context) error { @@ -1371,7 +1371,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { s.Ident = &ident // Set the store ID for logging. - s.cfg.AmbientCtx.AddLogTag("s", s.StoreID()) + s.Cfg.AmbientCtx.AddLogTag("s", s.StoreID()) ctx = s.AnnotateCtx(ctx) log.Event(ctx, "read store identity") @@ -1389,13 +1389,13 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { return errors.Errorf("node id:%d does not equal the one in node descriptor:%d", s.Ident.NodeID, s.nodeDesc.NodeID) } // Always set gossip NodeID before gossiping any info. - if s.cfg.Gossip != nil { - s.cfg.Gossip.NodeID.Set(ctx, s.Ident.NodeID) + if s.Cfg.Gossip != nil { + s.Cfg.Gossip.NodeID.Set(ctx, s.Ident.NodeID) } // Create ID allocators. 
idAlloc, err := idalloc.NewAllocator(idalloc.Options{ - AmbientCtx: s.cfg.AmbientCtx, + AmbientCtx: s.Cfg.AmbientCtx, Key: keys.RangeIDGenerator, Incrementer: idalloc.DBIncrementer(s.db), BlockSize: rangeIDAllocCount, @@ -1407,25 +1407,25 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Create the intent resolver. s.intentResolver = intentresolver.New(intentresolver.Config{ - Clock: s.cfg.Clock, + Clock: s.Cfg.Clock, DB: s.db, Stopper: stopper, - TaskLimit: s.cfg.IntentResolverTaskLimit, - AmbientCtx: s.cfg.AmbientCtx, - TestingKnobs: s.cfg.TestingKnobs.IntentResolverKnobs, - RangeDescriptorCache: s.cfg.RangeDescriptorCache, + TaskLimit: s.Cfg.IntentResolverTaskLimit, + AmbientCtx: s.Cfg.AmbientCtx, + TestingKnobs: s.Cfg.TestingKnobs.IntentResolverKnobs, + RangeDescriptorCache: s.Cfg.RangeDescriptorCache, }) s.metrics.registry.AddMetricStruct(s.intentResolver.Metrics) // Create the recovery manager. s.recoveryMgr = txnrecovery.NewManager( - s.cfg.AmbientCtx, s.cfg.Clock, s.db, stopper, + s.Cfg.AmbientCtx, s.Cfg.Clock, s.db, stopper, ) s.metrics.registry.AddMetricStruct(s.recoveryMgr.Metrics()) s.rangeIDAlloc = idAlloc - now := s.cfg.Clock.Now() + now := s.Cfg.Clock.Now() s.startedAt = now.WallTime // Iterate over all range descriptors, ignoring uncommitted versions @@ -1501,27 +1501,27 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { } // Start Raft processing goroutines. - s.cfg.Transport.Listen(s.StoreID(), s) + s.Cfg.Transport.Listen(s.StoreID(), s) s.processRaft(ctx) // Register a callback to unquiesce any ranges with replicas on a // node transitioning from non-live to live. - if s.cfg.NodeLiveness != nil { - s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback) + if s.Cfg.NodeLiveness != nil { + s.Cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback) } // Gossip is only ever nil while bootstrapping a cluster and // in unittests. - if s.cfg.Gossip != nil { + if s.Cfg.Gossip != nil { // Register update channel for any changes to the system config. // This may trigger splits along structured boundaries, // and update max range bytes. - gossipUpdateC := s.cfg.Gossip.RegisterSystemConfigChannel() + gossipUpdateC := s.Cfg.Gossip.RegisterSystemConfigChannel() s.stopper.RunWorker(ctx, func(context.Context) { for { select { case <-gossipUpdateC: - cfg := s.cfg.Gossip.GetSystemConfig() + cfg := s.Cfg.Gossip.GetSystemConfig() s.systemGossipUpdate(cfg) case <-s.stopper.ShouldStop(): return @@ -1540,7 +1540,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // from returning (as doing so might prevent Gossip from ever connecting). 
s.stopper.RunWorker(ctx, func(context.Context) { select { - case <-s.cfg.Gossip.Connected: + case <-s.Cfg.Gossip.Connected: s.scanner.Start(s.stopper) case <-s.stopper.ShouldStop(): return @@ -1548,7 +1548,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { }) } - if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal { + if !s.Cfg.TestingKnobs.DisableAutomaticLeaseRenewal { s.startLeaseRenewer(ctx) } @@ -1557,7 +1557,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { if s.replicateQueue != nil { s.storeRebalancer = NewStoreRebalancer( - s.cfg.AmbientCtx, s.cfg.Settings, s.replicateQueue, s.replRankings) + s.Cfg.AmbientCtx, s.Cfg.Settings, s.replicateQueue, s.replRankings) s.storeRebalancer.Start(ctx, s.stopper) } @@ -1593,7 +1593,7 @@ func (s *Store) startGossip() { return pErr.GoError() } - if s.cfg.TestingKnobs.DisablePeriodicGossips { + if s.Cfg.TestingKnobs.DisablePeriodicGossips { wakeReplica = func(context.Context, *Replica) error { return errPeriodicGossipsDisabled } @@ -1613,7 +1613,7 @@ func (s *Store) startGossip() { return repl.maybeGossipFirstRange(ctx).GoError() }, description: "first range descriptor", - interval: s.cfg.SentinelGossipTTL() / 2, + interval: s.Cfg.SentinelGossipTTL() / 2, }, { key: keys.SystemConfigSpan.Key, @@ -1692,7 +1692,7 @@ func (s *Store) startLeaseRenewer(ctx context.Context) { // up more often that strictly necessary, but it's more maintainable than // attempting to accurately determine exactly when each iteration of a // lease expires and when we should attempt to renew it as a result. - renewalDuration := s.cfg.RangeLeaseActiveDuration() / 5 + renewalDuration := s.Cfg.RangeLeaseActiveDuration() / 5 for { s.renewableLeases.Range(func(k int64, v unsafe.Pointer) bool { repl := (*Replica)(v) @@ -1730,7 +1730,7 @@ func (s *Store) startClosedTimestampRangefeedSubscriber(ctx context.Context) { ch := make(chan ctpb.Entry, 8) const name = "closedts-rangefeed-subscriber" if err := s.stopper.RunAsyncTask(ctx, name, func(ctx context.Context) { - s.cfg.ClosedTimestamp.Provider.Subscribe(ctx, ch) + s.Cfg.ClosedTimestamp.Provider.Subscribe(ctx, ch) }); err != nil { return } @@ -1808,7 +1808,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { // For every range, update its zone config and check if it needs to // be split or merged. - now := s.cfg.Clock.Now() + now := s.Cfg.Clock.Now() newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { key := repl.Desc().StartKey zone, err := sysCfg.GetZoneConfigForKey(key) @@ -1816,7 +1816,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { if log.V(1) { log.Infof(context.TODO(), "failed to get zone config for key %s", key) } - zone = s.cfg.DefaultZoneConfig + zone = s.Cfg.DefaultZoneConfig } repl.SetZoneConfig(zone) s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { @@ -1857,9 +1857,9 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error { // the usual periodic interval. Re-gossip more rapidly for RangeCount // changes because allocators with stale information are much more // likely to make bad decisions. 
- rangeCountdown := float64(storeDesc.Capacity.RangeCount) * s.cfg.GossipWhenCapacityDeltaExceedsFraction + rangeCountdown := float64(storeDesc.Capacity.RangeCount) * s.Cfg.GossipWhenCapacityDeltaExceedsFraction atomic.StoreInt32(&s.gossipRangeCountdown, int32(math.Ceil(math.Min(rangeCountdown, 3)))) - leaseCountdown := float64(storeDesc.Capacity.LeaseCount) * s.cfg.GossipWhenCapacityDeltaExceedsFraction + leaseCountdown := float64(storeDesc.Capacity.LeaseCount) * s.Cfg.GossipWhenCapacityDeltaExceedsFraction atomic.StoreInt32(&s.gossipLeaseCountdown, int32(math.Ceil(math.Max(leaseCountdown, 1)))) syncutil.StoreFloat64(&s.gossipQueriesPerSecondVal, storeDesc.Capacity.QueriesPerSecond) syncutil.StoreFloat64(&s.gossipWritesPerSecondVal, storeDesc.Capacity.WritesPerSecond) @@ -1867,7 +1867,7 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error { // Unique gossip key per store. gossipStoreKey := gossip.MakeStoreKey(storeDesc.StoreID) // Gossip store descriptor. - return s.cfg.Gossip.AddInfoProto(gossipStoreKey, storeDesc, gossip.StoreTTL) + return s.Cfg.Gossip.AddInfoProto(gossipStoreKey, storeDesc, gossip.StoreTTL) } type capacityChangeEvent int @@ -1884,7 +1884,7 @@ const ( // immediate gossip of this store's descriptor, to include updated // capacity information. func (s *Store) maybeGossipOnCapacityChange(ctx context.Context, cce capacityChangeEvent) { - if s.cfg.TestingKnobs.DisableLeaseCapacityGossip && (cce == leaseAddEvent || cce == leaseRemoveEvent) { + if s.Cfg.TestingKnobs.DisableLeaseCapacityGossip && (cce == leaseAddEvent || cce == leaseRemoveEvent) { return } @@ -2161,16 +2161,16 @@ func (s *Store) ClusterID() uuid.UUID { return s.Ident.ClusterID } func (s *Store) StoreID() roachpb.StoreID { return s.Ident.StoreID } // Clock accessor. -func (s *Store) Clock() *hlc.Clock { return s.cfg.Clock } +func (s *Store) Clock() *hlc.Clock { return s.Cfg.Clock } // Engine accessor. func (s *Store) Engine() storage.Engine { return s.engine } // DB accessor. -func (s *Store) DB() *kv.DB { return s.cfg.DB } +func (s *Store) DB() *kv.DB { return s.Cfg.DB } // Gossip accessor. -func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip } +func (s *Store) Gossip() *gossip.Gossip { return s.Cfg.Gossip } // Compactor accessor. func (s *Store) Compactor() *compactor.Compactor { return s.compactor } @@ -2179,7 +2179,7 @@ func (s *Store) Compactor() *compactor.Compactor { return s.compactor } func (s *Store) Stopper() *stop.Stopper { return s.stopper } // TestingKnobs accessor. -func (s *Store) TestingKnobs() *StoreTestingKnobs { return &s.cfg.TestingKnobs } +func (s *Store) TestingKnobs() *StoreTestingKnobs { return &s.Cfg.TestingKnobs } // IsDraining accessor. 
func (s *Store) IsDraining() bool { @@ -2219,7 +2219,7 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) { return capacity, err } - now := s.cfg.Clock.Now() + now := s.Cfg.Clock.Now() var leaseCount int32 var rangeCount int32 var logicalBytes int64 @@ -2375,10 +2375,10 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { behindCount int64 ) - timestamp := s.cfg.Clock.Now() + timestamp := s.Cfg.Clock.Now() var livenessMap IsLiveMap - if s.cfg.NodeLiveness != nil { - livenessMap = s.cfg.NodeLiveness.GetIsLiveMap() + if s.Cfg.NodeLiveness != nil { + livenessMap = s.Cfg.NodeLiveness.GetIsLiveMap() } clusterNodes := s.ClusterNodeCount() @@ -2521,7 +2521,7 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error { // expected ClusterNodeCount to avoid catching the cluster while the first node // is initialized but the other nodes are not. func (s *Store) ClusterNodeCount() int { - return s.cfg.StorePool.ClusterNodeCount() + return s.Cfg.StorePool.ClusterNodeCount() } // HotReplicaInfo contains a range descriptor and its QPS. @@ -2608,7 +2608,7 @@ func (s *Store) ManuallyEnqueue( return nil, nil, errors.Errorf("unknown queue type %q", queueName) } - sysCfg := s.cfg.Gossip.GetSystemConfig() + sysCfg := s.Cfg.Gossip.GetSystemConfig() if sysCfg == nil { return nil, nil, errors.New("cannot run queue without a valid system config; make sure the cluster " + "has been initialized and all nodes connected to it") @@ -2632,7 +2632,7 @@ func (s *Store) ManuallyEnqueue( if !skipShouldQueue { log.Eventf(ctx, "running %s.shouldQueue", queueName) - shouldQueue, priority := queue.shouldQueue(ctx, s.cfg.Clock.Now(), repl, sysCfg) + shouldQueue, priority := queue.shouldQueue(ctx, s.Cfg.Clock.Now(), repl, sysCfg) log.Eventf(ctx, "shouldQueue=%v, priority=%f", shouldQueue, priority) if !shouldQueue { return collect(), nil, nil diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index f7f90bbeecfe..166a933e533a 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -566,7 +566,7 @@ func (s *Store) nodeIsLiveCallback(nodeID roachpb.NodeID) { } func (s *Store) processRaft(ctx context.Context) { - if s.cfg.TestingKnobs.DisableProcessRaft { + if s.Cfg.TestingKnobs.DisableProcessRaft { return } @@ -577,12 +577,12 @@ func (s *Store) processRaft(ctx context.Context) { s.stopper.RunWorker(ctx, s.raftTickLoop) s.stopper.RunWorker(ctx, s.coalescedHeartbeatsLoop) s.stopper.AddCloser(stop.CloserFn(func() { - s.cfg.Transport.Stop(s.StoreID()) + s.Cfg.Transport.Stop(s.StoreID()) })) } func (s *Store) raftTickLoop(ctx context.Context) { - ticker := time.NewTicker(s.cfg.RaftTickInterval) + ticker := time.NewTicker(s.Cfg.RaftTickInterval) defer ticker.Stop() var rangeIDs []roachpb.RangeID @@ -592,7 +592,7 @@ func (s *Store) raftTickLoop(ctx context.Context) { case <-ticker.C: rangeIDs = rangeIDs[:0] // Update the liveness map. - if s.cfg.NodeLiveness != nil { + if s.Cfg.NodeLiveness != nil { s.updateLivenessMap() } @@ -617,11 +617,11 @@ func (s *Store) raftTickLoop(ctx context.Context) { } func (s *Store) updateLivenessMap() { - nextMap := s.cfg.NodeLiveness.GetIsLiveMap() + nextMap := s.Cfg.NodeLiveness.GetIsLiveMap() for nodeID, entry := range nextMap { if entry.IsLive { // Make sure we ask all live nodes for closed timestamp updates. 
- s.cfg.ClosedTimestamp.Clients.EnsureClient(nodeID) + s.Cfg.ClosedTimestamp.Clients.EnsureClient(nodeID) continue } // Liveness claims that this node is down, but ConnHealth gets the last say @@ -636,7 +636,7 @@ func (s *Store) updateLivenessMap() { // Raft transport, so ConnHealth should usually indicate a real problem if // it gives us an error back. The check can also have false positives if the // node goes down after populating the map, but that matters even less. - entry.IsLive = (s.cfg.NodeDialer.ConnHealth(nodeID, rpc.SystemClass) == nil) + entry.IsLive = (s.Cfg.NodeDialer.ConnHealth(nodeID, rpc.SystemClass) == nil) nextMap[nodeID] = entry } s.livenessMap.Store(nextMap) @@ -646,7 +646,7 @@ func (s *Store) updateLivenessMap() { // beneficial to have it run on a faster cycle than once per tick, so that // the delay does not impact latency-sensitive features such as quiescence. func (s *Store) coalescedHeartbeatsLoop(ctx context.Context) { - ticker := time.NewTicker(s.cfg.CoalescedHeartbeatsInterval) + ticker := time.NewTicker(s.Cfg.CoalescedHeartbeatsInterval) defer ticker.Stop() for { @@ -699,7 +699,7 @@ func (s *Store) sendQueuedHeartbeatsToNode( log.Infof(ctx, "sending raft request (coalesced) %+v", chReq) } - if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { + if !s.Cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok { (*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID) diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index a72b388e9958..4a74632f78d0 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -81,7 +81,7 @@ func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { for _, r := range ranges { repl := &Replica{store: s} repl.mu.state.Desc = &roachpb.RangeDescriptor{} - repl.mu.zone = s.cfg.DefaultZoneConfig + repl.mu.zone = s.Cfg.DefaultZoneConfig for _, storeID := range r.storeIDs { repl.mu.state.Desc.InternalReplicas = append(repl.mu.state.Desc.InternalReplicas, roachpb.ReplicaDescriptor{ NodeID: roachpb.NodeID(storeID), @@ -262,7 +262,7 @@ func TestChooseReplicaToRebalance(t *testing.T) { for _, tc := range testCases { t.Run("", func(t *testing.T) { - s.cfg.DefaultZoneConfig.NumReplicas = proto.Int32(int32(len(tc.storeIDs))) + s.Cfg.DefaultZoneConfig.NumReplicas = proto.Int32(int32(len(tc.storeIDs))) loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}}) hottestRanges := rr.topQPS() _, targets := sr.chooseReplicaToRebalance( diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index ee90d0dc46dc..b4a4f1354a89 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -86,20 +86,20 @@ func (s *Store) Send( return nil, roachpb.NewError(err) } - if s.cfg.TestingKnobs.ClockBeforeSend != nil { - s.cfg.TestingKnobs.ClockBeforeSend(s.cfg.Clock, ba) + if s.Cfg.TestingKnobs.ClockBeforeSend != nil { + s.Cfg.TestingKnobs.ClockBeforeSend(s.Cfg.Clock, ba) } // Update our clock with the incoming request timestamp. This advances the // local node's clock to a high water mark from all nodes with which it has // interacted. - if s.cfg.TestingKnobs.DisableMaxOffsetCheck { - s.cfg.Clock.Update(ba.Timestamp) + if s.Cfg.TestingKnobs.DisableMaxOffsetCheck { + s.Cfg.Clock.Update(ba.Timestamp) } else { // If the command appears to come from a node with a bad clock, // reject it now before we reach that point. 
var err error - if err = s.cfg.Clock.UpdateAndCheckMaxOffset(ctx, ba.Timestamp); err != nil { + if err = s.Cfg.Clock.UpdateAndCheckMaxOffset(ctx, ba.Timestamp); err != nil { return nil, roachpb.NewError(err) } } @@ -127,7 +127,7 @@ func (s *Store) Send( // Update our clock with the outgoing response txn timestamp // (if timestamp has been forwarded). if ba.Timestamp.Less(br.Txn.WriteTimestamp) { - s.cfg.Clock.Update(br.Txn.WriteTimestamp) + s.Cfg.Clock.Update(br.Txn.WriteTimestamp) } } } else { @@ -135,7 +135,7 @@ func (s *Store) Send( // Update our clock with the outgoing response timestamp. // (if timestamp has been forwarded). if ba.Timestamp.Less(br.Timestamp) { - s.cfg.Clock.Update(br.Timestamp) + s.Cfg.Clock.Update(br.Timestamp) } } } @@ -143,7 +143,7 @@ func (s *Store) Send( // We get the latest timestamp - we know that any // write with a higher timestamp we run into later must // have started after this point in (absolute) time. - now := s.cfg.Clock.Now() + now := s.Cfg.Clock.Now() if pErr != nil { pErr.Now = now } else { @@ -160,7 +160,7 @@ func (s *Store) Send( // this node, in which case the following is a no-op). if _, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); !ok { txnClone := ba.Txn.Clone() - txnClone.UpdateObservedTimestamp(ba.Replica.NodeID, s.cfg.Clock.Now()) + txnClone.UpdateObservedTimestamp(ba.Replica.NodeID, s.Cfg.Clock.Now()) ba.Txn = txnClone } } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 4576d487579e..c66642fea8cf 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -509,7 +509,7 @@ func (s *Store) reserveSnapshot( // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from // getting stuck behind large snapshots managed by the replicate queue. 
} else if header.CanDecline { - storeDesc, ok := s.cfg.StorePool.getStoreDescriptor(s.StoreID()) + storeDesc, ok := s.Cfg.StorePool.getStoreDescriptor(s.StoreID()) if ok && (!maxCapacityCheck(storeDesc) || header.RangeSize > storeDesc.Capacity.Available) { return nil, snapshotStoreTooFullMsg, nil } @@ -726,7 +726,7 @@ func (s *Store) shouldAcceptSnapshotData( func (s *Store) receiveSnapshot( ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream, ) error { - if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { + if fn := s.Cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { return sendSnapshotError(stream, err) } @@ -780,9 +780,9 @@ func (s *Store) receiveSnapshot( } ss = &kvBatchSnapshotStrategy{ - raftCfg: &s.cfg.RaftConfig, + raftCfg: &s.Cfg.RaftConfig, scratch: s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID), - sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV), + sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.Cfg.Settings.SV), } defer ss.Close(ctx) default: diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 2b940a113186..b6ea7ee623af 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -1090,7 +1090,7 @@ func TestStoreObservedTimestamp(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(context.Background()) store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) - txn := newTransaction("test", test.key, 1, store.cfg.Clock) + txn := newTransaction("test", test.key, 1, store.Cfg.Clock) txn.MaxTimestamp = hlc.MaxTimestamp pArgs := putArgs(test.key, []byte("value")) h := roachpb.Header{ @@ -1158,7 +1158,7 @@ func TestStoreAnnotateNow(t *testing.T) { var txn *roachpb.Transaction pArgs := putArgs(test.key, []byte("value")) if useTxn { - txn = newTransaction("test", test.key, 1, store.cfg.Clock) + txn = newTransaction("test", test.key, 1, store.Cfg.Clock) txn.MaxTimestamp = hlc.MaxTimestamp assignSeqNumsForReqs(txn, &pArgs) } @@ -1242,12 +1242,12 @@ func TestStoreSendUpdateTime(t *testing.T) { defer stopper.Stop(context.Background()) store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper) args := getArgs([]byte("a")) - reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0) + reqTS := store.Cfg.Clock.Now().Add(store.Cfg.Clock.MaxOffset().Nanoseconds(), 0) _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args) if pErr != nil { t.Fatal(pErr) } - ts := store.cfg.Clock.Now() + ts := store.Cfg.Clock.Now() if ts.WallTime != reqTS.WallTime || ts.Logical <= reqTS.Logical { t.Errorf("expected store clock to advance to %s; got %s", reqTS, ts) } @@ -1270,9 +1270,9 @@ func TestStoreSendWithZeroTime(t *testing.T) { } // The Logical time will increase over the course of the command // execution so we can only rely on comparing the WallTime. - if br.Timestamp.WallTime != store.cfg.Clock.Now().WallTime { + if br.Timestamp.WallTime != store.Cfg.Clock.Now().WallTime { t.Errorf("expected reply to have store clock time %s; got %s", - store.cfg.Clock.Now(), br.Timestamp) + store.Cfg.Clock.Now(), br.Timestamp) } } @@ -1286,7 +1286,7 @@ func TestStoreSendWithClockOffset(t *testing.T) { store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper) args := getArgs([]byte("a")) // Set args timestamp to exceed max offset. 
- reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0) + reqTS := store.Cfg.Clock.Now().Add(store.Cfg.Clock.MaxOffset().Nanoseconds()+1, 0) _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args) if !testutils.IsPError(pErr, "remote wall time is too far ahead") { t.Errorf("unexpected error: %v", pErr) @@ -1466,13 +1466,13 @@ func TestStoreSetRangesMaxBytes(t *testing.T) { expMaxBytes int64 }{ {store.LookupReplica(roachpb.RKeyMin), - *store.cfg.DefaultZoneConfig.RangeMaxBytes}, + *store.Cfg.DefaultZoneConfig.RangeMaxBytes}, {splitTestRange( store, roachpb.RKeyMin, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID)), t), 1 << 20}, {splitTestRange( store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID)), roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+1)), t), - *store.cfg.DefaultZoneConfig.RangeMaxBytes}, + *store.Cfg.DefaultZoneConfig.RangeMaxBytes}, {splitTestRange( store, roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+1)), roachpb.RKey(keys.SystemSQLCodec.TablePrefix(baseID+2)), t), 2 << 20}, @@ -1532,8 +1532,8 @@ func TestStoreResolveWriteIntent(t *testing.T) { for i, resolvable := range []bool{true, false} { key := roachpb.Key(fmt.Sprintf("key-%d", i)) - pusher := newTransaction("test", key, 1, store.cfg.Clock) - pushee := newTransaction("test", key, 1, store.cfg.Clock) + pusher := newTransaction("test", key, 1, store.Cfg.Clock) + pushee := newTransaction("test", key, 1, store.Cfg.Clock) if resolvable { pushee.Priority = enginepb.MinTxnPriority pusher.Priority = enginepb.MaxTxnPriority // Pusher will win. @@ -1600,8 +1600,8 @@ func TestStoreResolveWriteIntentRollback(t *testing.T) { store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper) key := roachpb.Key("a") - pusher := newTransaction("test", key, 1, store.cfg.Clock) - pushee := newTransaction("test", key, 1, store.cfg.Clock) + pusher := newTransaction("test", key, 1, store.Cfg.Clock) + pushee := newTransaction("test", key, 1, store.Cfg.Clock) pushee.Priority = enginepb.MinTxnPriority pusher.Priority = enginepb.MaxTxnPriority // Pusher will win. @@ -1726,8 +1726,8 @@ func TestStoreResolveWriteIntentPushOnRead(t *testing.T) { } } - pusher := newTransaction("pusher", key, 1, store.cfg.Clock) - pushee := newTransaction("pushee", key, 1, store.cfg.Clock) + pusher := newTransaction("pusher", key, 1, store.Cfg.Clock) + pushee := newTransaction("pushee", key, 1, store.Cfg.Clock) // Set transaction priorities. if tc.pusherWillWin { @@ -1749,7 +1749,7 @@ func TestStoreResolveWriteIntentPushOnRead(t *testing.T) { } // Determine the timestamp to read at. - readTs := store.cfg.Clock.Now() + readTs := store.Cfg.Clock.Now() // Give the pusher a previous observed timestamp equal to this read // timestamp. This ensures that the pusher doesn't need to push the // intent any higher just to push it out of its uncertainty window. @@ -1757,10 +1757,10 @@ func TestStoreResolveWriteIntentPushOnRead(t *testing.T) { // If the pushee is already pushed, update the transaction record. 
if tc.pusheeAlreadyPushed { - pushedTs := store.cfg.Clock.Now() + pushedTs := store.Cfg.Clock.Now() pushee.WriteTimestamp.Forward(pushedTs) pushee.ReadTimestamp.Forward(pushedTs) - hb, hbH := heartbeatArgs(pushee, store.cfg.Clock.Now()) + hb, hbH := heartbeatArgs(pushee, store.Cfg.Clock.Now()) if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), hbH, &hb); pErr != nil { t.Fatal(pErr) } @@ -1826,7 +1826,7 @@ func TestStoreResolveWriteIntentNoTxn(t *testing.T) { store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper) key := roachpb.Key("a") - pushee := newTransaction("test", key, 1, store.cfg.Clock) + pushee := newTransaction("test", key, 1, store.Cfg.Clock) // First, write the pushee's txn via HeartbeatTxn request. hb, hbH := heartbeatArgs(pushee, pushee.WriteTimestamp) @@ -1842,7 +1842,7 @@ func TestStoreResolveWriteIntentNoTxn(t *testing.T) { } // Now, try to read outside a transaction. - getTS := store.cfg.Clock.Now() // accessed later + getTS := store.Cfg.Clock.Now() // accessed later { gArgs := getArgs(key) if reply, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ @@ -1857,7 +1857,7 @@ func TestStoreResolveWriteIntentNoTxn(t *testing.T) { { // Next, try to write outside of a transaction. We will succeed in pushing txn. - putTS := store.cfg.Clock.Now() + putTS := store.Cfg.Clock.Now() args.Value.SetBytes([]byte("value2")) if _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ Timestamp: putTS, @@ -1946,8 +1946,8 @@ func TestStoreReadInconsistent(t *testing.T) { priority = -1 } args.Value.SetBytes([]byte("value2")) - txnA := newTransaction("testA", keyA, priority, store.cfg.Clock) - txnB := newTransaction("testB", keyB, priority, store.cfg.Clock) + txnA := newTransaction("testA", keyA, priority, store.Cfg.Clock) + txnB := newTransaction("testB", keyB, priority, store.Cfg.Clock) for _, txn := range []*roachpb.Transaction{txnA, txnB} { args.Key = txn.Key assignSeqNumsForReqs(txn, &args) @@ -2248,7 +2248,7 @@ func TestStoreScanIntents(t *testing.T) { if test.canPush { priority = roachpb.MinUserPriority } - txn = newTransaction(fmt.Sprintf("test-%d", i), key, priority, store.cfg.Clock) + txn = newTransaction(fmt.Sprintf("test-%d", i), key, priority, store.Cfg.Clock) } args := putArgs(key, []byte(fmt.Sprintf("value%02d", j))) assignSeqNumsForReqs(txn, &args) @@ -2338,7 +2338,7 @@ func TestStoreScanInconsistentResolvesIntents(t *testing.T) { store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) // Lay down 10 intents to scan over. - txn := newTransaction("test", roachpb.Key("foo"), 1, store.cfg.Clock) + txn := newTransaction("test", roachpb.Key("foo"), 1, store.Cfg.Clock) keys := []roachpb.Key{} for j := 0; j < 10; j++ { key := roachpb.Key(fmt.Sprintf("key%02d", j)) @@ -2388,7 +2388,7 @@ func TestStoreScanIntentsFromTwoTxns(t *testing.T) { // Lay down two intents from two txns to scan over. 
key1 := roachpb.Key("bar") - txn1 := newTransaction("test1", key1, 1, store.cfg.Clock) + txn1 := newTransaction("test1", key1, 1, store.Cfg.Clock) args := putArgs(key1, []byte("value1")) assignSeqNumsForReqs(txn1, &args) if _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Txn: txn1}, &args); pErr != nil { @@ -2396,7 +2396,7 @@ func TestStoreScanIntentsFromTwoTxns(t *testing.T) { } key2 := roachpb.Key("foo") - txn2 := newTransaction("test2", key2, 1, store.cfg.Clock) + txn2 := newTransaction("test2", key2, 1, store.Cfg.Clock) args = putArgs(key2, []byte("value2")) assignSeqNumsForReqs(txn2, &args) if _, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Txn: txn2}, &args); pErr != nil { @@ -2442,7 +2442,7 @@ func TestStoreScanMultipleIntents(t *testing.T) { // Lay down ten intents from a single txn. key1 := roachpb.Key("key00") key10 := roachpb.Key("key09") - txn := newTransaction("test", key1, 1, store.cfg.Clock) + txn := newTransaction("test", key1, 1, store.Cfg.Clock) ba := roachpb.BatchRequest{} for i := 0; i < 10; i++ { pArgs := putArgs(roachpb.Key(fmt.Sprintf("key%02d", i)), []byte("value")) @@ -2483,7 +2483,7 @@ func TestStoreBadRequests(t *testing.T) { defer stopper.Stop(context.Background()) store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper) - txn := newTransaction("test", roachpb.Key("a"), 1 /* priority */, store.cfg.Clock) + txn := newTransaction("test", roachpb.Key("a"), 1 /* priority */, store.Cfg.Clock) args1 := getArgs(roachpb.Key("a")) args1.EndKey = roachpb.Key("b") @@ -3105,9 +3105,9 @@ func TestReserveSnapshotFullnessLimit(t *testing.T) { desc.Capacity.Available = 1 desc.Capacity.Used = desc.Capacity.Capacity - desc.Capacity.Available - s.cfg.StorePool.detailsMu.Lock() - s.cfg.StorePool.getStoreDetailLocked(desc.StoreID).desc = desc - s.cfg.StorePool.detailsMu.Unlock() + s.Cfg.StorePool.detailsMu.Lock() + s.Cfg.StorePool.getStoreDetailLocked(desc.StoreID).desc = desc + s.Cfg.StorePool.detailsMu.Unlock() // A declinable snapshot to a nearly full store should be rejected. cleanupRejected, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ @@ -3147,9 +3147,9 @@ func TestReserveSnapshotFullnessLimit(t *testing.T) { // available disk space should be rejected. desc.Capacity.Available = desc.Capacity.Capacity / 2 desc.Capacity.Used = desc.Capacity.Capacity - desc.Capacity.Available - s.cfg.StorePool.detailsMu.Lock() - s.cfg.StorePool.getStoreDetailLocked(desc.StoreID).desc = desc - s.cfg.StorePool.detailsMu.Unlock() + s.Cfg.StorePool.detailsMu.Lock() + s.Cfg.StorePool.getStoreDetailLocked(desc.StoreID).desc = desc + s.Cfg.StorePool.detailsMu.Unlock() // A declinable snapshot to a nearly full store should be rejected. cleanupRejected2, rejectionMsg, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go index 0af9f20b4d33..ec3f79a23a9a 100644 --- a/pkg/kv/kvserver/ts_maintenance_queue.go +++ b/pkg/kv/kvserver/ts_maintenance_queue.go @@ -103,7 +103,7 @@ func newTimeSeriesMaintenanceQueue( // Begin logging messages if we exceed our planned memory usage by // more than triple. 
TimeSeriesMaintenanceMemoryBudget*3, - store.cfg.Settings, + store.Cfg.Settings, ), } q.baseQueue = newBaseQueue( @@ -126,7 +126,7 @@ func newTimeSeriesMaintenanceQueue( func (q *timeSeriesMaintenanceQueue) shouldQueue( ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig, ) (shouldQ bool, priority float64) { - if !repl.store.cfg.TestingKnobs.DisableLastProcessedCheck { + if !repl.store.Cfg.TestingKnobs.DisableLastProcessedCheck { lpTS, err := repl.getQueueLastProcessed(ctx, q.name) if err != nil { return false, 0