diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 5510fcfa2536..19eb12a42f8d 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -69,6 +69,6 @@
trace.debug.enable | boolean | false | if set, traces for recent requests can be seen in the /debug page |
trace.lightstep.token | string | | if set, traces go to Lightstep using this token |
trace.zipkin.collector | string | | if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set |
-version | custom validation | 20.1 | set the active cluster version in the format '<major>.<minor>' |
+version | custom validation | 20.1-1 | set the active cluster version in the format '<major>.<minor>' |
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index be12e07d3018..ba9600d67b1c 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -60,6 +60,7 @@ const (
VersionTimeTZType
VersionTimePrecision
Version20_1
+ VersionStart20_2
// Add new versions here (step one of two).
)
@@ -462,6 +463,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: Version20_1,
Version: roachpb.Version{Major: 20, Minor: 1},
},
+ {
+ // VersionStart20_2 demarcates work towards CockroachDB v20.2.
+ Key: VersionStart20_2,
+ Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 1},
+ },
// Add new versions here (step two of two).
diff --git a/pkg/clusterversion/setting.go b/pkg/clusterversion/setting.go
index db914510fb6a..25ea7bdfd69b 100644
--- a/pkg/clusterversion/setting.go
+++ b/pkg/clusterversion/setting.go
@@ -87,15 +87,24 @@ func (cv *clusterVersionSetting) initialize(
ctx context.Context, version roachpb.Version, sv *settings.Values,
) error {
if ver := cv.activeVersionOrEmpty(ctx, sv); ver != (ClusterVersion{}) {
- // Allow initializing a second time as long as it's setting the version to
- // what it was already set. This is useful in tests that use
- // MakeTestingClusterSettings() which initializes the version, and the
- // start a server which again initializes it.
+ // Allow initializing a second time as long as it's not regressing.
+ //
+ // This is useful in tests that use MakeTestingClusterSettings(), which
+ // initializes the version, and then start a server which initializes it
+ // once more.
+ //
+ // It's also used in production code during bootstrap, where the version
+ // is first initialized to BinaryMinSupportedVersion and then
+ // re-initialized to BootstrapVersion (=BinaryVersion).
+ if version.Less(ver.Version) {
+ return errors.AssertionFailedf("cannot initialize version to %s because already set to: %s",
+ version, ver)
+ }
if version == ver.Version {
+ // Don't trigger callbacks, etc., a second time.
return nil
}
- return errors.AssertionFailedf("cannot initialize version to %s because already set to: %s",
- version, ver)
+ // Now version > ver.Version.
}
if err := cv.validateSupportedVersionInner(ctx, version, sv); err != nil {
return err
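The rule the new initialize logic enforces can be restated as a standalone sketch. This is not the production code; it is a hypothetical helper that only assumes the roachpb.Version.Less ordering and the errors.AssertionFailedf constructor already used in this diff.

```go
import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// checkReinitialize (hypothetical) restates the re-initialization rule:
// initializing a second time is fine as long as the version does not regress.
func checkReinitialize(cur, proposed roachpb.Version) error {
	if proposed.Less(cur) {
		// Regressing the version is a programming error.
		return errors.AssertionFailedf(
			"cannot initialize version to %s because already set to: %s", proposed, cur)
	}
	// proposed == cur: tolerated no-op; callbacks are not triggered again.
	// proposed > cur: allowed, e.g. bootstrap first initializing to
	// BinaryMinSupportedVersion and later re-initializing to the bootstrap
	// version.
	return nil
}
```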
diff --git a/pkg/clusterversion/versionkey_string.go b/pkg/clusterversion/versionkey_string.go
index fd4437e32efd..dfe3a3c217d1 100644
--- a/pkg/clusterversion/versionkey_string.go
+++ b/pkg/clusterversion/versionkey_string.go
@@ -36,11 +36,12 @@ func _() {
_ = x[VersionTimeTZType-25]
_ = x[VersionTimePrecision-26]
_ = x[Version20_1-27]
+ _ = x[VersionStart20_2-28]
}
-const _VersionKey_name = "Version19_1VersionStart19_2VersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicasVersionTableDescModificationTimeFromMVCCVersionPartitionedBackupVersion19_2VersionStart20_1VersionContainsEstimatesCounterVersionChangeReplicasDemotionVersionSecondaryIndexColumnFamiliesVersionNamespaceTableWithSchemasVersionProtectedTimestampsVersionPrimaryKeyChangesVersionAuthLocalAndTrustRejectMethodsVersionPrimaryKeyColumnsOutOfFamilyZeroVersionRootPasswordVersionNoExplicitForeignKeyIndexIDsVersionHashShardedIndexesVersionCreateRolePrivilegeVersionStatementDiagnosticsSystemTablesVersionSchemaChangeJobVersionSavepointsVersionTimeTZTypeVersionTimePrecisionVersion20_1"
+const _VersionKey_name = "Version19_1VersionStart19_2VersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicasVersionTableDescModificationTimeFromMVCCVersionPartitionedBackupVersion19_2VersionStart20_1VersionContainsEstimatesCounterVersionChangeReplicasDemotionVersionSecondaryIndexColumnFamiliesVersionNamespaceTableWithSchemasVersionProtectedTimestampsVersionPrimaryKeyChangesVersionAuthLocalAndTrustRejectMethodsVersionPrimaryKeyColumnsOutOfFamilyZeroVersionRootPasswordVersionNoExplicitForeignKeyIndexIDsVersionHashShardedIndexesVersionCreateRolePrivilegeVersionStatementDiagnosticsSystemTablesVersionSchemaChangeJobVersionSavepointsVersionTimeTZTypeVersionTimePrecisionVersion20_1VersionStart20_2"
-var _VersionKey_index = [...]uint16{0, 11, 27, 49, 75, 109, 136, 176, 200, 211, 227, 258, 287, 322, 354, 380, 404, 441, 480, 499, 534, 559, 585, 624, 646, 663, 680, 700, 711}
+var _VersionKey_index = [...]uint16{0, 11, 27, 49, 75, 109, 136, 176, 200, 211, 227, 258, 287, 322, 354, 380, 404, 441, 480, 499, 534, 559, 585, 624, 646, 663, 680, 700, 711, 727}
func (i VersionKey) String() string {
if i < 0 || i >= VersionKey(len(_VersionKey_index)-1) {
diff --git a/pkg/cmd/roachtest/versionupgrade.go b/pkg/cmd/roachtest/versionupgrade.go
index 6c0c51935b66..2f9db4993453 100644
--- a/pkg/cmd/roachtest/versionupgrade.go
+++ b/pkg/cmd/roachtest/versionupgrade.go
@@ -72,10 +72,7 @@ DROP TABLE test.t;
}
func runVersionUpgrade(ctx context.Context, t *test, c *cluster, predecessorVersion string) {
- // This is ugly, but we can't pass `--encrypt=false` to old versions of
- // Cockroach.
- //
- // TODO(tbg): revisit as old versions are aged out of this test.
+ // This test uses fixtures and we do not have encrypted fixtures right now.
c.encryptDefault = false
// Set the bool within to true to create a new fixture for this test. This
@@ -104,6 +101,18 @@ func runVersionUpgrade(ctx context.Context, t *test, c *cluster, predecessorVers
waitForUpgradeStep(c.All()),
testFeaturesStep,
+ // Work around a bug in <= 20.1 that exists at the time of writing. The
+ // bug would sometimes cause the cluster version (predecessorVersion)
+ // to not be persisted on the engines. In that case, moving to the next
+ // version would fail. For details, see:
+ //
+ // https://github.com/cockroachdb/cockroach/pull/47358
+ //
+ // TODO(tbg): remove this when we stop running this test against 20.1
+ // or any earlier release. Or, make sure the <=20.1 fixtures all have
+ // the bumped cluster version on all stores.
+ binaryUpgradeStep(c.All(), predecessorVersion),
+
// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.
@@ -350,7 +359,7 @@ func waitForUpgradeStep(nodes nodeListOption) versionStep {
}
}
- t.l.Printf("%s: nodes %v are upgraded\n", newVersion)
+ t.l.Printf("%s: nodes %v are upgraded\n", newVersion, nodes)
// TODO(nvanbenschoten): add upgrade qualification step.
}
diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go
index 56101655fdd7..cd054c9e5098 100644
--- a/pkg/gossip/gossip.go
+++ b/pkg/gossip/gossip.go
@@ -218,6 +218,8 @@ type Storage interface {
// During bootstrapping, the bootstrap list contains candidates for
// entry to the gossip network.
type Gossip struct {
+ started bool // for assertions
+
*server // Embedded gossip RPC server
Connected chan struct{} // Closed upon initial connection
@@ -384,6 +386,13 @@ func NewTestWithLocality(
return gossip
}
+// AssertNotStarted fatals if the Gossip instance was already started.
+func (g *Gossip) AssertNotStarted(ctx context.Context) {
+ if g.started {
+ log.Fatalf(ctx, "Gossip instance was already started")
+ }
+}
+
// GetNodeMetrics returns the gossip node metrics.
func (g *Gossip) GetNodeMetrics() *Metrics {
return g.server.GetNodeMetrics()
@@ -1244,6 +1253,8 @@ func (g *Gossip) MaxHops() uint32 {
// This method starts bootstrap loop, gossip server, and client
// management in separate goroutines and returns.
func (g *Gossip) Start(advertAddr net.Addr, resolvers []resolver.Resolver) {
+ g.AssertNotStarted(context.Background())
+ g.started = true
g.setResolvers(resolvers)
g.server.start(advertAddr) // serve gossip protocol
g.bootstrap() // bootstrap gossip client
diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go
index 445e620ea77a..a94f9a37eb53 100644
--- a/pkg/kv/kvserver/client_test.go
+++ b/pkg/kv/kvserver/client_test.go
@@ -141,9 +141,7 @@ func createTestStoreWithOpts(
nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(), storeCfg.DefaultZoneConfig,
)
storeCfg.ScanMaxIdleTime = 1 * time.Second
- stores := kvserver.NewStores(
- ac, storeCfg.Clock,
- clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion)
+ stores := kvserver.NewStores(ac, storeCfg.Clock)
if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
t.Fatal(err)
@@ -177,8 +175,9 @@ func createTestStoreWithOpts(
// TODO(bdarnell): arrange to have the transport closed.
ctx := context.Background()
if !opts.dontBootstrap {
+ require.NoError(t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
if err := kvserver.InitEngine(
- ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, clusterversion.TestingClusterVersion,
+ ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1},
); err != nil {
t.Fatal(err)
}
@@ -880,10 +879,11 @@ func (m *multiTestContext) addStore(idx int) {
ctx := context.Background()
if needBootstrap {
+ require.NoError(m.t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
if err := kvserver.InitEngine(ctx, eng, roachpb.StoreIdent{
NodeID: roachpb.NodeID(idx + 1),
StoreID: roachpb.StoreID(idx + 1),
- }, clusterversion.TestingClusterVersion); err != nil {
+ }); err != nil {
m.t.Fatal(err)
}
}
@@ -914,9 +914,7 @@ func (m *multiTestContext) addStore(idx int) {
m.t.Fatal(err)
}
- sender := kvserver.NewStores(ambient, clock,
- clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion,
- )
+ sender := kvserver.NewStores(ambient, clock)
sender.AddStore(store)
perReplicaServer := kvserver.MakeServer(&roachpb.NodeDescriptor{NodeID: nodeID}, sender)
kvserver.RegisterPerReplicaServer(grpcServer, perReplicaServer)
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index e594a7ca762a..11af3e2ceb86 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -240,12 +240,12 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
factory := &testSenderFactory{}
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock)
+ require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv))
if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{
ClusterID: uuid.MakeV4(),
NodeID: 1,
StoreID: 1,
- },
- cv); err != nil {
+ }); err != nil {
t.Fatal(err)
}
if err := clusterversion.Initialize(ctx, cv.Version, &cfg.Settings.SV); err != nil {
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 50079f493ed9..e2531e2047bb 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2019,23 +2019,40 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64,
return hlcUpperBound, nil
}
-func checkEngineEmpty(ctx context.Context, eng storage.Engine) error {
+// checkCanInitializeEngine ensures that the engine is empty except for a
+// cluster version, which must be present.
+func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
kvs, err := storage.Scan(eng, roachpb.KeyMin, roachpb.KeyMax, 10)
if err != nil {
return err
}
- if len(kvs) > 0 {
- // See if this is an already-bootstrapped store.
- ident, err := ReadStoreIdent(ctx, eng)
- if err != nil {
- return errors.Wrap(err, "unable to read store ident")
- }
- keyVals := make([]string, len(kvs))
- for i, kv := range kvs {
- keyVals[i] = fmt.Sprintf("%s: %q", kv.Key, kv.Value)
+ // See if this is an already-bootstrapped store.
+ ident, err := ReadStoreIdent(ctx, eng)
+ if err == nil {
+ return errors.Errorf("engine already initialized as %s", ident.String())
+ } else if !errors.Is(errors.Cause(err), &NotBootstrappedError{}) {
+ return errors.Wrap(err, "unable to read store ident")
+ }
+
+ // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain
+ // a cluster version and nothing else?
+
+ var sawClusterVersion bool
+ var keyVals []string
+ for _, kv := range kvs {
+ if kv.Key.Key.Equal(keys.StoreClusterVersionKey()) {
+ sawClusterVersion = true
+ continue
}
- return errors.Errorf("engine belongs to store %s, contains %s", ident.String(), keyVals)
+ keyVals = append(keyVals, fmt.Sprintf("%s: %q", kv.Key, kv.Value))
+ }
+ if len(keyVals) > 0 {
+ return errors.Errorf("engine cannot be bootstrapped, contains:\n%s", keyVals)
}
+ if !sawClusterVersion {
+ return errors.New("no cluster version found on uninitialized engine")
+ }
+
return nil
}
diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_bootstrap.go
index 593685861e0c..273fda51da3b 100644
--- a/pkg/kv/kvserver/store_bootstrap.go
+++ b/pkg/kv/kvserver/store_bootstrap.go
@@ -13,7 +13,6 @@ package kvserver
import (
"context"
- "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -29,14 +28,10 @@ import (
// InitEngine writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
-// should be completely empty. It returns an error if called on a
-// non-empty engine.
-func InitEngine(
- ctx context.Context,
- eng storage.Engine,
- ident roachpb.StoreIdent,
- cv clusterversion.ClusterVersion,
-) error {
+// should be completely empty save for a cluster version, which must
+// already have been persisted to it. Returns an error if this is not
+// the case.
+func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already bootstrapped with ident %s", eng, exIdent.String())
@@ -45,8 +40,8 @@ func InitEngine(
return err
}
- if err := checkEngineEmpty(ctx, eng); err != nil {
- return errors.Wrap(err, "cannot verify empty engine for bootstrap")
+ if err := checkCanInitializeEngine(ctx, eng); err != nil {
+ return errors.Wrap(err, "while trying to initialize store")
}
batch := eng.NewBatch()
@@ -62,10 +57,6 @@ func InitEngine(
batch.Close()
return err
}
- if err := WriteClusterVersion(ctx, batch, cv); err != nil {
- batch.Close()
- return errors.Wrap(err, "cannot write cluster version")
- }
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting bootstrap data")
}
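The calling convention implied by this change shows up throughout the test updates below: a cluster version must be persisted to the engine before InitEngine runs, because InitEngine no longer writes it. A hedged sketch of that two-step sequence (the store ident values are placeholders):

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// initFreshEngine illustrates the new contract: first persist a cluster
// version, then write the store ident via InitEngine, which now verifies that
// the engine contains a cluster version and nothing else.
func initFreshEngine(
	ctx context.Context, eng storage.Engine, cv clusterversion.ClusterVersion,
) error {
	if err := kvserver.WriteClusterVersion(ctx, eng, cv); err != nil {
		return err
	}
	return kvserver.InitEngine(ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1})
}
```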
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index a8a5e81b4ce1..518c52792ac4 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -231,8 +231,10 @@ func createTestStoreWithoutStart(
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock)
store := NewStore(context.TODO(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
factory.setStore(store)
+
+ require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion))
if err := InitEngine(
- context.TODO(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, clusterversion.TestingClusterVersion,
+ context.TODO(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1},
); err != nil {
t.Fatal(err)
}
@@ -436,8 +438,9 @@ func TestStoreInitAndBootstrap(t *testing.T) {
t.Error("expected failure starting un-bootstrapped store")
}
+ require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion))
// Bootstrap with a fake ident.
- if err := InitEngine(ctx, eng, testIdent, clusterversion.TestingClusterVersion); err != nil {
+ if err := InitEngine(ctx, eng, testIdent); err != nil {
t.Fatalf("error bootstrapping store: %+v", err)
}
@@ -490,9 +493,9 @@ func TestStoreInitAndBootstrap(t *testing.T) {
}
}
-// TestBootstrapOfNonEmptyStore verifies bootstrap failure if engine
+// TestInitializeEngineErrors verifies bootstrap failure if engine
// is not empty.
-func TestBootstrapOfNonEmptyStore(t *testing.T) {
+func TestInitializeEngineErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
ctx := context.TODO()
@@ -500,10 +503,16 @@ func TestBootstrapOfNonEmptyStore(t *testing.T) {
eng := storage.NewDefaultInMem()
stopper.AddCloser(eng)
- // Put some random garbage into the engine.
- if err := eng.Put(storage.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar")); err != nil {
- t.Errorf("failure putting key foo into engine: %+v", err)
+ // Bootstrap should fail if engine has no cluster version yet.
+ if err := InitEngine(ctx, eng, testIdent); !testutils.IsError(err, `no cluster version`) {
+ t.Fatalf("unexpected error: %v", err)
}
+
+ require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
+
+ // Put some random garbage into the engine.
+ require.NoError(t, eng.Put(storage.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar")))
+
cfg := TestStoreConfig(nil)
cfg.Transport = NewDummyRaftTransport(cfg.Settings)
store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
@@ -516,12 +525,8 @@ func TestBootstrapOfNonEmptyStore(t *testing.T) {
}
// Bootstrap should fail on non-empty engine.
- switch err := errors.Cause(InitEngine(
- ctx, eng, testIdent, clusterversion.TestingClusterVersion,
- )); err.(type) {
- case *NotBootstrappedError:
- default:
- t.Errorf("unexpected error bootstrapping non-empty store: %+v", err)
+ if err := InitEngine(ctx, eng, testIdent); !testutils.IsError(err, `cannot be bootstrapped`) {
+ t.Fatalf("unexpected error: %v", err)
}
}
diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go
index 1d99159ed873..57548a953b49 100644
--- a/pkg/kv/kvserver/stores.go
+++ b/pkg/kv/kvserver/stores.go
@@ -39,11 +39,6 @@ type Stores struct {
log.AmbientContext
clock *hlc.Clock
storeMap syncutil.IntMap // map[roachpb.StoreID]*Store
- // These two versions are usually
- // clusterversion.binary{,MinimumSupported}Version, respectively. They are
- // changed in some tests.
- binaryVersion roachpb.Version
- binaryMinSupportedVersion roachpb.Version
mu struct {
syncutil.Mutex
@@ -57,16 +52,10 @@ var _ gossip.Storage = &Stores{} // Stores implements the gossip.Storage interfa
// NewStores returns a local-only sender which directly accesses
// a collection of stores.
-func NewStores(
- ambient log.AmbientContext,
- clock *hlc.Clock,
- binaryVersion, binaryMinSupportedVersion roachpb.Version,
-) *Stores {
+func NewStores(ambient log.AmbientContext, clock *hlc.Clock) *Stores {
return &Stores{
- AmbientContext: ambient,
- clock: clock,
- binaryVersion: binaryVersion,
- binaryMinSupportedVersion: binaryMinSupportedVersion,
+ AmbientContext: ambient,
+ clock: clock,
}
}
@@ -288,10 +277,10 @@ func (ls *Stores) updateBootstrapInfoLocked(bi *gossip.BootstrapInfo) error {
// ReadVersionFromEngineOrZero reads the persisted cluster version from the
// engine, falling back to the zero value.
func ReadVersionFromEngineOrZero(
- ctx context.Context, e storage.Engine,
+ ctx context.Context, reader storage.Reader,
) (clusterversion.ClusterVersion, error) {
var cv clusterversion.ClusterVersion
- cv, err := ReadClusterVersion(ctx, e)
+ cv, err := ReadClusterVersion(ctx, reader)
if err != nil {
return clusterversion.ClusterVersion{}, err
}
@@ -299,7 +288,10 @@ func ReadVersionFromEngineOrZero(
}
// WriteClusterVersionToEngines writes the given version to the given engines,
-// without any sanity checks.
+// returning nil on success and otherwise the first error encountered while
+// writing.
+//
+// WriteClusterVersionToEngines makes no attempt to validate the supplied version.
func WriteClusterVersionToEngines(
ctx context.Context, engines []storage.Engine, cv clusterversion.ClusterVersion,
) error {
@@ -312,15 +304,18 @@ func WriteClusterVersionToEngines(
}
// SynthesizeClusterVersionFromEngines returns the cluster version that was read
-// from the engines or, if there's no bootstrapped engines, returns
-// binaryMinSupportedVersion.
+// from the engines or, if none are initialized, binaryMinSupportedVersion.
+// Typically all initialized engines will have the same version persisted,
+// though an ill-timed crash can leave them divergent, in which case the
+// largest version seen is returned.
//
-// Args:
-// binaryMinSupportedVersion: The minimum version supported by this binary. An error
-// is returned if any engine has a version lower that this. This version is
-// written to the engines if no store has a version in it.
-// binaryVersion: The version of this binary. An error is returned if
-// any engine has a higher version.
+// binaryVersion is the version of this binary. An error is returned if
+// any engine has a higher version, as this would indicate that this node
+// has previously acked the higher cluster version but is now running an
+// old binary, which is unsafe.
+//
+// binaryMinSupportedVersion is the minimum version supported by this binary. An
+// error is returned if any engine has a version lower than this.
func SynthesizeClusterVersionFromEngines(
ctx context.Context,
engines []storage.Engine,
@@ -344,6 +339,7 @@ func SynthesizeClusterVersionFromEngines(
// constraints, which at the latest the second loop will achieve (because
// then minStoreVersion don't change any more).
for _, eng := range engines {
+ eng := eng.(storage.Reader) // we only need read access
var cv clusterversion.ClusterVersion
cv, err := ReadVersionFromEngineOrZero(ctx, eng)
if err != nil {
@@ -399,52 +395,9 @@ func SynthesizeClusterVersionFromEngines(
"is too old for running version v%s (which requires data from v%s or later)",
minStoreVersion.origin, minStoreVersion.Version, binaryVersion, binaryMinSupportedVersion)
}
-
- // Write the "actual" version back to all stores. This is almost always a
- // no-op, but will backfill the information for 1.0.x clusters, and also
- // smoothens out inconsistent state that can crop up during an ill-timed
- // crash or when new stores are being added.
- return cv, WriteClusterVersionToEngines(ctx, engines, cv)
-}
-
-// SynthesizeClusterVersion reads and returns the ClusterVersion protobuf
-// (written to any of the configured stores (all of which are bootstrapped)).
-// The returned value is also replicated to all stores for consistency, in case
-// a new store was added or an old store re-configured. In case of non-identical
-// versions across the stores, returns a version that carries the smallest
-// Version.
-//
-// If there aren't any stores, returns the minimum supported version of the binary.
-func (ls *Stores) SynthesizeClusterVersion(
- ctx context.Context,
-) (clusterversion.ClusterVersion, error) {
- var engines []storage.Engine
- ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
- engines = append(engines, (*Store)(v).engine)
- return true // want more
- })
- cv, err := SynthesizeClusterVersionFromEngines(ctx, engines, ls.binaryVersion, ls.binaryMinSupportedVersion)
- if err != nil {
- return clusterversion.ClusterVersion{}, err
- }
return cv, nil
}
-// WriteClusterVersion persists the supplied ClusterVersion to every
-// configured store. Returns nil on success; otherwise returns first
-// error encountered writing to the stores.
-//
-// WriteClusterVersion makes no attempt to validate the supplied version.
-func (ls *Stores) WriteClusterVersion(ctx context.Context, cv clusterversion.ClusterVersion) error {
- // Update all stores.
- engines := ls.engines()
- ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
- engines = append(engines, (*Store)(v).Engine())
- return true // want more
- })
- return WriteClusterVersionToEngines(ctx, engines, cv)
-}
-
func (ls *Stores) engines() []storage.Engine {
var engines []storage.Engine
ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
@@ -453,43 +406,3 @@ func (ls *Stores) engines() []storage.Engine {
})
return engines
}
-
-// OnClusterVersionChange is invoked when the running node receives a notification
-// indicating that the cluster version has changed. It checks the currently persisted
-// version and updates if it is older than the provided update.
-func (ls *Stores) OnClusterVersionChange(
- ctx context.Context, cv clusterversion.ClusterVersion,
-) error {
- // Grab a lock to make sure that there aren't two interleaved invocations of
- // this method that result in clobbering of an update.
- ls.mu.Lock()
- defer ls.mu.Unlock()
-
- // We're going to read the cluster version from any engine - all the engines
- // are always kept in sync so it doesn't matter which one we read from.
- var someEngine storage.Engine
- ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
- someEngine = (*Store)(v).engine
- return false // don't iterate any more
- })
- if someEngine == nil {
- // If we haven't bootstrapped any engines yet, there's nothing for us to do.
- return nil
- }
- synthCV, err := ReadClusterVersion(ctx, someEngine)
- if err != nil {
- return errors.Wrap(err, "error reading persisted cluster version")
- }
- // If the update downgrades the version, ignore it. Must be a
- // reordering (this method is called from multiple goroutines via
- // `(*Node).onClusterVersionChange)`). Note that we do carry out the upgrade if
- // the MinVersion is identical, to backfill the engines that may still need it.
- if cv.Version.Less(synthCV.Version) {
- return nil
- }
- if err := ls.WriteClusterVersion(ctx, cv); err != nil {
- return errors.Wrap(err, "writing cluster version")
- }
-
- return nil
-}
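With the per-Stores version plumbing removed, callers pass the binary's versions to the free functions directly, as the test rewrites below do. A rough sketch of the replacement pattern; note that persisting the synthesized version back to the engines is now the caller's responsibility (the write-back was removed from the synthesis step in this diff):

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// syncClusterVersion sketches what replaces the removed
// (*Stores).SynthesizeClusterVersion / WriteClusterVersion methods.
func syncClusterVersion(
	ctx context.Context, engines []storage.Engine, binV, minV roachpb.Version,
) (clusterversion.ClusterVersion, error) {
	cv, err := kvserver.SynthesizeClusterVersionFromEngines(ctx, engines, binV, minV)
	if err != nil {
		return clusterversion.ClusterVersion{}, err
	}
	// Explicitly persist the result; SynthesizeClusterVersionFromEngines no
	// longer writes it back itself.
	return cv, kvserver.WriteClusterVersionToEngines(ctx, engines, cv)
}
```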
diff --git a/pkg/kv/kvserver/stores_test.go b/pkg/kv/kvserver/stores_test.go
index 08c066da72c9..0eea4126b16e 100644
--- a/pkg/kv/kvserver/stores_test.go
+++ b/pkg/kv/kvserver/stores_test.go
@@ -31,7 +31,7 @@ import (
)
func newStores(ambientCtx log.AmbientContext, clock *hlc.Clock) *Stores {
- return NewStores(ambientCtx, clock, clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion)
+ return NewStores(ambientCtx, clock)
}
func TestStoresAddStore(t *testing.T) {
@@ -332,18 +332,20 @@ func TestStoresGossipStorageReadLatest(t *testing.T) {
// TestStoresClusterVersionWriteSynthesize verifies that the cluster version is
// written to all stores and that missing versions are filled in appropriately.
-func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
+func TestClusterVersionWriteSynthesize(t *testing.T) {
defer leaktest.AfterTest(t)()
_, stores, _, stopper := createStores(3, t)
ctx := context.Background()
defer stopper.Stop(ctx)
v1_0 := roachpb.Version{Major: 1}
+ // Hard-code binaryVersion of 1.1 for this test.
+ // Hard-code binaryMinSupportedVersion of 1.0 for this test.
+ binV := roachpb.Version{Major: 1, Minor: 1}
+ minV := v1_0
+
makeStores := func() *Stores {
- // Hard-code binaryVersion of 1.1 for this test.
- // Hard-code binaryMinSupportedVersion of 1.0 for this test.
- ls := NewStores(log.AmbientContext{}, stores[0].Clock(),
- roachpb.Version{Major: 1, Minor: 1}, v1_0)
+ ls := NewStores(log.AmbientContext{}, stores[0].Clock())
return ls
}
@@ -351,7 +353,7 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
// If there are no stores, default to binaryMinSupportedVersion
// (v1_0 in this test)
- if initialCV, err := ls0.SynthesizeClusterVersion(ctx); err != nil {
+ if initialCV, err := SynthesizeClusterVersionFromEngines(ctx, ls0.engines(), binV, minV); err != nil {
t.Fatal(err)
} else {
expCV := clusterversion.ClusterVersion{
@@ -370,20 +372,14 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
// Verify that the initial read of an empty store synthesizes v1.0-0. This
// is the code path that runs after starting the 1.1 binary for the first
// time after the rolling upgrade from 1.0.
- //
- // Do it twice because after the first iteration, we have written these
- // values to storage, so the second time around the synthesis does not
- // happen.
- for i := 0; i < 2; i++ {
- if initialCV, err := ls0.SynthesizeClusterVersion(ctx); err != nil {
- t.Fatal(err)
- } else {
- expCV := clusterversion.ClusterVersion{
- Version: v1_0,
- }
- if !reflect.DeepEqual(initialCV, expCV) {
- t.Fatalf("expected %+v; got %+v", expCV, initialCV)
- }
+ if initialCV, err := SynthesizeClusterVersionFromEngines(ctx, ls0.engines(), binV, minV); err != nil {
+ t.Fatal(err)
+ } else {
+ expCV := clusterversion.ClusterVersion{
+ Version: v1_0,
+ }
+ if !reflect.DeepEqual(initialCV, expCV) {
+ t.Fatalf("expected %+v; got %+v", expCV, initialCV)
}
}
@@ -393,12 +389,12 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
cv := clusterversion.ClusterVersion{
Version: versionB,
}
- if err := ls0.WriteClusterVersion(ctx, cv); err != nil {
+ if err := WriteClusterVersionToEngines(ctx, ls0.engines(), cv); err != nil {
t.Fatal(err)
}
// Verify the same thing comes back on read.
- if newCV, err := ls0.SynthesizeClusterVersion(ctx); err != nil {
+ if newCV, err := SynthesizeClusterVersionFromEngines(ctx, ls0.engines(), binV, minV); err != nil {
t.Fatal(err)
} else {
expCV := cv
@@ -418,29 +414,17 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
expCV := clusterversion.ClusterVersion{
Version: v1_0,
}
- if cv, err := ls01.SynthesizeClusterVersion(ctx); err != nil {
+ if cv, err := SynthesizeClusterVersionFromEngines(ctx, ls01.engines(), binV, minV); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(cv, expCV) {
t.Fatalf("expected %+v, got %+v", expCV, cv)
}
- // Verify the same thing comes back on read of either of the stores (i.e.,
- // we wrote the updated version to both).
- for _, store := range stores[:2] {
- ls := makeStores()
- ls.AddStore(store)
- if cv, err := ls.SynthesizeClusterVersion(ctx); err != nil {
- t.Fatal(err)
- } else if !reflect.DeepEqual(cv, expCV) {
- t.Fatalf("expected %+v; got %+v", expCV, cv)
- }
- }
-
// Write an updated Version to both stores.
cv := clusterversion.ClusterVersion{
Version: versionB,
}
- if err := ls01.WriteClusterVersion(ctx, cv); err != nil {
+ if err := WriteClusterVersionToEngines(ctx, ls01.engines(), cv); err != nil {
t.Fatal(err)
}
}
@@ -453,7 +437,7 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
{
ls3 := makeStores()
ls3.AddStore(stores[2])
- if err := ls3.WriteClusterVersion(ctx, cv); err != nil {
+ if err := WriteClusterVersionToEngines(ctx, ls3.engines(), cv); err != nil {
t.Fatal(err)
}
}
@@ -468,7 +452,7 @@ func TestStoresClusterVersionWriteSynthesize(t *testing.T) {
expCV := clusterversion.ClusterVersion{
Version: versionA,
}
- if cv, err := ls012.SynthesizeClusterVersion(ctx); err != nil {
+ if cv, err := SynthesizeClusterVersionFromEngines(ctx, ls012.engines(), binV, minV); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(cv, expCV) {
t.Fatalf("expected %+v, got %+v", expCV, cv)
@@ -486,46 +470,50 @@ func TestStoresClusterVersionIncompatible(t *testing.T) {
vOneDashOne := roachpb.Version{Major: 1, Unstable: 1}
vOne := roachpb.Version{Major: 1}
- type testFn func(*clusterversion.ClusterVersion, *Stores) string
- for name, setter := range map[string]testFn{
- "StoreTooNew": func(cv *clusterversion.ClusterVersion, ls *Stores) string {
- // This is what the running node requires from its stores.
- ls.binaryMinSupportedVersion = vOne
+ type testCase struct {
+ binV, minV roachpb.Version // binary version and min supported version
+ engV roachpb.Version // version found on engine in test
+ expErr string
+ }
+ for name, tc := range map[string]testCase{
+ "StoreTooNew": {
// This is what the node is running.
- ls.binaryVersion = vOneDashOne
+ binV: vOneDashOne,
+ // This is what the running node requires from its stores.
+ minV: vOne,
// Version is way too high for this node.
- cv.Version = roachpb.Version{Major: 9}
- return `cockroach version v1\.0-1 is incompatible with data in store =; use version v9\.0 or later`
+ engV: roachpb.Version{Major: 9},
+ expErr: `cockroach version v1\.0-1 is incompatible with data in store =; use version v9\.0 or later`,
},
- "StoreTooOldVersion": func(cv *clusterversion.ClusterVersion, ls *Stores) string {
- // This is what the running node requires from its stores.
- ls.binaryMinSupportedVersion = roachpb.Version{Major: 5}
+ "StoreTooOldVersion": {
// This is what the node is running.
- ls.binaryVersion = roachpb.Version{Major: 9}
+ binV: roachpb.Version{Major: 9},
+ // This is what the running node requires from its stores.
+ minV: roachpb.Version{Major: 5},
// Version is way too low.
- cv.Version = roachpb.Version{Major: 4}
- return `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`
+ engV: roachpb.Version{Major: 4},
+ expErr: `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`,
},
- "StoreTooOldMinVersion": func(cv *clusterversion.ClusterVersion, ls *Stores) string {
+ "StoreTooOldMinVersion": {
// Like the previous test case, but this time cv.MinimumVersion is the culprit.
- ls.binaryMinSupportedVersion = roachpb.Version{Major: 5}
- ls.binaryVersion = roachpb.Version{Major: 9}
- cv.Version = roachpb.Version{Major: 4}
- return `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`
+ binV: roachpb.Version{Major: 9},
+ minV: roachpb.Version{Major: 5},
+ engV: roachpb.Version{Major: 4},
+ expErr: `store =, last used with cockroach version v4\.0, is too old for running version v9\.0 \(which requires data from v5\.0 or later\)`,
},
} {
t.Run(name, func(t *testing.T) {
- _, stores, ls, stopper := createStores(1, t)
- defer stopper.Stop(ctx)
- ls.AddStore(stores[0])
+ engs := []storage.Engine{storage.NewDefaultInMem()}
+ defer engs[0].Close()
// Configure versions and write.
- var cv clusterversion.ClusterVersion
- expErr := setter(&cv, ls)
- if err := ls.WriteClusterVersion(ctx, cv); err != nil {
+ cv := clusterversion.ClusterVersion{Version: tc.engV}
+ if err := WriteClusterVersionToEngines(ctx, engs, cv); err != nil {
t.Fatal(err)
}
- if _, err := ls.SynthesizeClusterVersion(ctx); !testutils.IsError(err, expErr) {
- t.Fatalf("unexpected error: %+v", err)
+ if cv, err := SynthesizeClusterVersionFromEngines(
+ ctx, engs, tc.binV, tc.minV,
+ ); !testutils.IsError(err, tc.expErr) {
+ t.Fatalf("unexpected error: %+v, got version %v", err, cv)
}
})
}
diff --git a/pkg/server/init.go b/pkg/server/init.go
index 1a8cf337396c..f3d4b6897ff6 100644
--- a/pkg/server/init.go
+++ b/pkg/server/init.go
@@ -17,8 +17,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -150,7 +152,7 @@ type initState struct {
// TODO(tbg): give this a KV client and thus initialize at least one store in
// all cases.
func (s *initServer) ServeAndWait(
- ctx context.Context, stopper *stop.Stopper, g *gossip.Gossip,
+ ctx context.Context, stopper *stop.Stopper, sv *settings.Values, g *gossip.Gossip,
) (*initState, error) {
if len(s.inspectState.initializedEngines) != 0 {
// If already bootstrapped, return early.
@@ -168,7 +170,19 @@ func (s *initServer) ServeAndWait(
case <-stopper.ShouldQuiesce():
return nil, stop.ErrUnavailable
case state := <-s.bootstrapReqCh:
- // Bootstrap() did its job.
+ // Bootstrap() did its job. At this point, we know that the cluster
+ // version will be bootstrapVersion (=state.clusterVersion.Version), but
+ // the version setting does not know yet (it was initialized as
+ // BinaryMinSupportedVersion because the engines were all
+ // uninitialized). We *could* just let the server start, and it would
+ // populate system.settings, which is then gossiped, and then the
+ // callback would update the version, but we take this shortcut to avoid
+ // having every freshly bootstrapped cluster spend time at an old
+ // cluster version.
+ if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil {
+ return nil, err
+ }
+
log.Infof(ctx, "**** cluster %s has been created", state.clusterID)
return state, nil
case <-g.Connected:
@@ -244,7 +258,16 @@ func (s *initServer) Bootstrap(
func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) {
cv := clusterversion.ClusterVersion{Version: s.bootstrapVersion}
+ if err := kvserver.WriteClusterVersionToEngines(ctx, s.inspectState.newEngines, cv); err != nil {
+ return nil, err
+ }
return bootstrapCluster(
- ctx, s.inspectState.newEngines, cv, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig,
+ ctx, s.inspectState.newEngines, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig,
)
}
+
+// DiskClusterVersion returns the cluster version synthesized from disk. This
+// is always non-zero since it falls back to BinaryMinSupportedVersion.
+func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion {
+ return s.inspectState.clusterVersion
+}
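The bootstrap-time ordering these changes rely on can be condensed into a sketch (names are taken from this diff; the function below is hypothetical and would live in pkg/server, since it uses the unexported bootstrapCluster and initState):

```go
// bootstrapAndInitialize condenses the new flow: write the bootstrap version
// to the fresh engines, bootstrap the cluster (bootstrapCluster reads the
// version back and requires it to match across engines), then eagerly move
// the version setting up to the bootstrap version so a fresh cluster does not
// linger at BinaryMinSupportedVersion.
func bootstrapAndInitialize(
	ctx context.Context,
	engines []storage.Engine,
	bootstrapVersion clusterversion.ClusterVersion,
	zone, sysZone *zonepb.ZoneConfig,
	sv *settings.Values,
) (*initState, error) {
	if err := kvserver.WriteClusterVersionToEngines(ctx, engines, bootstrapVersion); err != nil {
		return nil, err
	}
	state, err := bootstrapCluster(ctx, engines, zone, sysZone)
	if err != nil {
		return nil, err
	}
	if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil {
		return nil, err
	}
	return state, nil
}
```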
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 354f893b8a13..e00e841f41bd 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -204,14 +204,32 @@ func GetBootstrapSchema(
func bootstrapCluster(
ctx context.Context,
engines []storage.Engine,
- bootstrapVersion clusterversion.ClusterVersion,
defaultZoneConfig *zonepb.ZoneConfig,
defaultSystemZoneConfig *zonepb.ZoneConfig,
) (*initState, error) {
clusterID := uuid.MakeV4()
// TODO(andrei): It'd be cool if this method wouldn't do anything to engines
// other than the first one, and let regular node startup code deal with them.
+ var bootstrapVersion clusterversion.ClusterVersion
for i, eng := range engines {
+ cv, err := kvserver.ReadClusterVersion(ctx, eng)
+ if err != nil {
+ return nil, errors.Wrapf(err, "reading cluster version of %s", eng)
+ }
+
+ // bootstrapCluster requires that a cluster version has already been
+ // persisted on each engine, and that the versions match across engines.
+ if cv.Major == 0 {
+ return nil, errors.Errorf("missing bootstrap version")
+ }
+
+ if i == 0 {
+ bootstrapVersion = cv
+ }
+
+ if bootstrapVersion != cv {
+ return nil, errors.Errorf("found cluster versions %s and %s", bootstrapVersion, cv)
+ }
+
sIdent := roachpb.StoreIdent{
ClusterID: clusterID,
NodeID: FirstNodeID,
@@ -220,7 +238,7 @@ func bootstrapCluster(
// Initialize the engine backing the store with the store ident and cluster
// version.
- if err := kvserver.InitEngine(ctx, eng, sIdent, bootstrapVersion); err != nil {
+ if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
return nil, err
}
@@ -251,6 +269,7 @@ func bootstrapCluster(
clusterID: clusterID,
clusterVersion: bootstrapVersion,
initializedEngines: engines,
+ newEngines: nil,
},
joined: true,
}
@@ -276,14 +295,11 @@ func NewNode(
eventLogger = sql.MakeEventLogger(execCfg)
}
n := &Node{
- storeCfg: cfg,
- stopper: stopper,
- recorder: recorder,
- metrics: makeNodeMetrics(reg, cfg.HistogramWindowInterval),
- stores: kvserver.NewStores(
- cfg.AmbientCtx, cfg.Clock,
- cfg.Settings.Version.BinaryVersion(),
- cfg.Settings.Version.BinaryMinSupportedVersion()),
+ storeCfg: cfg,
+ stopper: stopper,
+ recorder: recorder,
+ metrics: makeNodeMetrics(reg, cfg.HistogramWindowInterval),
+ stores: kvserver.NewStores(cfg.AmbientCtx, cfg.Clock),
txnMetrics: txnMetrics,
eventLogger: eventLogger,
clusterID: clusterID,
@@ -314,12 +330,6 @@ func (n *Node) AnnotateCtxWithSpan(
return n.storeCfg.AmbientCtx.AnnotateCtxWithSpan(ctx, opName)
}
-func (n *Node) onClusterVersionChange(ctx context.Context, cv clusterversion.ClusterVersion) {
- if err := n.stores.OnClusterVersionChange(ctx, cv); err != nil {
- log.Fatal(ctx, errors.Wrapf(err, "updating cluster version to %v", cv))
- }
-}
-
// start starts the node by registering the storage instance for the
// RPC service "Node" and initializing stores for each specified
// engine. Launches periodic store gossiping in a goroutine.
@@ -335,10 +345,6 @@ func (n *Node) start(
localityAddress []roachpb.LocalityAddress,
nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor),
) error {
- if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, &n.storeCfg.Settings.SV); err != nil {
- return err
- }
-
// Obtaining the NodeID requires a dance of sorts. If the node has initialized
// stores, the NodeID is persisted in each of them. If not, then we'll need to
// use the KV store to get a NodeID assigned.
@@ -442,13 +448,6 @@ func (n *Node) start(
return fmt.Errorf("failed to initialize the gossip interface: %s", err)
}
- // Read persisted ClusterVersion from each configured store to
- // verify there are no stores with data too old or too new for this
- // binary.
- if _, err := n.stores.SynthesizeClusterVersion(ctx); err != nil {
- return err
- }
-
// Bootstrap any uninitialized stores.
//
// TODO(tbg): address https://github.com/cockroachdb/cockroach/issues/39415.
@@ -461,17 +460,6 @@ func (n *Node) start(
n.startComputePeriodicMetrics(n.stopper, DefaultMetricsSampleInterval)
- // Now that we've created all our stores, install the gossip version update
- // handler to write version updates to them.
- // It's important that we persist new versions to the engines before the node
- // starts using it, otherwise the node might regress the version after a
- // crash.
- clusterversion.SetBeforeChange(ctx, &n.storeCfg.Settings.SV, n.onClusterVersionChange)
- // Invoke the callback manually once so that we persist the updated value that
- // gossip might have already received.
- clusterVersion := n.storeCfg.Settings.Version.ActiveVersion(ctx)
- n.onClusterVersionChange(ctx, clusterVersion)
-
// Be careful about moving this line above `startStores`; store migrations rely
// on the fact that the cluster version has not been updated via Gossip (we
// have migrations that want to run only if the server starts with a given
@@ -557,20 +545,6 @@ func (n *Node) bootstrapStores(
return errors.New("ClusterID missing during store bootstrap of auxiliary store")
}
- // There's a bit of an awkward dance around cluster versions here. If this node
- // is joining an existing cluster for the first time, it doesn't have any engines
- // set up yet, and cv below will be the binary's minimum supported version.
- // At the same time, the Gossip update which notifies us about the real
- // cluster version won't persist it to any engines (because we haven't
- // installed the gossip update handler yet and also because none of the
- // stores are bootstrapped). So we just accept that we won't use the correct
- // version here, but post-bootstrapping will invoke the callback manually,
- // which will disseminate the correct version to all engines.
- cv, err := n.stores.SynthesizeClusterVersion(ctx)
- if err != nil {
- return errors.Errorf("error retrieving cluster version for bootstrap: %s", err)
- }
-
{
// Bootstrap all waiting stores by allocating a new store id for
// each and invoking storage.Bootstrap() to persist it and the cluster
@@ -586,7 +560,7 @@ func (n *Node) bootstrapStores(
StoreID: firstID,
}
for _, eng := range emptyEngines {
- if err := kvserver.InitEngine(ctx, eng, sIdent, cv); err != nil {
+ if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
return err
}
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index 308a5ac6e16f..8204d9524b63 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
+ "github.com/stretchr/testify/require"
)
func formatKeys(keys []roachpb.Key) string {
@@ -58,8 +59,9 @@ func TestBootstrapCluster(t *testing.T) {
ctx := context.Background()
e := storage.NewDefaultInMem()
defer e.Close()
+ require.NoError(t, kvserver.WriteClusterVersion(ctx, e, clusterversion.TestingClusterVersion))
if _, err := bootstrapCluster(
- ctx, []storage.Engine{e}, clusterversion.TestingClusterVersion, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
+ ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
); err != nil {
t.Fatal(err)
}
@@ -231,8 +233,9 @@ func TestCorruptedClusterID(t *testing.T) {
cv := clusterversion.TestingClusterVersion
+ require.NoError(t, kvserver.WriteClusterVersion(ctx, e, cv))
if _, err := bootstrapCluster(
- ctx, []storage.Engine{e}, cv, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
+ ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
); err != nil {
t.Fatal(err)
}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 3332394f4e60..13d33f66c4c3 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/geo/geos"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -1498,6 +1499,83 @@ func (s *Server) Start(ctx context.Context) error {
return err
}
+ {
+ // Set up the callback that persists gossiped version bumps to the
+ // engines. The invariant we uphold here is that the bump needs to be
+ // persisted on all engines before it becomes "visible" to the version
+ // setting. To this end,
+ //
+ // a) make sure Gossip is not started yet,
+ // b) set up the BeforeChange callback on the version setting to persist
+ // incoming updates to all engines,
+ // c) write back the disk-loaded cluster version to all engines, and
+ // d) initialize the version setting (with the disk-loaded version).
+ //
+ // Note that "all engines" means "all engines", not "all initialized
+ // engines". We cannot initialize engines this early in the boot
+ // sequence.
+ s.gossip.AssertNotStarted(ctx)
+
+ // Serialize the callback through a mutex to make sure we're not
+ // clobbering the disk state if the callback fires concurrently.
+ var mu syncutil.Mutex
+ cb := func(ctx context.Context, newCV clusterversion.ClusterVersion) {
+ mu.Lock()
+ defer mu.Unlock()
+ v := s.cfg.Settings.Version
+ prevCV, err := kvserver.SynthesizeClusterVersionFromEngines(
+ ctx, s.engines, v.BinaryVersion(), v.BinaryMinSupportedVersion(),
+ )
+ if err != nil {
+ log.Fatal(ctx, err)
+ }
+ if !prevCV.Version.Less(newCV.Version) {
+ // If nothing needs to be updated, don't do anything. The
+ // callbacks fire async (or at least we want to assume the worst
+ // case in which they do) and so an old update might happen
+ // after a new one.
+ return
+ }
+ if err := kvserver.WriteClusterVersionToEngines(ctx, s.engines, newCV); err != nil {
+ log.Fatal(ctx, err)
+ }
+ log.Infof(ctx, "active cluster version is now %s (up from %s)", newCV, prevCV)
+ }
+ clusterversion.SetBeforeChange(ctx, &s.cfg.Settings.SV, cb)
+
+ diskClusterVersion := initServer.DiskClusterVersion()
+ // The version setting loaded from disk is the maximum cluster version
+ // seen on any engine. If new stores are being added to the server right
+ // now, or if the process crashed earlier half-way through the callback,
+ // that version won't be on all engines. For that reason, we backfill
+ // once.
+ if err := kvserver.WriteClusterVersionToEngines(
+ ctx, s.engines, diskClusterVersion,
+ ); err != nil {
+ return err
+ }
+
+ // NB: if we bootstrap a new server (in initServer.ServeAndWait below)
+ // we will call Initialize a second time, to eagerly move it to the
+ // bootstrap version (from the min supported version). Initialize()
+ // tolerates that. Note that in that case we know that the callback
+ // has not fired yet, since Gossip won't connect (to itself) until
+ // the server starts and so the callback will never fire prior to
+ // that second Initialize() call. Note also that at this point in
+ // the code we don't know if we'll bootstrap or join an existing
+ // cluster, so we have to conservatively go with the version from
+ // disk, which in the case of no initialized engines is the binary
+ // min supported version.
+ if err := clusterversion.Initialize(ctx, diskClusterVersion.Version, &s.cfg.Settings.SV); err != nil {
+ return err
+ }
+
+ // At this point, we've established the invariant: all engines hold the
+ // version currently visible to the setting. And we have the callback in
+ // place that will persist an incoming updated version on all engines
+ // before making it visible to the setting.
+ }
+
serverpb.RegisterInitServer(s.grpc.Server, initServer)
s.node.startAssertEngineHealth(ctx, s.engines)
@@ -1678,7 +1756,7 @@ func (s *Server) Start(ctx context.Context) error {
// This opens the main listener.
startRPCServer(workersCtx)
- state, err := initServer.ServeAndWait(ctx, s.stopper, s.gossip)
+ state, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV, s.gossip)
if err != nil {
return errors.Wrap(err, "during init")
}
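As a compressed view of the first server.go hunk above (a sketch only; the real code also synthesizes the previous version for logging and serializes the callback through a mutex), the order of operations is: assert Gossip has not started, register the persist-before-visible callback, backfill the disk-loaded version to all engines, and only then initialize the setting.

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/gossip"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// wireVersionPersistence is a hypothetical condensation of the hunk above.
func wireVersionPersistence(
	ctx context.Context,
	g *gossip.Gossip,
	st *cluster.Settings,
	engines []storage.Engine,
	diskCV clusterversion.ClusterVersion,
) error {
	g.AssertNotStarted(ctx) // the callback must be installed before Gossip runs

	// Persist incoming version bumps to every engine before they become
	// visible through the version setting.
	clusterversion.SetBeforeChange(ctx, &st.SV,
		func(ctx context.Context, newCV clusterversion.ClusterVersion) {
			if err := kvserver.WriteClusterVersionToEngines(ctx, engines, newCV); err != nil {
				log.Fatal(ctx, err)
			}
		})

	// Backfill the disk-loaded version to all engines, then expose it.
	if err := kvserver.WriteClusterVersionToEngines(ctx, engines, diskCV); err != nil {
		return err
	}
	return clusterversion.Initialize(ctx, diskCV.Version, &st.SV)
}
```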
diff --git a/pkg/server/server_update.go b/pkg/server/server_update.go
index 49d2d8119fa9..9e476047ad0a 100644
--- a/pkg/server/server_update.go
+++ b/pkg/server/server_update.go
@@ -103,11 +103,17 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
}
var newVersion string
+ var notRunningErr error
for nodeID, st := range nodesWithLiveness {
if st.livenessStatus != storagepb.NodeLivenessStatus_LIVE &&
st.livenessStatus != storagepb.NodeLivenessStatus_DECOMMISSIONING {
- return false, errors.Errorf("node %d not running (%s), cannot determine version",
- nodeID, st.livenessStatus)
+ // We definitely won't be able to upgrade, but defer this error as
+ // we may find out that we are already at the latest version (the
+ // cluster may be up to date, but a node is down).
+ if notRunningErr == nil {
+ notRunningErr = errors.Errorf("node %d not running (%s), cannot determine version", nodeID, st.livenessStatus)
+ }
+ continue
}
version := st.NodeStatus.Desc.ServerVersion.String()
@@ -127,6 +133,10 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
return true, nil
}
+ if notRunningErr != nil {
+ return false, notRunningErr
+ }
+
// Check if auto upgrade is enabled at current version. This is read from
// the KV store so that it's in effect on all nodes immediately following a
// SET CLUSTER SETTING.
diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go
index 38e935516f64..2f8479b2a2ab 100644
--- a/pkg/testutils/localtestcluster/local_test_cluster.go
+++ b/pkg/testutils/localtestcluster/local_test_cluster.go
@@ -122,7 +122,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
storage.DefaultStorageEngine, roachpb.Attributes{}, 50<<20)
ltc.Stopper.AddCloser(ltc.Eng)
- ltc.Stores = kvserver.NewStores(ambient, ltc.Clock, clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion)
+ ltc.Stores = kvserver.NewStores(ambient, ltc.Clock)
factory := initFactory(cfg.Settings, nodeDesc, ambient.Tracer, ltc.Clock, ltc.Latency, ltc.Stores, ltc.Stopper, ltc.Gossip)
if ltc.DBContext == nil {
@@ -170,8 +170,11 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
cfg.TimestampCachePageSize = tscache.TestSklPageSize
ctx := context.TODO()
+ if err := kvserver.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil {
+ t.Fatalf("unable to write cluster version: %s", err)
+ }
if err := kvserver.InitEngine(
- ctx, ltc.Eng, roachpb.StoreIdent{NodeID: nodeID, StoreID: 1}, clusterversion.TestingClusterVersion,
+ ctx, ltc.Eng, roachpb.StoreIdent{NodeID: nodeID, StoreID: 1},
); err != nil {
t.Fatalf("unable to start local test cluster: %s", err)
}
diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go
index 4fabed433a0e..bfa9fdea8f16 100644
--- a/pkg/testutils/serverutils/test_server_shim.go
+++ b/pkg/testutils/serverutils/test_server_shim.go
@@ -219,7 +219,7 @@ func StartServer(
) (TestServerInterface, *gosql.DB, *kv.DB) {
server, err := StartServerRaw(params)
if err != nil {
- t.Fatal(err)
+ t.Fatalf("%+v", err)
}
pgURL, cleanupGoDB := sqlutils.PGUrl(