Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout topology watch and return better error messages for missing topology / namespaces #926

Merged
merged 8 commits into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ db:
service: null
static: null
seedNodes: null
namespaceResolutionTimeout: 0s
topologyResolutionTimeout: 0s
writeConsistencyLevel: 2
readConsistencyLevel: 2
connectConsistencyLevel: 0
Expand Down Expand Up @@ -577,8 +575,6 @@ db:
trustedCaFile: ""
clientCertAuth: false
autoTls: false
namespaceResolutionTimeout: 0s
topologyResolutionTimeout: 0s
hashing:
seed: 42
writeNewSeriesAsync: true
Expand Down
34 changes: 8 additions & 26 deletions src/dbnode/environment/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package environment
import (
"errors"
"fmt"
"time"

"github.com/m3db/m3/src/dbnode/kvconfig"
"github.com/m3db/m3/src/dbnode/sharding"
Expand All @@ -38,10 +37,6 @@ import (
"github.com/m3db/m3x/instrument"
)

const (
defaultSDTimeout = time.Duration(0) // Wait indefinitely by default
)

var (
errInvalidConfig = errors.New("must supply either service or static config")
)
Expand All @@ -56,12 +51,6 @@ type Configuration struct {

// Presence of a (etcd) server in this config denotes an embedded cluster
SeedNodes *SeedNodesConfig `yaml:"seedNodes"`

// NamespaceResolutionTimeout is the maximum time to wait to discover namespaces from KV
NamespaceResolutionTimeout time.Duration `yaml:"namespaceResolutionTimeout"`

// TopologyResolutionTimeout is the maximum time to wait for a topology from KV
TopologyResolutionTimeout time.Duration `yaml:"topologyResolutionTimeout"`
}

// SeedNodesConfig defines fields for seed node
Expand Down Expand Up @@ -109,11 +98,9 @@ type ConfigureResults struct {

// ConfigurationParameters are options used to create new ConfigureResults
type ConfigurationParameters struct {
InstrumentOpts instrument.Options
HashingSeed uint32
HostID string
NamespaceResolutionTimeout time.Duration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't just delete these (backward compatibility and all); they need to be marked as deprecated instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke offline and agreed it's OK in this case because it was undocumented config and we're breaking the functionality anyway.

TopologyResolutionTimeout time.Duration
InstrumentOpts instrument.Options
HashingSeed uint32
HostID string
}

// Configure creates a new ConfigureResults
Expand All @@ -136,14 +123,11 @@ func (c Configuration) Configure(cfgParams ConfigurationParameters) (ConfigureRe
}

func (c Configuration) configureDynamic(cfgParams ConfigurationParameters) (ConfigureResults, error) {
sdTimeout := defaultSDTimeout
if initTimeout := c.Service.SDConfig.InitTimeout; initTimeout != nil && *initTimeout != 0 {
sdTimeout = *initTimeout
}

configSvcClientOpts := c.Service.NewOptions().
SetInstrumentOptions(cfgParams.InstrumentOpts).
SetServicesOptions(c.Service.SDConfig.NewOptions().SetInitTimeout(sdTimeout))
// Set timeout to zero so it will wait indefinitely for the
// initial value.
SetServicesOptions(services.NewOptions().SetInitTimeout(0))
configSvcClient, err := etcdclient.NewConfigServiceClient(configSvcClientOpts)
if err != nil {
err = fmt.Errorf("could not create m3cluster client: %v", err)
Expand All @@ -153,8 +137,7 @@ func (c Configuration) configureDynamic(cfgParams ConfigurationParameters) (Conf
dynamicOpts := namespace.NewDynamicOptions().
SetInstrumentOptions(cfgParams.InstrumentOpts).
SetConfigServiceClient(configSvcClient).
SetNamespaceRegistryKey(kvconfig.NamespacesKey).
SetInitTimeout(cfgParams.NamespaceResolutionTimeout)
SetNamespaceRegistryKey(kvconfig.NamespacesKey)
nsInit := namespace.NewDynamicInitializer(dynamicOpts)

serviceID := services.NewServiceID().
Expand All @@ -167,8 +150,7 @@ func (c Configuration) configureDynamic(cfgParams ConfigurationParameters) (Conf
SetServiceID(serviceID).
SetQueryOptions(services.NewQueryOptions().SetIncludeUnhealthy(true)).
SetInstrumentOptions(cfgParams.InstrumentOpts).
SetHashGen(sharding.NewHashGenWithSeed(cfgParams.HashingSeed)).
SetInitTimeout(cfgParams.TopologyResolutionTimeout)
SetHashGen(sharding.NewHashGenWithSeed(cfgParams.HashingSeed))
topoInit := topology.NewDynamicInitializer(topoOpts)

kv, err := configSvcClient.KV()
Expand Down
22 changes: 4 additions & 18 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ import (
)

const (
bootstrapConfigInitTimeout = 10 * time.Second
serverGracefulCloseTimeout = 10 * time.Second
defaultNamespaceResolutionTimeout = time.Minute
defaultTopologyResolutionTimeout = time.Minute
bootstrapConfigInitTimeout = 10 * time.Second
serverGracefulCloseTimeout = 10 * time.Second
)

// RunOptions provides options for running the server
Expand Down Expand Up @@ -378,21 +376,9 @@ func Run(runOpts RunOptions) {
if cfg.EnvironmentConfig.Static == nil {
logger.Info("creating dynamic config service client with m3cluster")

namespaceResolutionTimeout := cfg.EnvironmentConfig.NamespaceResolutionTimeout
if namespaceResolutionTimeout <= 0 {
namespaceResolutionTimeout = defaultNamespaceResolutionTimeout
}

topologyResolutionTimeout := cfg.EnvironmentConfig.TopologyResolutionTimeout
if topologyResolutionTimeout <= 0 {
topologyResolutionTimeout = defaultTopologyResolutionTimeout
}

envCfg, err = cfg.EnvironmentConfig.Configure(environment.ConfigurationParameters{
InstrumentOpts: iopts,
HashingSeed: cfg.Hashing.Seed,
NamespaceResolutionTimeout: namespaceResolutionTimeout,
TopologyResolutionTimeout: topologyResolutionTimeout,
InstrumentOpts: iopts,
HashingSeed: cfg.Hashing.Seed,
})
if err != nil {
logger.Fatalf("could not initialize dynamic config: %v", err)
Expand Down
21 changes: 4 additions & 17 deletions src/dbnode/storage/namespace/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,10 @@ func newDynamicRegistry(opts DynamicOptions) (Registry, error) {
}

logger := opts.InstrumentOptions().Logger()
if err = waitOnInit(watch, opts.InitTimeout()); err != nil {
logger.Errorf("dynamic namespace registry initialization timed out in %s: %v",
opts.InitTimeout().String(), err)
return nil, err
}
logger.Info(`waiting for dynamic namespace registry initialization.
If this takes a long time, make sure that a namespace is configured`)
<-watch.C()
logger.Info("initial namespace value received")

initValue := watch.Get()
m, err := getMapFromUpdate(initValue)
Expand Down Expand Up @@ -238,18 +237,6 @@ func (r *dynamicRegistry) Close() error {
return nil
}

func waitOnInit(w kv.ValueWatch, d time.Duration) error {
if d <= 0 {
return nil
}
select {
case <-w.C():
return nil
case <-time.After(d):
return errInitTimeOut
}
}

func getMapFromUpdate(val kv.Value) (Map, error) {
if val == nil {
return nil, errInvalidRegistry
Expand Down
10 changes: 0 additions & 10 deletions src/dbnode/storage/namespace/dynamic_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,3 @@ func (o *dynamicOpts) SetNamespaceRegistryKey(k string) DynamicOptions {
func (o *dynamicOpts) NamespaceRegistryKey() string {
return o.nsRegistryKey
}

func (o *dynamicOpts) SetInitTimeout(value time.Duration) DynamicOptions {
opts := *o
opts.initTimeout = value
return &opts
}

func (o *dynamicOpts) InitTimeout() time.Duration {
return o.initTimeout
}
15 changes: 0 additions & 15 deletions src/dbnode/storage/namespace/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func newTestOpts(t *testing.T, ctrl *gomock.Controller, watchable kv.ValueWatcha
instrument.NewOptions().
SetReportInterval(10 * time.Millisecond).
SetMetricsScope(ts)).
SetInitTimeout(100 * time.Millisecond).
SetConfigServiceClient(mockCSClient)

return opts
Expand All @@ -78,20 +77,6 @@ func currentVersionMetrics(opts DynamicOptions) float64 {
return g.Value()
}

func TestInitializerTimeout(t *testing.T) {
defer leaktest.CheckTimeout(t, time.Second)()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

w := newTestWatchable(t, nil)
opts := newTestOpts(t, ctrl, w)
init := NewDynamicInitializer(opts)
_, err := init.Init()
require.Error(t, err)
require.Equal(t, errInitTimeOut, err)
}

func TestInitializerNoTimeout(t *testing.T) {
defer leaktest.CheckTimeout(t, time.Second)()

Expand Down
24 changes: 0 additions & 24 deletions src/dbnode/storage/namespace/namespace_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions src/dbnode/storage/namespace/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,4 @@ type DynamicOptions interface {
// NamespaceRegistryKey returns the kv-store key used for the
// NamespaceRegistry
NamespaceRegistryKey() string

// SetInitTimeout sets the waiting time for dynamic topology to be initialized
SetInitTimeout(value time.Duration) DynamicOptions

// InitTimeout returns the waiting time for dynamic topology to be initialized
InitTimeout() time.Duration
}
28 changes: 7 additions & 21 deletions src/dbnode/topology/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package topology
import (
"errors"
"sync"
"time"

"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3cluster/placement"
Expand Down Expand Up @@ -90,17 +89,17 @@ func newDynamicTopology(opts DynamicOptions) (DynamicTopology, error) {
if err != nil {
return nil, err
}
watch, err := services.Watch(opts.ServiceID(), opts.QueryOptions())
if err != nil {
return nil, err
}

logger := opts.InstrumentOptions().Logger()
if err = waitOnInit(watch, opts.InitTimeout()); err != nil {
logger.Errorf("dynamic topology initialization timed out in %s: %v",
opts.InitTimeout().String(), err)
logger.Info(`waiting for dynamic topology initialization.
If this takes a long time, make sure that a topology / placement is configured`)
// Watch will wait for an initial value so we don't need to do that
// in this function.
watch, err := services.Watch(opts.ServiceID(), opts.QueryOptions())
if err != nil {
return nil, err
}
logger.Info("initial topology / placement value received")

m, err := getMapFromUpdate(watch.Get(), opts.HashGen())
if err != nil {
Expand Down Expand Up @@ -185,19 +184,6 @@ func (t *dynamicTopology) MarkShardsAvailable(
return ps.MarkShardsAvailable(instanceID, shardIDs...)
}

func waitOnInit(w services.Watch, d time.Duration) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We finally removed this, noice!

if d <= 0 {
<-w.C() // Wait for the first placement indefinitely
return nil
}
select {
case <-w.C():
return nil
case <-time.After(d):
return errInitTimeOut
}
}

func getMapFromUpdate(data interface{}, hashGen sharding.HashGen) (Map, error) {
service, ok := data.(services.Service)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/topology/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestInitTimeout(t *testing.T) {
opts, w := testSetup(ctrl)
defer testFinish(ctrl, w)

topo, err := newDynamicTopology(opts.SetInitTimeout(10 * time.Millisecond))
topo, err := newDynamicTopology(opts)
assert.Equal(t, errInitTimeOut, err)
assert.Nil(t, topo)
}
Expand Down
9 changes: 0 additions & 9 deletions src/dbnode/topology/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,6 @@ func (o *dynamicOptions) InstrumentOptions() instrument.Options {
return o.instrumentOptions
}

func (o *dynamicOptions) SetInitTimeout(value time.Duration) DynamicOptions {
o.initTimeout = value
return o
}

func (o *dynamicOptions) InitTimeout() time.Duration {
return o.initTimeout
}

func (o *dynamicOptions) SetHashGen(h sharding.HashGen) DynamicOptions {
o.hashGen = h
return o
Expand Down
25 changes: 0 additions & 25 deletions src/dbnode/topology/topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions src/dbnode/topology/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package topology

import (
"time"

"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3cluster/client"
"github.com/m3db/m3cluster/services"
Expand Down Expand Up @@ -205,12 +203,6 @@ type DynamicOptions interface {
// InstrumentOptions returns the instrumentation options
InstrumentOptions() instrument.Options

// SetInitTimeout sets the waiting time for dynamic topology to be initialized
SetInitTimeout(value time.Duration) DynamicOptions

// InitTimeout returns the waiting time for dynamic topology to be initialized
InitTimeout() time.Duration

// SetHashGen sets the HashGen function
SetHashGen(h sharding.HashGen) DynamicOptions

Expand Down