diff --git a/pkg/cli/start.go b/pkg/cli/start.go index dc94e8e5cabd..b2bf11d71c0d 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -33,7 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/geo/geos" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/security/clientsecopts" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -1106,7 +1106,7 @@ func reportServerInfo( nodeID := serverCfg.BaseConfig.IDContainer.Get() if serverCfg.SQLConfig.TenantID.IsSystem() { if initialStart { - if nodeID == kvserver.FirstNodeID { + if nodeID == kvstorage.FirstNodeID { buf.Printf("status:\tinitialized new cluster\n") } else { buf.Printf("status:\tinitialized new node, joined pre-existing cluster\n") diff --git a/pkg/kv/kvserver/kvstorage/init.go b/pkg/kv/kvserver/kvstorage/init.go index 9a7e96771148..af34e058a5f9 100644 --- a/pkg/kv/kvserver/kvstorage/init.go +++ b/pkg/kv/kvserver/kvstorage/init.go @@ -26,6 +26,123 @@ import ( "github.com/cockroachdb/errors" ) +// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster. +const FirstNodeID = roachpb.NodeID(1) + +// FirstStoreID is the StoreID assigned to the first store on the node with ID +// FirstNodeID. +const FirstStoreID = roachpb.StoreID(1) + +// InitEngine writes a new store ident to the underlying engine. To +// ensure that no crufty data already exists in the engine, it scans +// the engine contents before writing the new store ident. The engine +// should be completely empty save for a cluster version, which must +// already have been persisted to it. Returns an error if this is not +// the case. +func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error { + exIdent, err := ReadStoreIdent(ctx, eng) + if err == nil { + return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String()) + } + if !errors.HasType(err, (*NotBootstrappedError)(nil)) { + return err + } + + if err := checkCanInitializeEngine(ctx, eng); err != nil { + return errors.Wrap(err, "while trying to initialize engine") + } + + batch := eng.NewBatch() + if err := storage.MVCCPutProto( + ctx, + batch, + nil, + keys.StoreIdentKey(), + hlc.Timestamp{}, + hlc.ClockTimestamp{}, + nil, + &ident, + ); err != nil { + batch.Close() + return err + } + if err := batch.Commit(true /* sync */); err != nil { + return errors.Wrap(err, "persisting engine initialization data") + } + + return nil +} + +// checkCanInitializeEngine ensures that the engine is empty except for a +// cluster version, which must be present. +func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error { + // See if this is an already-bootstrapped store. + ident, err := ReadStoreIdent(ctx, eng) + if err == nil { + return errors.Errorf("engine already initialized as %s", ident.String()) + } else if !errors.HasType(err, (*NotBootstrappedError)(nil)) { + return errors.Wrap(err, "unable to read store ident") + } + // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a + // cluster version, cached settings and nothing else? Note that there is one + // cluster version key and many cached settings key, and the cluster version + // key precedes the cached settings. + // + // We use an EngineIterator to ensure that there are no keys that cannot be + // parsed as MVCCKeys (e.g. lock table keys) in the engine. + iter := eng.NewEngineIterator(storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + UpperBound: roachpb.KeyMax, + }) + defer iter.Close() + valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin}) + if !valid { + if err == nil { + return errors.New("no cluster version found on uninitialized engine") + } + return err + } + getMVCCKey := func() (storage.MVCCKey, error) { + if _, hasRange := iter.HasPointAndRange(); hasRange { + bounds, err := iter.EngineRangeBounds() + if err != nil { + return storage.MVCCKey{}, err + } + return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds) + } + var k storage.EngineKey + k, err = iter.EngineKey() + if err != nil { + return storage.MVCCKey{}, err + } + if !k.IsMVCCKey() { + return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k) + } + return k.ToMVCCKey() + } + var k storage.MVCCKey + if k, err = getMVCCKey(); err != nil { + return err + } + if !k.Key.Equal(keys.StoreClusterVersionKey()) { + return errors.New("no cluster version found on uninitialized engine") + } + valid, err = iter.NextEngineKey() + for valid { + // Only allowed to find cached cluster settings on an uninitialized + // engine. + if k, err = getMVCCKey(); err != nil { + return err + } + if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil { + return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String()) + } + // There may be more cached cluster settings, so continue iterating. + valid, err = iter.NextEngineKey() + } + return err +} + // IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such // as RaftHardStateKey, RangeTombstoneKey, and many others). Such keys could in // principle exist at any RangeID, and this helper efficiently discovers all the diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index cff7a26dcc3e..f3907716bd8f 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2698,76 +2698,6 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64, return hlcUpperBound, nil } -// checkCanInitializeEngine ensures that the engine is empty except for a -// cluster version, which must be present. -func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error { - // See if this is an already-bootstrapped store. - ident, err := kvstorage.ReadStoreIdent(ctx, eng) - if err == nil { - return errors.Errorf("engine already initialized as %s", ident.String()) - } else if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { - return errors.Wrap(err, "unable to read store ident") - } - // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a - // cluster version, cached settings and nothing else? Note that there is one - // cluster version key and many cached settings key, and the cluster version - // key precedes the cached settings. - // - // We use an EngineIterator to ensure that there are no keys that cannot be - // parsed as MVCCKeys (e.g. lock table keys) in the engine. - iter := eng.NewEngineIterator(storage.IterOptions{ - KeyTypes: storage.IterKeyTypePointsAndRanges, - UpperBound: roachpb.KeyMax, - }) - defer iter.Close() - valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin}) - if !valid { - if err == nil { - return errors.New("no cluster version found on uninitialized engine") - } - return err - } - getMVCCKey := func() (storage.MVCCKey, error) { - if _, hasRange := iter.HasPointAndRange(); hasRange { - bounds, err := iter.EngineRangeBounds() - if err != nil { - return storage.MVCCKey{}, err - } - return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds) - } - var k storage.EngineKey - k, err = iter.EngineKey() - if err != nil { - return storage.MVCCKey{}, err - } - if !k.IsMVCCKey() { - return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k) - } - return k.ToMVCCKey() - } - var k storage.MVCCKey - if k, err = getMVCCKey(); err != nil { - return err - } - if !k.Key.Equal(keys.StoreClusterVersionKey()) { - return errors.New("no cluster version found on uninitialized engine") - } - valid, err = iter.NextEngineKey() - for valid { - // Only allowed to find cached cluster settings on an uninitialized - // engine. - if k, err = getMVCCKey(); err != nil { - return err - } - if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil { - return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String()) - } - // There may be more cached cluster settings, so continue iterating. - valid, err = iter.NextEngineKey() - } - return err -} - // GetReplica fetches a replica by Range ID. Returns an error if no replica is found. // // See also GetReplicaIfExists for a more perfomant version. diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index b1665c4d198e..bed46a0ba29b 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -23,56 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) -// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster. -const FirstNodeID = roachpb.NodeID(1) - -// FirstStoreID is the StoreID assigned to the first store on the node with ID -// FirstNodeID. -const FirstStoreID = roachpb.StoreID(1) - -// InitEngine writes a new store ident to the underlying engine. To -// ensure that no crufty data already exists in the engine, it scans -// the engine contents before writing the new store ident. The engine -// should be completely empty save for a cluster version, which must -// already have been persisted to it. Returns an error if this is not -// the case. -func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error { - exIdent, err := kvstorage.ReadStoreIdent(ctx, eng) - if err == nil { - return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String()) - } - if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { - return err - } - - if err := checkCanInitializeEngine(ctx, eng); err != nil { - return errors.Wrap(err, "while trying to initialize engine") - } - - batch := eng.NewBatch() - if err := storage.MVCCPutProto( - ctx, - batch, - nil, - keys.StoreIdentKey(), - hlc.Timestamp{}, - hlc.ClockTimestamp{}, - nil, - &ident, - ); err != nil { - batch.Close() - return err - } - if err := batch.Commit(true /* sync */); err != nil { - return errors.Wrap(err, "persisting engine initialization data") - } - - return nil -} - // WriteInitialClusterData writes initialization data to an engine. It creates // system ranges (filling in meta1 and meta2) and the default zone config. // @@ -113,9 +65,9 @@ func WriteInitialClusterData( // Initialize various sequence generators. var nodeIDVal, storeIDVal, rangeIDVal, livenessVal roachpb.Value - nodeIDVal.SetInt(int64(FirstNodeID)) + nodeIDVal.SetInt(int64(kvstorage.FirstNodeID)) // The caller will initialize the stores with ids FirstStoreID, ..., FirstStoreID+numStores-1. - storeIDVal.SetInt(int64(FirstStoreID) + int64(numStores) - 1) + storeIDVal.SetInt(int64(kvstorage.FirstStoreID) + int64(numStores) - 1) // The last range has id = len(splits) + 1 rangeIDVal.SetInt(int64(len(splits) + 1)) @@ -129,7 +81,7 @@ func WriteInitialClusterData( // // [1]: See `(*NodeLiveness).CreateLivenessRecord` and usages for where that happens. // [2]: See `(*NodeLiveness).Start` for where that happens. - livenessRecord := livenesspb.Liveness{NodeID: FirstNodeID, Epoch: 0} + livenessRecord := livenesspb.Liveness{NodeID: kvstorage.FirstNodeID, Epoch: 0} if err := livenessVal.SetProto(&livenessRecord); err != nil { return err } @@ -137,7 +89,7 @@ func WriteInitialClusterData( roachpb.KeyValue{Key: keys.NodeIDGenerator, Value: nodeIDVal}, roachpb.KeyValue{Key: keys.StoreIDGenerator, Value: storeIDVal}, roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal}, - roachpb.KeyValue{Key: keys.NodeLivenessKey(FirstNodeID), Value: livenessVal}) + roachpb.KeyValue{Key: keys.NodeLivenessKey(kvstorage.FirstNodeID), Value: livenessVal}) // firstRangeMS is going to accumulate the stats for the first range, as we // write the meta records for all the other ranges. @@ -182,8 +134,8 @@ func WriteInitialClusterData( const firstReplicaID = 1 replicas := []roachpb.ReplicaDescriptor{ { - NodeID: FirstNodeID, - StoreID: FirstStoreID, + NodeID: kvstorage.FirstNodeID, + StoreID: kvstorage.FirstStoreID, ReplicaID: firstReplicaID, }, } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 739fe928a024..7e9352a79200 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -264,7 +264,7 @@ func createTestStoreWithoutStart( cv = clusterversion.ClusterVersion{Version: opts.bootstrapVersion} } require.NoError(t, WriteClusterVersion(ctx, eng, cv)) - if err := InitEngine( + if err := kvstorage.InitEngine( ctx, eng, storeIdent, ); err != nil { t.Fatal(err) @@ -490,7 +490,7 @@ func TestInitializeEngineErrors(t *testing.T) { stopper.AddCloser(eng) // Bootstrap should fail if engine has no cluster version yet. - err := InitEngine(ctx, eng, testIdent) + err := kvstorage.InitEngine(ctx, eng, testIdent) require.ErrorContains(t, err, "no cluster version") require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion)) @@ -507,7 +507,7 @@ func TestInitializeEngineErrors(t *testing.T) { require.ErrorIs(t, err, &kvstorage.NotBootstrappedError{}) // Bootstrap should fail on non-empty engine. - err = InitEngine(ctx, eng, testIdent) + err = kvstorage.InitEngine(ctx, eng, testIdent) require.ErrorContains(t, err, "cannot be bootstrapped") // Bootstrap should fail on MVCC range key in engine. @@ -517,7 +517,7 @@ func TestInitializeEngineErrors(t *testing.T) { Timestamp: hlc.MinTimestamp, }, storage.MVCCValue{})) - err = InitEngine(ctx, eng, testIdent) + err = kvstorage.InitEngine(ctx, eng, testIdent) require.ErrorContains(t, err, "found mvcc range key") } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 5ba452010169..b109656bbc5e 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -419,6 +419,7 @@ go_test( "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", diff --git a/pkg/server/init.go b/pkg/server/init.go index 2b345c22b92a..016a7e8cbd20 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -550,7 +550,7 @@ func (s *initServer) initializeFirstStoreAfterJoin( if err != nil { return nil, err } - if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil { + if err := kvstorage.InitEngine(ctx, firstEngine, sIdent); err != nil { return nil, err } diff --git a/pkg/server/initial_sql.go b/pkg/server/initial_sql.go index 61d379284318..45c8d7629060 100644 --- a/pkg/server/initial_sql.go +++ b/pkg/server/initial_sql.go @@ -14,7 +14,7 @@ import ( "context" "fmt" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -30,7 +30,7 @@ import ( func (s *Server) RunInitialSQL( ctx context.Context, startSingleNode bool, adminUser, adminPassword string, ) error { - newCluster := s.InitialStart() && s.NodeID() == kvserver.FirstNodeID + newCluster := s.InitialStart() && s.NodeID() == kvstorage.FirstNodeID if !newCluster { // The initial SQL code only runs the first time the cluster is initialized. return nil diff --git a/pkg/server/node.go b/pkg/server/node.go index 69b0defe0c9a..214899dc2fe6 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -320,13 +320,13 @@ func bootstrapCluster( sIdent := roachpb.StoreIdent{ ClusterID: clusterID, - NodeID: kvserver.FirstNodeID, - StoreID: kvserver.FirstStoreID + roachpb.StoreID(i), + NodeID: kvstorage.FirstNodeID, + StoreID: kvstorage.FirstStoreID + roachpb.StoreID(i), } // Initialize the engine backing the store with the store ident and cluster // version. - if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil { + if err := kvstorage.InitEngine(ctx, eng, sIdent); err != nil { return nil, err } @@ -655,7 +655,7 @@ func (n *Node) initializeAdditionalStores( StoreID: startID, } for _, eng := range engines { - if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil { + if err := kvstorage.InitEngine(ctx, eng, sIdent); err != nil { return err } diff --git a/pkg/server/node_tombstone_storage_test.go b/pkg/server/node_tombstone_storage_test.go index 9e9c3d75862a..384e184c554c 100644 --- a/pkg/server/node_tombstone_storage_test.go +++ b/pkg/server/node_tombstone_storage_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -41,7 +42,7 @@ func TestNodeTombstoneStorage(t *testing.T) { require.NoError(t, err) for i := range engs { require.NoError(t, kvserver.WriteClusterVersion(ctx, engs[i], clusterversion.TestingClusterVersion)) - require.NoError(t, kvserver.InitEngine(ctx, engs[i], roachpb.StoreIdent{ + require.NoError(t, kvstorage.InitEngine(ctx, engs[i], roachpb.StoreIdent{ ClusterID: id, NodeID: 1, StoreID: roachpb.StoreID(1 + i), diff --git a/pkg/testutils/localtestcluster/BUILD.bazel b/pkg/testutils/localtestcluster/BUILD.bazel index 9921218f9843..d16746fc458d 100644 --- a/pkg/testutils/localtestcluster/BUILD.bazel +++ b/pkg/testutils/localtestcluster/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/closedts/sidetransport", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/roachpb", "//pkg/rpc", diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 3a64df49b7e0..7683056c3760 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -211,7 +212,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto if err := kvserver.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil { t.Fatalf("unable to write cluster version: %s", err) } - if err := kvserver.InitEngine( + if err := kvstorage.InitEngine( ctx, ltc.Eng, roachpb.StoreIdent{NodeID: nodeID, StoreID: 1}, ); err != nil { t.Fatalf("unable to start local test cluster: %s", err)