Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver,kvstorage: move InitEngine #95432

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/geo/geos"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/security/clientsecopts"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func reportServerInfo(
nodeID := serverCfg.BaseConfig.IDContainer.Get()
if serverCfg.SQLConfig.TenantID.IsSystem() {
if initialStart {
if nodeID == kvserver.FirstNodeID {
if nodeID == kvstorage.FirstNodeID {
buf.Printf("status:\tinitialized new cluster\n")
} else {
buf.Printf("status:\tinitialized new node, joined pre-existing cluster\n")
Expand Down
117 changes: 117 additions & 0 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,123 @@ import (
"github.com/cockroachdb/errors"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
const FirstNodeID = roachpb.NodeID(1)

// FirstStoreID is the StoreID assigned to the first store on the node with ID
// FirstNodeID.
const FirstStoreID = roachpb.StoreID(1)

// InitEngine writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
// should be completely empty save for a cluster version, which must
// already have been persisted to it. Returns an error if this is not
// the case.
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
}
if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
return err
}

if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize engine")
}

batch := eng.NewBatch()
if err := storage.MVCCPutProto(
ctx,
batch,
nil,
keys.StoreIdentKey(),
hlc.Timestamp{},
hlc.ClockTimestamp{},
nil,
&ident,
); err != nil {
batch.Close()
return err
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting engine initialization data")
}

return nil
}

// checkCanInitializeEngine ensures that the engine is empty except for a
// cluster version, which must be present.
func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
// See if this is an already-bootstrapped store.
ident, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine already initialized as %s", ident.String())
} else if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
return errors.Wrap(err, "unable to read store ident")
}
// Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a
// cluster version, cached settings and nothing else? Note that there is one
// cluster version key and many cached settings key, and the cluster version
// key precedes the cached settings.
//
// We use an EngineIterator to ensure that there are no keys that cannot be
// parsed as MVCCKeys (e.g. lock table keys) in the engine.
iter := eng.NewEngineIterator(storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: roachpb.KeyMax,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
if !valid {
if err == nil {
return errors.New("no cluster version found on uninitialized engine")
}
return err
}
getMVCCKey := func() (storage.MVCCKey, error) {
if _, hasRange := iter.HasPointAndRange(); hasRange {
bounds, err := iter.EngineRangeBounds()
if err != nil {
return storage.MVCCKey{}, err
}
return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds)
}
var k storage.EngineKey
k, err = iter.EngineKey()
if err != nil {
return storage.MVCCKey{}, err
}
if !k.IsMVCCKey() {
return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k)
}
return k.ToMVCCKey()
}
var k storage.MVCCKey
if k, err = getMVCCKey(); err != nil {
return err
}
if !k.Key.Equal(keys.StoreClusterVersionKey()) {
return errors.New("no cluster version found on uninitialized engine")
}
valid, err = iter.NextEngineKey()
for valid {
// Only allowed to find cached cluster settings on an uninitialized
// engine.
if k, err = getMVCCKey(); err != nil {
return err
}
if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil {
return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String())
}
// There may be more cached cluster settings, so continue iterating.
valid, err = iter.NextEngineKey()
}
return err
}

// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such
// as RaftHardStateKey, RangeTombstoneKey, and many others). Such keys could in
// principle exist at any RangeID, and this helper efficiently discovers all the
Expand Down
70 changes: 0 additions & 70 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2698,76 +2698,6 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64,
return hlcUpperBound, nil
}

// checkCanInitializeEngine ensures that the engine is empty except for a
// cluster version, which must be present.
func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
// See if this is an already-bootstrapped store.
ident, err := kvstorage.ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine already initialized as %s", ident.String())
} else if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) {
return errors.Wrap(err, "unable to read store ident")
}
// Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a
// cluster version, cached settings and nothing else? Note that there is one
// cluster version key and many cached settings key, and the cluster version
// key precedes the cached settings.
//
// We use an EngineIterator to ensure that there are no keys that cannot be
// parsed as MVCCKeys (e.g. lock table keys) in the engine.
iter := eng.NewEngineIterator(storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: roachpb.KeyMax,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
if !valid {
if err == nil {
return errors.New("no cluster version found on uninitialized engine")
}
return err
}
getMVCCKey := func() (storage.MVCCKey, error) {
if _, hasRange := iter.HasPointAndRange(); hasRange {
bounds, err := iter.EngineRangeBounds()
if err != nil {
return storage.MVCCKey{}, err
}
return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds)
}
var k storage.EngineKey
k, err = iter.EngineKey()
if err != nil {
return storage.MVCCKey{}, err
}
if !k.IsMVCCKey() {
return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k)
}
return k.ToMVCCKey()
}
var k storage.MVCCKey
if k, err = getMVCCKey(); err != nil {
return err
}
if !k.Key.Equal(keys.StoreClusterVersionKey()) {
return errors.New("no cluster version found on uninitialized engine")
}
valid, err = iter.NextEngineKey()
for valid {
// Only allowed to find cached cluster settings on an uninitialized
// engine.
if k, err = getMVCCKey(); err != nil {
return err
}
if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil {
return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String())
}
// There may be more cached cluster settings, so continue iterating.
valid, err = iter.NextEngineKey()
}
return err
}

// GetReplica fetches a replica by Range ID. Returns an error if no replica is found.
//
// See also GetReplicaIfExists for a more perfomant version.
Expand Down
60 changes: 6 additions & 54 deletions pkg/kv/kvserver/store_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,56 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
const FirstNodeID = roachpb.NodeID(1)

