From 9a78c72a50f607f08c9870a41ebf881f28894a60 Mon Sep 17 00:00:00 2001
From: Tobias Schottdorf
Date: Tue, 14 Apr 2020 15:34:41 +0200
Subject: [PATCH] server: rework cluster version initialization

A central invariant around cluster versions is that when an update arrives, we
need to persist it before exposing it through the version setting. This was not
true during server start time, as described in:

https://github.com/cockroachdb/cockroach/pull/47235#issuecomment-611419920

In short, we were registering the callback to persist to the engines *after*
Gossip had already connected, opening up a window during which a cluster
version bump simply would not be persisted. In the acceptance/version-upgrade
test, this would manifest as nodes refusing to start because their binary
version had proceeded too far beyond the persisted version. At the time of
writing and before this commit, this would happen in perhaps 1-10% of local
runs on a Linux machine (rarely on OSX).

The buggy code was originally written when the startup sequence was a lot more
intricate and we were, as I recall, under pressure to deliver. Now, with recent
refactors around the `initServer`, we're in a really good place to solve this
while also simplifying the whole story.

In this commit,

- persist the cluster version on *all* engines, not just the initialized ones;
  this completely decouples the timing of when the engines get initialized from
  when we can set up the persistence callback, and makes everything *much
  simpler*.
- stop opportunistically backfilling the cluster version. It is now done once
  at the beginning, in the right place, without FUD later on.
- remove the cluster version persistence from engine initialization. Anyone
  initializing an engine must put a cluster version on it first. In a running
  server, this happens during initServer creation time, way before there are
  any moving parts in the system. In tests, extra writes were added as needed.
- set up the callback with Gossip before starting Gossip, and make sure (via an
  assertion) that this property does not rot. By setting up the callback before
  Gossip starts, we make sure there isn't a period during which Gossip receives
  an update but doesn't have the callback yet.
- as a result of all of the above, take all knowledge of cluster version init
  away from `*Node` and `*Stores`.

As a last note, we are planning to stop using Gossip for this version business
in 20.1, though this too will be facilitated by this change.

Release note (bug fix): Avoid a condition during rapid version upgrades where a
node would refuse to start, claiming "[a store is] too old for running
version". Before this bug fix, the workaround is to decommission the node,
delete the store directory, and re-add it to the cluster as a new node.
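To make the ordering invariant concrete before diving into the diff, here is a
minimal, self-contained Go sketch of the startup sequence this patch
establishes: register the persist-to-all-engines callback and write the
disk-synthesized version back to every engine *before* the version setting is
initialized and before Gossip starts delivering updates. This is not
CockroachDB code; all types and helpers below (version, engine, versionSetting,
writeVersionToEngines) are hypothetical stand-ins for the real
clusterversion/kvserver machinery.

```go
package main

import (
	"fmt"
	"sync"
)

// version is a simplified stand-in for clusterversion.ClusterVersion.
type version struct{ Major, Minor int }

func (v version) less(o version) bool {
	return v.Major < o.Major || (v.Major == o.Major && v.Minor < o.Minor)
}

// engine is a stand-in for a storage engine; here it only persists a version.
type engine struct{ persisted version }

// writeVersionToEngines mirrors the role of WriteClusterVersionToEngines:
// a version bump is written to *all* engines, initialized or not.
func writeVersionToEngines(engines []*engine, v version) {
	for _, e := range engines {
		e.persisted = v
	}
}

// versionSetting is a stand-in for the cluster version setting: the
// beforeChange callback must run (and persist) before an update is exposed.
type versionSetting struct {
	mu           sync.Mutex
	active       version
	beforeChange func(version)
}

func (s *versionSetting) update(v version) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.active.less(v) {
		return // ignore regressions and duplicate updates
	}
	s.beforeChange(v) // persist first ...
	s.active = v      // ... then expose through the setting
}

func main() {
	engines := []*engine{{}, {}} // all engines, even uninitialized ones
	diskVersion := version{Major: 19, Minor: 2}

	// (a) Install the persistence callback before Gossip could deliver updates.
	setting := &versionSetting{beforeChange: func(v version) {
		writeVersionToEngines(engines, v)
	}}
	// (b) Backfill the disk-loaded version so every engine agrees.
	writeVersionToEngines(engines, diskVersion)
	// (c) Only now initialize the setting; Gossip would start after this point.
	setting.active = diskVersion

	// A later gossiped bump is persisted on all engines before it becomes
	// visible through the setting, so a crash can never leave the active
	// version ahead of what is on disk.
	setting.update(version{Major: 20, Minor: 1})
	fmt.Println("active:", setting.active,
		"persisted:", engines[0].persisted, engines[1].persisted)
}
```

The real patch implements steps (a)-(c) in `(*Server).Start` (see the
`pkg/server/server.go` hunk below) using `clusterversion.SetBeforeChange`,
`WriteClusterVersionToEngines`, and `clusterversion.Initialize`, with
`Gossip.AssertNotStarted` guarding the ordering.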
--- pkg/clusterversion/setting.go | 21 ++- pkg/cmd/roachtest/versionupgrade.go | 19 ++- pkg/gossip/gossip.go | 11 ++ pkg/kv/kvserver/client_test.go | 14 +- pkg/kv/kvserver/replica_test.go | 4 +- pkg/kv/kvserver/store.go | 39 ++++-- pkg/kv/kvserver/store_bootstrap.go | 21 +-- pkg/kv/kvserver/store_test.go | 31 +++-- pkg/kv/kvserver/stores.go | 129 +++--------------- pkg/kv/kvserver/stores_test.go | 120 ++++++++-------- pkg/server/init.go | 29 +++- pkg/server/node.go | 80 ++++------- pkg/server/node_test.go | 7 +- pkg/server/server.go | 80 ++++++++++- .../localtestcluster/local_test_cluster.go | 7 +- pkg/testutils/serverutils/test_server_shim.go | 2 +- 16 files changed, 318 insertions(+), 296 deletions(-) diff --git a/pkg/clusterversion/setting.go b/pkg/clusterversion/setting.go index db914510fb6a..25ea7bdfd69b 100644 --- a/pkg/clusterversion/setting.go +++ b/pkg/clusterversion/setting.go @@ -87,15 +87,24 @@ func (cv *clusterVersionSetting) initialize( ctx context.Context, version roachpb.Version, sv *settings.Values, ) error { if ver := cv.activeVersionOrEmpty(ctx, sv); ver != (ClusterVersion{}) { - // Allow initializing a second time as long as it's setting the version to - // what it was already set. This is useful in tests that use - // MakeTestingClusterSettings() which initializes the version, and the - // start a server which again initializes it. + // Allow initializing a second time as long as it's not regressing. + // + // This is useful in tests that use MakeTestingClusterSettings() which + // initializes the version, and the start a server which again + // initializes it once more. + // + // It's also used in production code during bootstrap, where the version + // is first initialized to BinaryMinSupportedVersion and then + // re-initialized to BootstrapVersion (=BinaryVersion). + if version.Less(ver.Version) { + return errors.AssertionFailedf("cannot initialize version to %s because already set to: %s", + version, ver) + } if version == ver.Version { + // Don't trigger callbacks, etc, a second time. return nil } - return errors.AssertionFailedf("cannot initialize version to %s because already set to: %s", - version, ver) + // Now version > ver.Version. } if err := cv.validateSupportedVersionInner(ctx, version, sv); err != nil { return err diff --git a/pkg/cmd/roachtest/versionupgrade.go b/pkg/cmd/roachtest/versionupgrade.go index 6c0c51935b66..2f9db4993453 100644 --- a/pkg/cmd/roachtest/versionupgrade.go +++ b/pkg/cmd/roachtest/versionupgrade.go @@ -72,10 +72,7 @@ DROP TABLE test.t; } func runVersionUpgrade(ctx context.Context, t *test, c *cluster, predecessorVersion string) { - // This is ugly, but we can't pass `--encrypt=false` to old versions of - // Cockroach. - // - // TODO(tbg): revisit as old versions are aged out of this test. + // This test uses fixtures and we do not have encrypted fixtures right now. c.encryptDefault = false // Set the bool within to true to create a new fixture for this test. This @@ -104,6 +101,18 @@ func runVersionUpgrade(ctx context.Context, t *test, c *cluster, predecessorVers waitForUpgradeStep(c.All()), testFeaturesStep, + // Work around a bug in <= 20.1 that exists at the time of writing. The + // bug would sometimes cause the cluster version (predecessorVersion) + // to not be persisted on the engines. In that case, moving to the next + // version would fail. 
For details, see: + // + // https://github.com/cockroachdb/cockroach/pull/47358 + // + // TODO(tbg): remove this when we stop running this test against 20.1 + // or any earlier release. Or, make sure the <=20.1 fixtures all have + // the bumped cluster version on all stores. + binaryUpgradeStep(c.All(), predecessorVersion), + // NB: at this point, cluster and binary version equal predecessorVersion, // and auto-upgrades are on. @@ -350,7 +359,7 @@ func waitForUpgradeStep(nodes nodeListOption) versionStep { } } - t.l.Printf("%s: nodes %v are upgraded\n", newVersion) + t.l.Printf("%s: nodes %v are upgraded\n", newVersion, nodes) // TODO(nvanbenschoten): add upgrade qualification step. } diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index 56101655fdd7..cd054c9e5098 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -218,6 +218,8 @@ type Storage interface { // During bootstrapping, the bootstrap list contains candidates for // entry to the gossip network. type Gossip struct { + started bool // for assertions + *server // Embedded gossip RPC server Connected chan struct{} // Closed upon initial connection @@ -384,6 +386,13 @@ func NewTestWithLocality( return gossip } +// AssertNotStarted fatals if the Gossip instance was already started. +func (g *Gossip) AssertNotStarted(ctx context.Context) { + if g.started { + log.Fatalf(ctx, "Gossip instance was already started") + } +} + // GetNodeMetrics returns the gossip node metrics. func (g *Gossip) GetNodeMetrics() *Metrics { return g.server.GetNodeMetrics() @@ -1244,6 +1253,8 @@ func (g *Gossip) MaxHops() uint32 { // This method starts bootstrap loop, gossip server, and client // management in separate goroutines and returns. func (g *Gossip) Start(advertAddr net.Addr, resolvers []resolver.Resolver) { + g.AssertNotStarted(context.Background()) + g.started = true g.setResolvers(resolvers) g.server.start(advertAddr) // serve gossip protocol g.bootstrap() // bootstrap gossip client diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 445e620ea77a..a94f9a37eb53 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -141,9 +141,7 @@ func createTestStoreWithOpts( nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(), storeCfg.DefaultZoneConfig, ) storeCfg.ScanMaxIdleTime = 1 * time.Second - stores := kvserver.NewStores( - ac, storeCfg.Clock, - clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion) + stores := kvserver.NewStores(ac, storeCfg.Clock) if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil { t.Fatal(err) @@ -177,8 +175,9 @@ func createTestStoreWithOpts( // TODO(bdarnell): arrange to have the transport closed. 
ctx := context.Background() if !opts.dontBootstrap { + require.NoError(t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion)) if err := kvserver.InitEngine( - ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, clusterversion.TestingClusterVersion, + ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, ); err != nil { t.Fatal(err) } @@ -880,10 +879,11 @@ func (m *multiTestContext) addStore(idx int) { ctx := context.Background() if needBootstrap { + require.NoError(m.t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion)) if err := kvserver.InitEngine(ctx, eng, roachpb.StoreIdent{ NodeID: roachpb.NodeID(idx + 1), StoreID: roachpb.StoreID(idx + 1), - }, clusterversion.TestingClusterVersion); err != nil { + }); err != nil { m.t.Fatal(err) } } @@ -914,9 +914,7 @@ func (m *multiTestContext) addStore(idx int) { m.t.Fatal(err) } - sender := kvserver.NewStores(ambient, clock, - clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion, - ) + sender := kvserver.NewStores(ambient, clock) sender.AddStore(store) perReplicaServer := kvserver.MakeServer(&roachpb.NodeDescriptor{NodeID: nodeID}, sender) kvserver.RegisterPerReplicaServer(grpcServer, perReplicaServer) diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index e594a7ca762a..11af3e2ceb86 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -240,12 +240,12 @@ func (tc *testContext) StartWithStoreConfigAndVersion( factory := &testSenderFactory{} cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock) + require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv)) if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{ ClusterID: uuid.MakeV4(), NodeID: 1, StoreID: 1, - }, - cv); err != nil { + }); err != nil { t.Fatal(err) } if err := clusterversion.Initialize(ctx, cv.Version, &cfg.Settings.SV); err != nil { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 50079f493ed9..e2531e2047bb 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2019,23 +2019,40 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64, return hlcUpperBound, nil } -func checkEngineEmpty(ctx context.Context, eng storage.Engine) error { +// checkCanInitializeEngine ensures that the engine is empty except for a +// cluster version, which must be present. +func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error { kvs, err := storage.Scan(eng, roachpb.KeyMin, roachpb.KeyMax, 10) if err != nil { return err } - if len(kvs) > 0 { - // See if this is an already-bootstrapped store. - ident, err := ReadStoreIdent(ctx, eng) - if err != nil { - return errors.Wrap(err, "unable to read store ident") - } - keyVals := make([]string, len(kvs)) - for i, kv := range kvs { - keyVals[i] = fmt.Sprintf("%s: %q", kv.Key, kv.Value) + // See if this is an already-bootstrapped store. + ident, err := ReadStoreIdent(ctx, eng) + if err == nil { + return errors.Errorf("engine already initialized as %s", ident.String()) + } else if !errors.Is(errors.Cause(err), &NotBootstrappedError{}) { + return errors.Wrap(err, "unable to read store ident") + } + + // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain + // a cluster version and nothing else? 
+ + var sawClusterVersion bool + var keyVals []string + for _, kv := range kvs { + if kv.Key.Key.Equal(keys.StoreClusterVersionKey()) { + sawClusterVersion = true + continue } - return errors.Errorf("engine belongs to store %s, contains %s", ident.String(), keyVals) + keyVals = append(keyVals, fmt.Sprintf("%s: %q", kv.Key, kv.Value)) + } + if len(keyVals) > 0 { + return errors.Errorf("engine cannot be bootstrapped, contains:\n%s", keyVals) } + if !sawClusterVersion { + return errors.New("no cluster version found on uninitialized engine") + } + return nil } diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_bootstrap.go index 593685861e0c..273fda51da3b 100644 --- a/pkg/kv/kvserver/store_bootstrap.go +++ b/pkg/kv/kvserver/store_bootstrap.go @@ -13,7 +13,6 @@ package kvserver import ( "context" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -29,14 +28,10 @@ import ( // InitEngine writes a new store ident to the underlying engine. To // ensure that no crufty data already exists in the engine, it scans // the engine contents before writing the new store ident. The engine -// should be completely empty. It returns an error if called on a -// non-empty engine. -func InitEngine( - ctx context.Context, - eng storage.Engine, - ident roachpb.StoreIdent, - cv clusterversion.ClusterVersion, -) error { +// should be completely empty save for a cluster version, which must +// already have been persisted to it. Returns an error if this is not +// the case. +func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error { exIdent, err := ReadStoreIdent(ctx, eng) if err == nil { return errors.Errorf("engine %s is already bootstrapped with ident %s", eng, exIdent.String()) @@ -45,8 +40,8 @@ func InitEngine( return err } - if err := checkEngineEmpty(ctx, eng); err != nil { - return errors.Wrap(err, "cannot verify empty engine for bootstrap") + if err := checkCanInitializeEngine(ctx, eng); err != nil { + return errors.Wrap(err, "while trying to initialize store") } batch := eng.NewBatch() @@ -62,10 +57,6 @@ func InitEngine( batch.Close() return err } - if err := WriteClusterVersion(ctx, batch, cv); err != nil { - batch.Close() - return errors.Wrap(err, "cannot write cluster version") - } if err := batch.Commit(true /* sync */); err != nil { return errors.Wrap(err, "persisting bootstrap data") } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index a8a5e81b4ce1..518c52792ac4 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -231,8 +231,10 @@ func createTestStoreWithoutStart( cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock) store := NewStore(context.TODO(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) factory.setStore(store) + + require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion)) if err := InitEngine( - context.TODO(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, clusterversion.TestingClusterVersion, + context.TODO(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, ); err != nil { t.Fatal(err) } @@ -436,8 +438,9 @@ func TestStoreInitAndBootstrap(t *testing.T) { t.Error("expected failure starting un-bootstrapped store") } + require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion)) // Bootstrap with a fake ident. 
- if err := InitEngine(ctx, eng, testIdent, clusterversion.TestingClusterVersion); err != nil { + if err := InitEngine(ctx, eng, testIdent); err != nil { t.Fatalf("error bootstrapping store: %+v", err) } @@ -490,9 +493,9 @@ func TestStoreInitAndBootstrap(t *testing.T) { } } -// TestBootstrapOfNonEmptyStore verifies bootstrap failure if engine +// TestInitializeEngineErrors verifies bootstrap failure if engine // is not empty. -func TestBootstrapOfNonEmptyStore(t *testing.T) { +func TestInitializeEngineErrors(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() ctx := context.TODO() @@ -500,10 +503,16 @@ func TestBootstrapOfNonEmptyStore(t *testing.T) { eng := storage.NewDefaultInMem() stopper.AddCloser(eng) - // Put some random garbage into the engine. - if err := eng.Put(storage.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar")); err != nil { - t.Errorf("failure putting key foo into engine: %+v", err) + // Bootstrap should fail if engine has no cluster version yet. + if err := InitEngine(ctx, eng, testIdent); !testutils.IsError(err, `no cluster version`) { + t.Fatalf("unexpected error: %v", err) } + + require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion)) + + // Put some random garbage into the engine. + require.NoError(t, eng.Put(storage.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar"))) + cfg := TestStoreConfig(nil) cfg.Transport = NewDummyRaftTransport(cfg.Settings) store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) @@ -516,12 +525,8 @@ func TestBootstrapOfNonEmptyStore(t *testing.T) { } // Bootstrap should fail on non-empty engine. - switch err := errors.Cause(InitEngine( - ctx, eng, testIdent, clusterversion.TestingClusterVersion, - )); err.(type) { - case *NotBootstrappedError: - default: - t.Errorf("unexpected error bootstrapping non-empty store: %+v", err) + if err := InitEngine(ctx, eng, testIdent); !testutils.IsError(err, `cannot be bootstrapped`) { + t.Fatalf("unexpected error: %v", err) } } diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 1d99159ed873..57548a953b49 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -39,11 +39,6 @@ type Stores struct { log.AmbientContext clock *hlc.Clock storeMap syncutil.IntMap // map[roachpb.StoreID]*Store - // These two versions are usually - // clusterversion.binary{,MinimumSupported}Version, respectively. They are - // changed in some tests. - binaryVersion roachpb.Version - binaryMinSupportedVersion roachpb.Version mu struct { syncutil.Mutex @@ -57,16 +52,10 @@ var _ gossip.Storage = &Stores{} // Stores implements the gossip.Storage interfa // NewStores returns a local-only sender which directly accesses // a collection of stores. -func NewStores( - ambient log.AmbientContext, - clock *hlc.Clock, - binaryVersion, binaryMinSupportedVersion roachpb.Version, -) *Stores { +func NewStores(ambient log.AmbientContext, clock *hlc.Clock) *Stores { return &Stores{ - AmbientContext: ambient, - clock: clock, - binaryVersion: binaryVersion, - binaryMinSupportedVersion: binaryMinSupportedVersion, + AmbientContext: ambient, + clock: clock, } } @@ -288,10 +277,10 @@ func (ls *Stores) updateBootstrapInfoLocked(bi *gossip.BootstrapInfo) error { // ReadVersionFromEngineOrZero reads the persisted cluster version from the // engine, falling back to the zero value. 
func ReadVersionFromEngineOrZero( - ctx context.Context, e storage.Engine, + ctx context.Context, reader storage.Reader, ) (clusterversion.ClusterVersion, error) { var cv clusterversion.ClusterVersion - cv, err := ReadClusterVersion(ctx, e) + cv, err := ReadClusterVersion(ctx, reader) if err != nil { return clusterversion.ClusterVersion{}, err } @@ -299,7 +288,10 @@ func ReadVersionFromEngineOrZero( } // WriteClusterVersionToEngines writes the given version to the given engines, -// without any sanity checks. +// Returns nil on success; otherwise returns first error encountered writing to +// the stores. +// +// WriteClusterVersion makes no attempt to validate the supplied version. func WriteClusterVersionToEngines( ctx context.Context, engines []storage.Engine, cv clusterversion.ClusterVersion, ) error { @@ -312,15 +304,18 @@ func WriteClusterVersionToEngines( } // SynthesizeClusterVersionFromEngines returns the cluster version that was read -// from the engines or, if there's no bootstrapped engines, returns -// binaryMinSupportedVersion. +// from the engines or, if none are initialized, binaryMinSupportedVersion. +// Typically all initialized engines will have the same version persisted, +// though ill-timed crashes can result in situations where this is not the +// case. Then, the largest version seen is returned. // -// Args: -// binaryMinSupportedVersion: The minimum version supported by this binary. An error -// is returned if any engine has a version lower that this. This version is -// written to the engines if no store has a version in it. -// binaryVersion: The version of this binary. An error is returned if -// any engine has a higher version. +// binaryVersion is the version of this binary. An error is returned if +// any engine has a higher version, as this would indicate that this node +// has previously acked the higher cluster version but is now running an +// old binary, which is unsafe. +// +// binaryMinSupportedVersion is the minimum version supported by this binary. An +// error is returned if any engine has a version lower that this. func SynthesizeClusterVersionFromEngines( ctx context.Context, engines []storage.Engine, @@ -344,6 +339,7 @@ func SynthesizeClusterVersionFromEngines( // constraints, which at the latest the second loop will achieve (because // then minStoreVersion don't change any more). for _, eng := range engines { + eng := eng.(storage.Reader) // we're read only var cv clusterversion.ClusterVersion cv, err := ReadVersionFromEngineOrZero(ctx, eng) if err != nil { @@ -399,52 +395,9 @@ func SynthesizeClusterVersionFromEngines( "is too old for running version v%s (which requires data from v%s or later)", minStoreVersion.origin, minStoreVersion.Version, binaryVersion, binaryMinSupportedVersion) } - - // Write the "actual" version back to all stores. This is almost always a - // no-op, but will backfill the information for 1.0.x clusters, and also - // smoothens out inconsistent state that can crop up during an ill-timed - // crash or when new stores are being added. - return cv, WriteClusterVersionToEngines(ctx, engines, cv) -} - -// SynthesizeClusterVersion reads and returns the ClusterVersion protobuf -// (written to any of the configured stores (all of which are bootstrapped)). -// The returned value is also replicated to all stores for consistency, in case -// a new store was added or an old store re-configured. In case of non-identical -// versions across the stores, returns a version that carries the smallest -// Version. 
-// -// If there aren't any stores, returns the minimum supported version of the binary. -func (ls *Stores) SynthesizeClusterVersion( - ctx context.Context, -) (clusterversion.ClusterVersion, error) { - var engines []storage.Engine - ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool { - engines = append(engines, (*Store)(v).engine) - return true // want more - }) - cv, err := SynthesizeClusterVersionFromEngines(ctx, engines, ls.binaryVersion, ls.binaryMinSupportedVersion) - if err != nil { - return clusterversion.ClusterVersion{}, err - } return cv, nil } -// WriteClusterVersion persists the supplied ClusterVersion to every -// configured store. Returns nil on success; otherwise returns first -// error encountered writing to the stores. -// -// WriteClusterVersion makes no attempt to validate the supplied version. -func (ls *Stores) WriteClusterVersion(ctx context.Context, cv clusterversion.ClusterVersion) error { - // Update all stores. - engines := ls.engines() - ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool { - engines = append(engines, (*Store)(v).Engine()) - return true // want more - }) - return WriteClusterVersionToEngines(ctx, engines, cv) -} - func (ls *Stores) engines() []storage.Engine { var engines []storage.Engine ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool { @@ -453,43 +406,3 @@ func (ls *Stores) engines() []storage.Engine { }) return engines } - -// OnClusterVersionChange is invoked when the running node receives a notification -// indicating that the cluster version has changed. It checks the currently persisted -// version and updates if it is older than the provided update. -func (ls *Stores) OnClusterVersionChange( - ctx context.Context, cv clusterversion.ClusterVersion, -) error { - // Grab a lock to make sure that there aren't two interleaved invocations of - // this method that result in clobbering of an update. - ls.mu.Lock() - defer ls.mu.Unlock() - - // We're going to read the cluster version from any engine - all the engines - // are always kept in sync so it doesn't matter which one we read from. - var someEngine storage.Engine - ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool { - someEngine = (*Store)(v).engine - return false // don't iterate any more - }) - if someEngine == nil { - // If we haven't bootstrapped any engines yet, there's nothing for us to do. - return nil - } - synthCV, err := ReadClusterVersion(ctx, someEngine) - if err != nil { - return errors.Wrap(err, "error reading persisted cluster version") - } - // If the update downgrades the version, ignore it. Must be a - // reordering (this method is called from multiple goroutines via - // `(*Node).onClusterVersionChange)`). Note that we do carry out the upgrade if - // the MinVersion is identical, to backfill the engines that may still need it. 
- if cv.Version.Less(synthCV.Version) { - return nil - } - if err := ls.WriteClusterVersion(ctx, cv); err != nil { - return errors.Wrap(err, "writing cluster version") - } - - return nil -} diff --git a/pkg/kv/kvserver/stores_test.go b/pkg/kv/kvserver/stores_test.go index 08c066da72c9..0eea4126b16e 100644 --- a/pkg/kv/kvserver/stores_test.go +++ b/pkg/kv/kvserver/stores_test.go @@ -31,7 +31,7 @@ import ( ) func newStores(ambientCtx log.AmbientContext, clock *hlc.Clock) *Stores { - return NewStores(ambientCtx, clock, clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion) + return NewStores(ambientCtx, clock) } func TestStoresAddStore(t *testing.T) { @@ -332,18 +332,20 @@ func TestStoresGossipStorageReadLatest(t *testing.T) { // TestStoresClusterVersionWriteSynthesize verifies that the cluster version is // written to all stores and that missing versions are filled in appropriately. -func TestStoresClusterVersionWriteSynthesize(t *testing.T) { +func TestClusterVersionWriteSynthesize(t *testing.T) { defer leaktest.AfterTest(t)() _, stores, _, stopper := createStores(3, t) ctx := context.Background() defer stopper.Stop(ctx) v1_0 := roachpb.Version{Major: 1} + // Hard-code binaryVersion of 1.1 for this test. + // Hard-code binaryMinSupportedVersion of 1.0 for this test. + binV := roachpb.Version{Major: 1, Minor: 1} + minV := v1_0 + makeStores := func() *Stores { - // Hard-code binaryVersion of 1.1 for this test. - // Hard-code binaryMinSupportedVersion of 1.0 for this test. - ls := NewStores(log.AmbientContext{}, stores[0].Clock(), - roachpb.Version{Major: 1, Minor: 1}, v1_0) + ls := NewStores(log.AmbientContext{}, stores[0].Clock()) return ls } @@ -351,7 +353,7 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) { // If there are no stores, default to binaryMinSupportedVersion // (v1_0 in this test) - if initialCV, err := ls0.SynthesizeClusterVersion(ctx); err != nil { + if initialCV, err := SynthesizeClusterVersionFromEngines(ctx, ls0.engines(), binV, minV); err != nil { t.Fatal(err) } else { expCV := clusterversion.ClusterVersion{ @@ -370,20 +372,14 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) { // Verify that the initial read of an empty store synthesizes v1.0-0. This // is the code path that runs after starting the 1.1 binary for the first // time after the rolling upgrade from 1.0. - // - // Do it twice because after the first iteration, we have written these - // values to storage, so the second time around the synthesis does not - // happen. - for i := 0; i < 2; i++ { - if initialCV, err := ls0.SynthesizeClusterVersion(ctx); err != nil { - t.Fatal(err) - } else { - expCV := clusterversion.ClusterVersion{ - Version: v1_0, - } - if !reflect.DeepEqual(initialCV, expCV) { - t.Fatalf("expected %+v; got %+v", expCV, initialCV) - } + if initialCV, err := SynthesizeClusterVersionFromEngines(ctx, ls0.engines(), binV, minV); err != nil { + t.Fatal(err) + } else { + expCV := clusterversion.ClusterVersion{ + Version: v1_0, + } + if !reflect.DeepEqual(initialCV, expCV) { + t.Fatalf("expected %+v; got %+v", expCV, initialCV) } } @@ -393,12 +389,12 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) { cv := clusterversion.ClusterVersion{ Version: versionB, } - if err := ls0.WriteClusterVersion(ctx, cv); err != nil { + if err := WriteClusterVersionToEngines(ctx, ls0.engines(), cv); err != nil { t.Fatal(err) } // Verify the same thing comes back on read. 
- if newCV, err := ls0.SynthesizeClusterVersion(ctx); err != nil { + if newCV, err := SynthesizeClusterVersionFromEngines(ctx, ls0.engines(), binV, minV); err != nil { t.Fatal(err) } else { expCV := cv @@ -418,29 +414,17 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) { expCV := clusterversion.ClusterVersion{ Version: v1_0, } - if cv, err := ls01.SynthesizeClusterVersion(ctx); err != nil { + if cv, err := SynthesizeClusterVersionFromEngines(ctx, ls01.engines(), binV, minV); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(cv, expCV) { t.Fatalf("expected %+v, got %+v", expCV, cv) } - // Verify the same thing comes back on read of either of the stores (i.e., - // we wrote the updated version to both). - for _, store := range stores[:2] { - ls := makeStores() - ls.AddStore(store) - if cv, err := ls.SynthesizeClusterVersion(ctx); err != nil { - t.Fatal(err) - } else if !reflect.DeepEqual(cv, expCV) { - t.Fatalf("expected %+v; got %+v", expCV, cv) - } - } - // Write an updated Version to both stores. cv := clusterversion.ClusterVersion{ Version: versionB, } - if err := ls01.WriteClusterVersion(ctx, cv); err != nil { + if err := WriteClusterVersionToEngines(ctx, ls01.engines(), cv); err != nil { t.Fatal(err) } } @@ -453,7 +437,7 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) { { ls3 := makeStores() ls3.AddStore(stores[2]) - if err := ls3.WriteClusterVersion(ctx, cv); err != nil { + if err := WriteClusterVersionToEngines(ctx, ls3.engines(), cv); err != nil { t.Fatal(err) } } @@ -468,7 +452,7 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) { expCV := clusterversion.ClusterVersion{ Version: versionA, } - if cv, err := ls012.SynthesizeClusterVersion(ctx); err != nil { + if cv, err := SynthesizeClusterVersionFromEngines(ctx, ls012.engines(), binV, minV); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(cv, expCV) { t.Fatalf("expected %+v, got %+v", expCV, cv) @@ -486,46 +470,50 @@ func TestStoresClusterVersionIncompatible(t *testing.T) { vOneDashOne := roachpb.Version{Major: 1, Unstable: 1} vOne := roachpb.Version{Major: 1} - type testFn func(*clusterversion.ClusterVersion, *Stores) string - for name, setter := range map[string]testFn{ - "StoreTooNew": func(cv *clusterversion.ClusterVersion, ls *Stores) string { - // This is what the running node requires from its stores. - ls.binaryMinSupportedVersion = vOne + type testCase struct { + binV, minV roachpb.Version // binary version and min supported version + engV roachpb.Version // version found on engine in test + expErr string + } + for name, tc := range map[string]testCase{ + "StoreTooNew": { // This is what the node is running. - ls.binaryVersion = vOneDashOne + binV: vOneDashOne, + // This is what the running node requires from its stores. + minV: vOne, // Version is way too high for this node. - cv.Version = roachpb.Version{Major: 9} - return `cockroach version v1\.0-1 is incompatible with data in store =; use version v9\.0 or later` + engV: roachpb.Version{Major: 9}, + expErr: `cockroach version v1\.0-1 is incompatible with data in store =; use version v9\.0 or later`, }, - "StoreTooOldVersion": func(cv *clusterversion.ClusterVersion, ls *Stores) string { - // This is what the running node requires from its stores. - ls.binaryMinSupportedVersion = roachpb.Version{Major: 5} + "StoreTooOldVersion": { // This is what the node is running. - ls.binaryVersion = roachpb.Version{Major: 9} + binV: roachpb.Version{Major: 9}, + // This is what the running node requires from its stores. 
+ minV: roachpb.Version{Major: 5}, // Version is way too low. - cv.Version = roachpb.Version{Major: 4} - return `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)` + engV: roachpb.Version{Major: 4}, + expErr: `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`, }, - "StoreTooOldMinVersion": func(cv *clusterversion.ClusterVersion, ls *Stores) string { + "StoreTooOldMinVersion": { // Like the previous test case, but this time cv.MinimumVersion is the culprit. - ls.binaryMinSupportedVersion = roachpb.Version{Major: 5} - ls.binaryVersion = roachpb.Version{Major: 9} - cv.Version = roachpb.Version{Major: 4} - return `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)` + binV: roachpb.Version{Major: 9}, + minV: roachpb.Version{Major: 5}, + engV: roachpb.Version{Major: 4}, + expErr: `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`, }, } { t.Run(name, func(t *testing.T) { - _, stores, ls, stopper := createStores(1, t) - defer stopper.Stop(ctx) - ls.AddStore(stores[0]) + engs := []storage.Engine{storage.NewDefaultInMem()} + defer engs[0].Close() // Configure versions and write. - var cv clusterversion.ClusterVersion - expErr := setter(&cv, ls) - if err := ls.WriteClusterVersion(ctx, cv); err != nil { + cv := clusterversion.ClusterVersion{Version: tc.engV} + if err := WriteClusterVersionToEngines(ctx, engs, cv); err != nil { t.Fatal(err) } - if _, err := ls.SynthesizeClusterVersion(ctx); !testutils.IsError(err, expErr) { - t.Fatalf("unexpected error: %+v", err) + if cv, err := SynthesizeClusterVersionFromEngines( + ctx, engs, tc.binV, tc.minV, + ); !testutils.IsError(err, tc.expErr) { + t.Fatalf("unexpected error: %+v, got version %v", err, cv) } }) } diff --git a/pkg/server/init.go b/pkg/server/init.go index 1a8cf337396c..f3d4b6897ff6 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -17,8 +17,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -150,7 +152,7 @@ type initState struct { // TODO(tbg): give this a KV client and thus initialize at least one store in // all cases. func (s *initServer) ServeAndWait( - ctx context.Context, stopper *stop.Stopper, g *gossip.Gossip, + ctx context.Context, stopper *stop.Stopper, sv *settings.Values, g *gossip.Gossip, ) (*initState, error) { if len(s.inspectState.initializedEngines) != 0 { // If already bootstrapped, return early. @@ -168,7 +170,19 @@ func (s *initServer) ServeAndWait( case <-stopper.ShouldQuiesce(): return nil, stop.ErrUnavailable case state := <-s.bootstrapReqCh: - // Bootstrap() did its job. + // Bootstrap() did its job. 
At this point, we know that the cluster + // version will be bootstrapVersion (=state.clusterVersion.Version), but + // the version setting does not know yet (it was initialized as + // BinaryMinSupportedVersion because the engines were all + // uninitialized). We *could* just let the server start, and it would + // populate system.settings, which is then gossiped, and then the + // callback would update the version, but we take this shortcut to avoid + // having every freshly bootstrapped cluster spend time at an old + // cluster version. + if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { + return nil, err + } + log.Infof(ctx, "**** cluster %s has been created", state.clusterID) return state, nil case <-g.Connected: @@ -244,7 +258,16 @@ func (s *initServer) Bootstrap( func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) { cv := clusterversion.ClusterVersion{Version: s.bootstrapVersion} + if err := kvserver.WriteClusterVersionToEngines(ctx, s.inspectState.newEngines, cv); err != nil { + return nil, err + } return bootstrapCluster( - ctx, s.inspectState.newEngines, cv, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig, + ctx, s.inspectState.newEngines, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig, ) } + +// DiskClusterVersion returns the cluster version synthesized from disk. This +// is always non-zero since it falls back to the BinaryMinSupportedVersion. +func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion { + return s.inspectState.clusterVersion +} diff --git a/pkg/server/node.go b/pkg/server/node.go index 354f893b8a13..e00e841f41bd 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -204,14 +204,32 @@ func GetBootstrapSchema( func bootstrapCluster( ctx context.Context, engines []storage.Engine, - bootstrapVersion clusterversion.ClusterVersion, defaultZoneConfig *zonepb.ZoneConfig, defaultSystemZoneConfig *zonepb.ZoneConfig, ) (*initState, error) { clusterID := uuid.MakeV4() // TODO(andrei): It'd be cool if this method wouldn't do anything to engines // other than the first one, and let regular node startup code deal with them. + var bootstrapVersion clusterversion.ClusterVersion for i, eng := range engines { + cv, err := kvserver.ReadClusterVersion(ctx, eng) + if err != nil { + return nil, errors.Wrapf(err, "reading cluster version of %s", eng) + } + + // bootstrapCluster requires matching cluster versions on all engines. + if cv.Major == 0 { + return nil, errors.Errorf("missing bootstrap version") + } + + if i == 0 { + bootstrapVersion = cv + } + + if bootstrapVersion != cv { + return nil, errors.Wrapf(err, "found cluster versions %s and %s", bootstrapVersion, cv) + } + sIdent := roachpb.StoreIdent{ ClusterID: clusterID, NodeID: FirstNodeID, @@ -220,7 +238,7 @@ func bootstrapCluster( // Initialize the engine backing the store with the store ident and cluster // version. 
- if err := kvserver.InitEngine(ctx, eng, sIdent, bootstrapVersion); err != nil { + if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil { return nil, err } @@ -251,6 +269,7 @@ func bootstrapCluster( clusterID: clusterID, clusterVersion: bootstrapVersion, initializedEngines: engines, + newEngines: nil, }, joined: true, } @@ -276,14 +295,11 @@ func NewNode( eventLogger = sql.MakeEventLogger(execCfg) } n := &Node{ - storeCfg: cfg, - stopper: stopper, - recorder: recorder, - metrics: makeNodeMetrics(reg, cfg.HistogramWindowInterval), - stores: kvserver.NewStores( - cfg.AmbientCtx, cfg.Clock, - cfg.Settings.Version.BinaryVersion(), - cfg.Settings.Version.BinaryMinSupportedVersion()), + storeCfg: cfg, + stopper: stopper, + recorder: recorder, + metrics: makeNodeMetrics(reg, cfg.HistogramWindowInterval), + stores: kvserver.NewStores(cfg.AmbientCtx, cfg.Clock), txnMetrics: txnMetrics, eventLogger: eventLogger, clusterID: clusterID, @@ -314,12 +330,6 @@ func (n *Node) AnnotateCtxWithSpan( return n.storeCfg.AmbientCtx.AnnotateCtxWithSpan(ctx, opName) } -func (n *Node) onClusterVersionChange(ctx context.Context, cv clusterversion.ClusterVersion) { - if err := n.stores.OnClusterVersionChange(ctx, cv); err != nil { - log.Fatal(ctx, errors.Wrapf(err, "updating cluster version to %v", cv)) - } -} - // start starts the node by registering the storage instance for the // RPC service "Node" and initializing stores for each specified // engine. Launches periodic store gossiping in a goroutine. @@ -335,10 +345,6 @@ func (n *Node) start( localityAddress []roachpb.LocalityAddress, nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor), ) error { - if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, &n.storeCfg.Settings.SV); err != nil { - return err - } - // Obtaining the NodeID requires a dance of sorts. If the node has initialized // stores, the NodeID is persisted in each of them. If not, then we'll need to // use the KV store to get a NodeID assigned. @@ -442,13 +448,6 @@ func (n *Node) start( return fmt.Errorf("failed to initialize the gossip interface: %s", err) } - // Read persisted ClusterVersion from each configured store to - // verify there are no stores with data too old or too new for this - // binary. - if _, err := n.stores.SynthesizeClusterVersion(ctx); err != nil { - return err - } - // Bootstrap any uninitialized stores. // // TODO(tbg): address https://github.com/cockroachdb/cockroach/issues/39415. @@ -461,17 +460,6 @@ func (n *Node) start( n.startComputePeriodicMetrics(n.stopper, DefaultMetricsSampleInterval) - // Now that we've created all our stores, install the gossip version update - // handler to write version updates to them. - // It's important that we persist new versions to the engines before the node - // starts using it, otherwise the node might regress the version after a - // crash. - clusterversion.SetBeforeChange(ctx, &n.storeCfg.Settings.SV, n.onClusterVersionChange) - // Invoke the callback manually once so that we persist the updated value that - // gossip might have already received. 
- clusterVersion := n.storeCfg.Settings.Version.ActiveVersion(ctx) - n.onClusterVersionChange(ctx, clusterVersion) - // Be careful about moving this line above `startStores`; store migrations rely // on the fact that the cluster version has not been updated via Gossip (we // have migrations that want to run only if the server starts with a given @@ -557,20 +545,6 @@ func (n *Node) bootstrapStores( return errors.New("ClusterID missing during store bootstrap of auxiliary store") } - // There's a bit of an awkward dance around cluster versions here. If this node - // is joining an existing cluster for the first time, it doesn't have any engines - // set up yet, and cv below will be the binary's minimum supported version. - // At the same time, the Gossip update which notifies us about the real - // cluster version won't persist it to any engines (because we haven't - // installed the gossip update handler yet and also because none of the - // stores are bootstrapped). So we just accept that we won't use the correct - // version here, but post-bootstrapping will invoke the callback manually, - // which will disseminate the correct version to all engines. - cv, err := n.stores.SynthesizeClusterVersion(ctx) - if err != nil { - return errors.Errorf("error retrieving cluster version for bootstrap: %s", err) - } - { // Bootstrap all waiting stores by allocating a new store id for // each and invoking storage.Bootstrap() to persist it and the cluster @@ -586,7 +560,7 @@ func (n *Node) bootstrapStores( StoreID: firstID, } for _, eng := range emptyEngines { - if err := kvserver.InitEngine(ctx, eng, sIdent, cv); err != nil { + if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil { return err } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 308a5ac6e16f..8204d9524b63 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) func formatKeys(keys []roachpb.Key) string { @@ -58,8 +59,9 @@ func TestBootstrapCluster(t *testing.T) { ctx := context.Background() e := storage.NewDefaultInMem() defer e.Close() + require.NoError(t, kvserver.WriteClusterVersion(ctx, e, clusterversion.TestingClusterVersion)) if _, err := bootstrapCluster( - ctx, []storage.Engine{e}, clusterversion.TestingClusterVersion, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), + ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), ); err != nil { t.Fatal(err) } @@ -231,8 +233,9 @@ func TestCorruptedClusterID(t *testing.T) { cv := clusterversion.TestingClusterVersion + require.NoError(t, kvserver.WriteClusterVersion(ctx, e, cv)) if _, err := bootstrapCluster( - ctx, []storage.Engine{e}, cv, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), + ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), ); err != nil { t.Fatal(err) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 3332394f4e60..13d33f66c4c3 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/blobs/blobspb" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/geo/geos" 
"github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1498,6 +1499,83 @@ func (s *Server) Start(ctx context.Context) error { return err } + { + // Set up the callback that persists gossiped version bumps to the + // engines. The invariant we uphold here is that the bump needs to be + // persisted on all engines before it becomes "visible" to the version + // setting. To this end, + // + // a) make sure Gossip is not started yet, and + // b) set up the BeforeChange callback on the version setting to persist + // incoming updates to all engines. + // c) write back the disk-loaded cluster version to all engines, + // d) initialize the version setting (with the disk-loaded version). + // + // Note that "all engines" means "all engines", not "all initialized + // engines". We cannot initialize engines this early in the boot + // sequence. + s.gossip.AssertNotStarted(ctx) + + // Serialize the callback through a mutex to make sure we're not + // clobbering the disk state if callback gets fired off concurrently. + var mu syncutil.Mutex + cb := func(ctx context.Context, newCV clusterversion.ClusterVersion) { + mu.Lock() + defer mu.Unlock() + v := s.cfg.Settings.Version + prevCV, err := kvserver.SynthesizeClusterVersionFromEngines( + ctx, s.engines, v.BinaryVersion(), v.BinaryMinSupportedVersion(), + ) + if err != nil { + log.Fatal(ctx, err) + } + if !prevCV.Version.Less(newCV.Version) { + // If nothing needs to be updated, don't do anything. The + // callbacks fire async (or at least we want to assume the worst + // case in which they do) and so an old update might happen + // after a new one. + return + } + if err := kvserver.WriteClusterVersionToEngines(ctx, s.engines, newCV); err != nil { + log.Fatal(ctx, err) + } + log.Infof(ctx, "active cluster version is now %s (up from %s)", newCV, prevCV) + } + clusterversion.SetBeforeChange(ctx, &s.cfg.Settings.SV, cb) + + diskClusterVersion := initServer.DiskClusterVersion() + // The version setting loaded from disk is the maximum cluster version + // seen on any engine. If new stores are being added to the server right + // now, or if the process crashed earlier half-way through the callback, + // that version won't be on all engines. For that reason, we backfill + // once. + if err := kvserver.WriteClusterVersionToEngines( + ctx, s.engines, diskClusterVersion, + ); err != nil { + return err + } + + // NB: if we bootstrap a new server (in initServer.ServeAndWait below) + // we will call Initialize a second time, to eagerly move it to the + // bootstrap version (from the min supported version). Initialize() + // tolerates that. Note that in that case we know that the callback + // has not fired yet, since Gossip won't connect (to itself) until + // the server starts and so the callback will never fire prior to + // that second Initialize() call. Note also that at this point in + // the code we don't know if we'll bootstrap or join an existing + // cluster, so we have to conservatively go with the version from + // disk, which in the case of no initialized engines is the binary + // min supported version. + if err := clusterversion.Initialize(ctx, diskClusterVersion.Version, &s.cfg.Settings.SV); err != nil { + return err + } + + // At this point, we've established the invariant: all engines hold the + // version currently visible to the setting. And we have the callback in + // place that will persist an incoming updated version on all engines + // before making it visible to the setting. 
+ } + serverpb.RegisterInitServer(s.grpc.Server, initServer) s.node.startAssertEngineHealth(ctx, s.engines) @@ -1678,7 +1756,7 @@ func (s *Server) Start(ctx context.Context) error { // This opens the main listener. startRPCServer(workersCtx) - state, err := initServer.ServeAndWait(ctx, s.stopper, s.gossip) + state, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV, s.gossip) if err != nil { return errors.Wrap(err, "during init") } diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 38e935516f64..2f8479b2a2ab 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -122,7 +122,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto storage.DefaultStorageEngine, roachpb.Attributes{}, 50<<20) ltc.Stopper.AddCloser(ltc.Eng) - ltc.Stores = kvserver.NewStores(ambient, ltc.Clock, clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion) + ltc.Stores = kvserver.NewStores(ambient, ltc.Clock) factory := initFactory(cfg.Settings, nodeDesc, ambient.Tracer, ltc.Clock, ltc.Latency, ltc.Stores, ltc.Stopper, ltc.Gossip) if ltc.DBContext == nil { @@ -170,8 +170,11 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto cfg.TimestampCachePageSize = tscache.TestSklPageSize ctx := context.TODO() + if err := kvserver.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil { + t.Fatalf("unable to write cluster version: %s", err) + } if err := kvserver.InitEngine( - ctx, ltc.Eng, roachpb.StoreIdent{NodeID: nodeID, StoreID: 1}, clusterversion.TestingClusterVersion, + ctx, ltc.Eng, roachpb.StoreIdent{NodeID: nodeID, StoreID: 1}, ); err != nil { t.Fatalf("unable to start local test cluster: %s", err) } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 4fabed433a0e..bfa9fdea8f16 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -219,7 +219,7 @@ func StartServer( ) (TestServerInterface, *gosql.DB, *kv.DB) { server, err := StartServerRaw(params) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } pgURL, cleanupGoDB := sqlutils.PGUrl(