server: rework cluster version initialization
A central invariant around cluster versions is that when an update
arrives, we need to persist it before exposing it through the version
setting. This was not true during server start time, as described in:

cockroachdb#47235 (comment)

In short, we were registering the callback to persist to the engines
*after* Gossip had already connected, opening up a window during which
a cluster version bump simply would not be persisted.

In the acceptance/version-upgrade test, this would manifest as nodes
refusing to start because their binary version had proceeded too far
beyond the persisted version. At the time of writing and before this
commit, this would happen in perhaps 1-10% of local runs on a linux
machine (rarely on OSX).

The buggy code was originally written when the startup sequence was a
lot more intricate and we were, as I recall, under pressure to deliver.
Now, with recent refactors around the `initServer`, we're in a
really good place to solve this while also simplifying the whole
story. In this commit, we:

- persist the cluster version on *all* engines, not just the initialized
  ones; this completely decouples the timing of when the engines get
  initialized from when we can set up the persistence callback and
  makes everything *much simpler*.
- stop opportunistically backfilling the cluster version. It is now
  done once at the beginning, in the right place, without FUD later
  on.
- remove the cluster version persistence from engine initialization.
  Anyone initializing an engine must put a cluster version on it
  first. In a running server, this happens during initServer creation
  time, way before there are any moving parts in the system. In tests,
  extra writes were added as needed.
- set up the callback with Gossip before starting Gossip, and make sure
  (via an assertion) that this property does not rot.
  By setting up the callback before Gossip starts, we make sure there
  isn't a period during which Gossip receives an update but doesn't have
  the callback yet.
- as a result of all of the above, take all knowledge of cluster version
  init away from `*Node` and `*Stores`.
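
The ordering described in the list above can be sketched in isolation. This is a minimal illustration, not the code in this commit: `toyGossip`, `node`, `engine`, and `onVersionUpdate` are hypothetical stand-ins, and the sketch returns an error where the real `AssertNotStarted` calls `log.Fatalf`. It shows a callback that persists to every engine before exposing the new version, and a registration that is rejected once the instance has started.

```go
package main

import (
	"errors"
	"fmt"
)

// Version is a simplified, hypothetical stand-in for a cluster version.
type Version struct{ Major, Minor int }

// Less reports whether v sorts before o.
func (v Version) Less(o Version) bool {
	return v.Major < o.Major || (v.Major == o.Major && v.Minor < o.Minor)
}

// engine is a hypothetical stand-in for a storage engine.
type engine struct{ persisted Version }

// toyGossip models only the property discussed above: callbacks have to be
// registered before Start is called.
type toyGossip struct {
	started   bool
	callbacks []func(Version) error
}

var errAlreadyStarted = errors.New("gossip instance was already started")

// RegisterCallback refuses registration after Start (the sketch returns an
// error instead of fataling).
func (g *toyGossip) RegisterCallback(cb func(Version) error) error {
	if g.started {
		return errAlreadyStarted
	}
	g.callbacks = append(g.callbacks, cb)
	return nil
}

// Start marks the instance as started.
func (g *toyGossip) Start() { g.started = true }

// deliver simulates an incoming cluster version update.
func (g *toyGossip) deliver(v Version) error {
	for _, cb := range g.callbacks {
		if err := cb(v); err != nil {
			return err
		}
	}
	return nil
}

// node holds all engines (initialized or not) and the exposed version.
type node struct {
	engines []*engine
	active  Version
}

// onVersionUpdate persists the version on all engines first and only then
// exposes it as the active version.
func (n *node) onVersionUpdate(v Version) error {
	if v.Less(n.active) {
		return fmt.Errorf("version regression from %v to %v", n.active, v)
	}
	for _, e := range n.engines {
		e.persisted = v
	}
	n.active = v
	return nil
}

func main() {
	n := &node{engines: []*engine{{}, {}}}
	g := &toyGossip{}
	if err := g.RegisterCallback(n.onVersionUpdate); err != nil {
		panic(err)
	}
	g.Start()
	if err := g.deliver(Version{20, 1}); err != nil {
		panic(err)
	}
	fmt.Println(n.active, n.engines[0].persisted, n.engines[1].persisted)
	// A late registration is rejected rather than silently missing updates.
	fmt.Println(g.RegisterCallback(n.onVersionUpdate))
}
```

Because the callback is in place before `Start`, there is no window in which `deliver` can run without it.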

As a last note, we are planning to stop using Gossip for this version
business in 20.1, and that too will be facilitated by this change.

Release note (bug fix): Avoid a condition during rapid version upgrades
where a node would refuse to start, claiming "[a store is] too old for
running version". Before this bug fix, the workaround was to decommission
the node, delete the store directory, and re-add it to the cluster as
a new node.
tbg committed Apr 14, 2020
1 parent 0736b92 commit 9a78c72
Showing 16 changed files with 318 additions and 296 deletions.
21 changes: 15 additions & 6 deletions pkg/clusterversion/setting.go
@@ -87,15 +87,24 @@ func (cv *clusterVersionSetting) initialize(
ctx context.Context, version roachpb.Version, sv *settings.Values,
) error {
if ver := cv.activeVersionOrEmpty(ctx, sv); ver != (ClusterVersion{}) {
// Allow initializing a second time as long as it's setting the version to
// what it was already set. This is useful in tests that use
// MakeTestingClusterSettings() which initializes the version, and the
// then start a server which again initializes it.
// Allow initializing a second time as long as it's not regressing.
//
// This is useful in tests that use MakeTestingClusterSettings() which
// initializes the version, and then start a server which initializes
// it once more.
//
// It's also used in production code during bootstrap, where the version
// is first initialized to BinaryMinSupportedVersion and then
// re-initialized to BootstrapVersion (=BinaryVersion).
if version.Less(ver.Version) {
return errors.AssertionFailedf("cannot initialize version to %s because already set to: %s",
version, ver)
}
if version == ver.Version {
// Don't trigger callbacks, etc, a second time.
return nil
}
return errors.AssertionFailedf("cannot initialize version to %s because already set to: %s",
version, ver)
// Now version > ver.Version.
}
if err := cv.validateSupportedVersionInner(ctx, version, sv); err != nil {
return err
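
The re-initialization rule in this hunk (allow the same or a higher version, reject a regression, skip callbacks when nothing changes) can be read as the following standalone sketch. The `setting` type and its `callbacks` counter are hypothetical simplifications, not the real `clusterVersionSetting`.

```go
package main

import "fmt"

// Version is a simplified, hypothetical stand-in for roachpb.Version.
type Version struct{ Major, Minor int }

// Less reports whether v sorts before o.
func (v Version) Less(o Version) bool {
	return v.Major < o.Major || (v.Major == o.Major && v.Minor < o.Minor)
}

// setting is a hypothetical holder for the active version.
type setting struct {
	active    Version
	set       bool
	callbacks int // how often the (imaginary) callbacks were triggered
}

// initialize mirrors the rule above: regressions are errors, repeating the
// current version is a no-op, and a higher version goes through.
func (s *setting) initialize(v Version) error {
	if s.set {
		if v.Less(s.active) {
			return fmt.Errorf(
				"cannot initialize version to %v because already set to: %v", v, s.active)
		}
		if v == s.active {
			// Don't trigger callbacks a second time.
			return nil
		}
		// Now v > s.active.
	}
	s.active, s.set = v, true
	s.callbacks++
	return nil
}

func main() {
	var s setting
	fmt.Println(s.initialize(Version{19, 2})) // first initialization
	fmt.Println(s.initialize(Version{19, 2})) // same version again
	fmt.Println(s.initialize(Version{20, 1})) // bump during bootstrap
	fmt.Println(s.initialize(Version{19, 2})) // regression
	fmt.Println(s.active, s.callbacks)
}
```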
19 changes: 14 additions & 5 deletions pkg/cmd/roachtest/versionupgrade.go
@@ -72,10 +72,7 @@ DROP TABLE test.t;
}

func runVersionUpgrade(ctx context.Context, t *test, c *cluster, predecessorVersion string) {
// This is ugly, but we can't pass `--encrypt=false` to old versions of
// Cockroach.
//
// TODO(tbg): revisit as old versions are aged out of this test.
// This test uses fixtures and we do not have encrypted fixtures right now.
c.encryptDefault = false

// Set the bool within to true to create a new fixture for this test. This
@@ -104,6 +101,18 @@ func runVersionUpgrade(ctx context.Context, t *test, c *cluster, predecessorVers
waitForUpgradeStep(c.All()),
testFeaturesStep,

// Work around a bug in <= 20.1 that exists at the time of writing. The
// bug would sometimes cause the cluster version (predecessorVersion)
// to not be persisted on the engines. In that case, moving to the next
// version would fail. For details, see:
//
// https://github.com/cockroachdb/cockroach/pull/47358
//
// TODO(tbg): remove this when we stop running this test against 20.1
// or any earlier release. Or, make sure the <=20.1 fixtures all have
// the bumped cluster version on all stores.
binaryUpgradeStep(c.All(), predecessorVersion),

// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.

@@ -350,7 +359,7 @@ func waitForUpgradeStep(nodes nodeListOption) versionStep {
}
}

t.l.Printf("%s: nodes %v are upgraded\n", newVersion)
t.l.Printf("%s: nodes %v are upgraded\n", newVersion, nodes)

// TODO(nvanbenschoten): add upgrade qualification step.
}
11 changes: 11 additions & 0 deletions pkg/gossip/gossip.go
@@ -218,6 +218,8 @@ type Storage interface {
// During bootstrapping, the bootstrap list contains candidates for
// entry to the gossip network.
type Gossip struct {
started bool // for assertions

*server // Embedded gossip RPC server

Connected chan struct{} // Closed upon initial connection
@@ -384,6 +386,13 @@ func NewTestWithLocality(
return gossip
}

// AssertNotStarted fatals if the Gossip instance was already started.
func (g *Gossip) AssertNotStarted(ctx context.Context) {
if g.started {
log.Fatalf(ctx, "Gossip instance was already started")
}
}

// GetNodeMetrics returns the gossip node metrics.
func (g *Gossip) GetNodeMetrics() *Metrics {
return g.server.GetNodeMetrics()
@@ -1244,6 +1253,8 @@ func (g *Gossip) MaxHops() uint32 {
// This method starts bootstrap loop, gossip server, and client
// management in separate goroutines and returns.
func (g *Gossip) Start(advertAddr net.Addr, resolvers []resolver.Resolver) {
g.AssertNotStarted(context.Background())
g.started = true
g.setResolvers(resolvers)
g.server.start(advertAddr) // serve gossip protocol
g.bootstrap() // bootstrap gossip client
14 changes: 6 additions & 8 deletions pkg/kv/kvserver/client_test.go
@@ -141,9 +141,7 @@ func createTestStoreWithOpts(
nodeDesc.NodeID, rpcContext, server, stopper, metric.NewRegistry(), storeCfg.DefaultZoneConfig,
)
storeCfg.ScanMaxIdleTime = 1 * time.Second
stores := kvserver.NewStores(
ac, storeCfg.Clock,
clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion)
stores := kvserver.NewStores(ac, storeCfg.Clock)

if err := storeCfg.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
t.Fatal(err)
@@ -177,8 +175,9 @@ func createTestStoreWithOpts(
// TODO(bdarnell): arrange to have the transport closed.
ctx := context.Background()
if !opts.dontBootstrap {
require.NoError(t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
if err := kvserver.InitEngine(
ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, clusterversion.TestingClusterVersion,
ctx, eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1},
); err != nil {
t.Fatal(err)
}
@@ -880,10 +879,11 @@ func (m *multiTestContext) addStore(idx int) {

ctx := context.Background()
if needBootstrap {
require.NoError(m.t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
if err := kvserver.InitEngine(ctx, eng, roachpb.StoreIdent{
NodeID: roachpb.NodeID(idx + 1),
StoreID: roachpb.StoreID(idx + 1),
}, clusterversion.TestingClusterVersion); err != nil {
}); err != nil {
m.t.Fatal(err)
}
}
@@ -914,9 +914,7 @@ func (m *multiTestContext) addStore(idx int) {
m.t.Fatal(err)
}

sender := kvserver.NewStores(ambient, clock,
clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion,
)
sender := kvserver.NewStores(ambient, clock)
sender.AddStore(store)
perReplicaServer := kvserver.MakeServer(&roachpb.NodeDescriptor{NodeID: nodeID}, sender)
kvserver.RegisterPerReplicaServer(grpcServer, perReplicaServer)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
@@ -240,12 +240,12 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
factory := &testSenderFactory{}
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock)

require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv))
if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{
ClusterID: uuid.MakeV4(),
NodeID: 1,
StoreID: 1,
},
cv); err != nil {
}); err != nil {
t.Fatal(err)
}
if err := clusterversion.Initialize(ctx, cv.Version, &cfg.Settings.SV); err != nil {
39 changes: 28 additions & 11 deletions pkg/kv/kvserver/store.go
@@ -2019,23 +2019,40 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64,
return hlcUpperBound, nil
}

func checkEngineEmpty(ctx context.Context, eng storage.Engine) error {
// checkCanInitializeEngine ensures that the engine is empty except for a
// cluster version, which must be present.
func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
kvs, err := storage.Scan(eng, roachpb.KeyMin, roachpb.KeyMax, 10)
if err != nil {
return err
}
if len(kvs) > 0 {
// See if this is an already-bootstrapped store.
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return errors.Wrap(err, "unable to read store ident")
}
keyVals := make([]string, len(kvs))
for i, kv := range kvs {
keyVals[i] = fmt.Sprintf("%s: %q", kv.Key, kv.Value)
// See if this is an already-bootstrapped store.
ident, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine already initialized as %s", ident.String())
} else if !errors.Is(errors.Cause(err), &NotBootstrappedError{}) {
return errors.Wrap(err, "unable to read store ident")
}

// Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain
// a cluster version and nothing else?

var sawClusterVersion bool
var keyVals []string
for _, kv := range kvs {
if kv.Key.Key.Equal(keys.StoreClusterVersionKey()) {
sawClusterVersion = true
continue
}
return errors.Errorf("engine belongs to store %s, contains %s", ident.String(), keyVals)
keyVals = append(keyVals, fmt.Sprintf("%s: %q", kv.Key, kv.Value))
}
if len(keyVals) > 0 {
return errors.Errorf("engine cannot be bootstrapped, contains:\n%s", keyVals)
}
if !sawClusterVersion {
return errors.New("no cluster version found on uninitialized engine")
}

return nil
}
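
Read without diff markers, the new check amounts to three rules: an engine with a store ident is already initialized, any key other than the cluster version blocks bootstrap, and a missing cluster version is an error. A minimal sketch of that decision order follows, assuming a plain map in place of a real engine; `checkCanInitialize`, `storeIdentKey`, and `clusterVersionKey` are hypothetical names, not the kvserver API.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// Hypothetical key names standing in for the store ident and cluster
// version keys of a real engine.
const (
	storeIdentKey     = "store-ident"
	clusterVersionKey = "cluster-version"
)

// checkCanInitialize accepts an engine (modeled as a map) only if it
// contains a cluster version and nothing else.
func checkCanInitialize(kvs map[string]string) error {
	if ident, ok := kvs[storeIdentKey]; ok {
		return fmt.Errorf("engine already initialized as %s", ident)
	}
	var extra []string
	for k, v := range kvs {
		if k == clusterVersionKey {
			continue
		}
		extra = append(extra, fmt.Sprintf("%s: %q", k, v))
	}
	if len(extra) > 0 {
		sort.Strings(extra) // map iteration order is random; keep output stable
		return fmt.Errorf("engine cannot be bootstrapped, contains:\n%s",
			strings.Join(extra, "\n"))
	}
	if _, ok := kvs[clusterVersionKey]; !ok {
		return errors.New("no cluster version found on uninitialized engine")
	}
	return nil
}

func main() {
	fmt.Println(checkCanInitialize(map[string]string{}))
	fmt.Println(checkCanInitialize(map[string]string{clusterVersionKey: "20.1"}))
	fmt.Println(checkCanInitialize(map[string]string{clusterVersionKey: "20.1", "foo": "bar"}))
}
```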

21 changes: 6 additions & 15 deletions pkg/kv/kvserver/store_bootstrap.go
@@ -13,7 +13,6 @@ package kvserver
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -29,14 +28,10 @@ import (
// InitEngine writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
// should be completely empty. It returns an error if called on a
// non-empty engine.
func InitEngine(
ctx context.Context,
eng storage.Engine,
ident roachpb.StoreIdent,
cv clusterversion.ClusterVersion,
) error {
// should be completely empty save for a cluster version, which must
// already have been persisted to it. Returns an error if this is not
// the case.
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already bootstrapped with ident %s", eng, exIdent.String())
@@ -45,8 +40,8 @@ func InitEngine(
return err
}

if err := checkEngineEmpty(ctx, eng); err != nil {
return errors.Wrap(err, "cannot verify empty engine for bootstrap")
if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize store")
}

batch := eng.NewBatch()
@@ -62,10 +57,6 @@ func InitEngine(
batch.Close()
return err
}
if err := WriteClusterVersion(ctx, batch, cv); err != nil {
batch.Close()
return errors.Wrap(err, "cannot write cluster version")
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting bootstrap data")
}
31 changes: 18 additions & 13 deletions pkg/kv/kvserver/store_test.go
@@ -231,8 +231,10 @@ func createTestStoreWithoutStart(
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock)
store := NewStore(context.TODO(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
factory.setStore(store)

require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion))
if err := InitEngine(
context.TODO(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, clusterversion.TestingClusterVersion,
context.TODO(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1},
); err != nil {
t.Fatal(err)
}
@@ -436,8 +438,9 @@ func TestStoreInitAndBootstrap(t *testing.T) {
t.Error("expected failure starting un-bootstrapped store")
}

require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion))
// Bootstrap with a fake ident.
if err := InitEngine(ctx, eng, testIdent, clusterversion.TestingClusterVersion); err != nil {
if err := InitEngine(ctx, eng, testIdent); err != nil {
t.Fatalf("error bootstrapping store: %+v", err)
}

@@ -490,20 +493,26 @@ func TestStoreInitAndBootstrap(t *testing.T) {
}
}

// TestBootstrapOfNonEmptyStore verifies bootstrap failure if engine
// TestInitializeEngineErrors verifies bootstrap failure if engine
// is not empty.
func TestBootstrapOfNonEmptyStore(t *testing.T) {
func TestInitializeEngineErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
ctx := context.TODO()
defer stopper.Stop(ctx)
eng := storage.NewDefaultInMem()
stopper.AddCloser(eng)

// Put some random garbage into the engine.
if err := eng.Put(storage.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar")); err != nil {
t.Errorf("failure putting key foo into engine: %+v", err)
// Bootstrap should fail if engine has no cluster version yet.
if err := InitEngine(ctx, eng, testIdent); !testutils.IsError(err, `no cluster version`) {
t.Fatalf("unexpected error: %v", err)
}

require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))

// Put some random garbage into the engine.
require.NoError(t, eng.Put(storage.MakeMVCCMetadataKey(roachpb.Key("foo")), []byte("bar")))

cfg := TestStoreConfig(nil)
cfg.Transport = NewDummyRaftTransport(cfg.Settings)
store := NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
@@ -516,12 +525,8 @@ func TestBootstrapOfNonEmptyStore(t *testing.T) {
}

// Bootstrap should fail on non-empty engine.
switch err := errors.Cause(InitEngine(
ctx, eng, testIdent, clusterversion.TestingClusterVersion,
)); err.(type) {
case *NotBootstrappedError:
default:
t.Errorf("unexpected error bootstrapping non-empty store: %+v", err)
if err := InitEngine(ctx, eng, testIdent); !testutils.IsError(err, `cannot be bootstrapped`) {
t.Fatalf("unexpected error: %v", err)
}
}