// FirstStoreID is the StoreID assigned to the first store on the node with ID
// FirstNodeID.
const FirstStoreID = roachpb.StoreID(1)

// InitEngine writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
// should be completely empty save for a cluster version, which must
// already have been persisted to it. Returns an error if this is not
// the case.
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := kvstorage.ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
}
if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) {
return err
}

if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize engine")
}

batch := eng.NewBatch()
if err := storage.MVCCPutProto(
ctx,
batch,
nil,
keys.StoreIdentKey(),
hlc.Timestamp{},
hlc.ClockTimestamp{},
nil,
&ident,
); err != nil {
batch.Close()
return err
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting engine initialization data")
}

return nil
}

// WriteInitialClusterData writes initialization data to an engine. It creates
// system ranges (filling in meta1 and meta2) and the default zone config.
//
Expand Down Expand Up @@ -113,9 +65,9 @@ func WriteInitialClusterData(
// Initialize various sequence generators.
var nodeIDVal, storeIDVal, rangeIDVal, livenessVal roachpb.Value

nodeIDVal.SetInt(int64(FirstNodeID))
nodeIDVal.SetInt(int64(kvstorage.FirstNodeID))
// The caller will initialize the stores with ids FirstStoreID, ..., FirstStoreID+numStores-1.
storeIDVal.SetInt(int64(FirstStoreID) + int64(numStores) - 1)
storeIDVal.SetInt(int64(kvstorage.FirstStoreID) + int64(numStores) - 1)
// The last range has id = len(splits) + 1
rangeIDVal.SetInt(int64(len(splits) + 1))

Expand All @@ -129,15 +81,15 @@ func WriteInitialClusterData(
//
// [1]: See `(*NodeLiveness).CreateLivenessRecord` and usages for where that happens.
// [2]: See `(*NodeLiveness).Start` for where that happens.
livenessRecord := livenesspb.Liveness{NodeID: FirstNodeID, Epoch: 0}
livenessRecord := livenesspb.Liveness{NodeID: kvstorage.FirstNodeID, Epoch: 0}
if err := livenessVal.SetProto(&livenessRecord); err != nil {
return err
}
initialValues = append(initialValues,
roachpb.KeyValue{Key: keys.NodeIDGenerator, Value: nodeIDVal},
roachpb.KeyValue{Key: keys.StoreIDGenerator, Value: storeIDVal},
roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal},
roachpb.KeyValue{Key: keys.NodeLivenessKey(FirstNodeID), Value: livenessVal})
roachpb.KeyValue{Key: keys.NodeLivenessKey(kvstorage.FirstNodeID), Value: livenessVal})

// firstRangeMS is going to accumulate the stats for the first range, as we
// write the meta records for all the other ranges.
Expand Down Expand Up @@ -182,8 +134,8 @@ func WriteInitialClusterData(
const firstReplicaID = 1
replicas := []roachpb.ReplicaDescriptor{
{
NodeID: FirstNodeID,
StoreID: FirstStoreID,
NodeID: kvstorage.FirstNodeID,
StoreID: kvstorage.FirstStoreID,
ReplicaID: firstReplicaID,
},
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func createTestStoreWithoutStart(
cv = clusterversion.ClusterVersion{Version: opts.bootstrapVersion}
}
require.NoError(t, WriteClusterVersion(ctx, eng, cv))
if err := InitEngine(
if err := kvstorage.InitEngine(
ctx, eng, storeIdent,
); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestInitializeEngineErrors(t *testing.T) {
stopper.AddCloser(eng)

// Bootstrap should fail if engine has no cluster version yet.
err := InitEngine(ctx, eng, testIdent)
err := kvstorage.InitEngine(ctx, eng, testIdent)
require.ErrorContains(t, err, "no cluster version")

require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
Expand All @@ -507,7 +507,7 @@ func TestInitializeEngineErrors(t *testing.T) {
require.ErrorIs(t, err, &kvstorage.NotBootstrappedError{})

// Bootstrap should fail on non-empty engine.
err = InitEngine(ctx, eng, testIdent)
err = kvstorage.InitEngine(ctx, eng, testIdent)
require.ErrorContains(t, err, "cannot be bootstrapped")

// Bootstrap should fail on MVCC range key in engine.
Expand All @@ -517,7 +517,7 @@ func TestInitializeEngineErrors(t *testing.T) {
Timestamp: hlc.MinTimestamp,
}, storage.MVCCValue{}))

err = InitEngine(ctx, eng, testIdent)
err = kvstorage.InitEngine(ctx, eng, testIdent)
require.ErrorContains(t, err, "found mvcc range key")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ go_test(
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/kvstorage",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (s *initServer) initializeFirstStoreAfterJoin(
if err != nil {
return nil, err
}
if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil {
if err := kvstorage.InitEngine(ctx, firstEngine, sIdent); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/server/initial_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand All @@ -30,7 +30,7 @@ import (
func (s *Server) RunInitialSQL(
ctx context.Context, startSingleNode bool, adminUser, adminPassword string,
) error {
newCluster := s.InitialStart() && s.NodeID() == kvserver.FirstNodeID
newCluster := s.InitialStart() && s.NodeID() == kvstorage.FirstNodeID
if !newCluster {
// The initial SQL code only runs the first time the cluster is initialized.
return nil
Expand Down
Loading