From 79ec60e313b5d5ebdbf7f4640019097627d6c5af Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Mon, 22 Jul 2024 21:29:53 -0400 Subject: [PATCH 1/8] feat: gossip config reload information --- cmd/refinery/main.go | 2 + collect/collect_test.go | 6 +-- config/config.go | 18 ++++--- config/file_config.go | 60 +++++++++++++-------- config/mock.go | 11 +++- internal/configwatcher/watcher.go | 86 ++++++++++++++++++++++++++++++ sharder/deterministic_test.go | 4 +- test/EMAThroughput_rules.yaml | 12 +++++ test/TotalThroughput_rules.yaml | 12 +++++ test/WindowedThroughput_rules.yaml | 12 +++++ test/config.yaml | 28 ++++++++++ test/rules.yaml | 18 +++++++ 12 files changed, 234 insertions(+), 35 deletions(-) create mode 100644 internal/configwatcher/watcher.go create mode 100644 test/EMAThroughput_rules.yaml create mode 100644 test/TotalThroughput_rules.yaml create mode 100644 test/WindowedThroughput_rules.yaml create mode 100644 test/config.yaml create mode 100644 test/rules.yaml diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index 2123cb07ab..752a90f876 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -25,6 +25,7 @@ import ( "github.com/honeycombio/refinery/app" "github.com/honeycombio/refinery/collect" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/configwatcher" "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/otelutil" "github.com/honeycombio/refinery/internal/peer" @@ -266,6 +267,7 @@ func main() { {Value: samplerFactory}, {Value: stressRelief, Name: "stressRelief"}, {Value: &health.Health{}}, + {Value: &configwatcher.ConfigWatcher{}}, {Value: &a}, } err = g.Provide(objects...) diff --git a/collect/collect_test.go b/collect/collect_test.go index 764b804a5b..749784fb23 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -508,7 +508,7 @@ func TestCacheSizeReload(t *testing.T) { conf.Mux.Lock() conf.GetCollectionConfigVal.CacheCapacity = 2 conf.Mux.Unlock() - conf.ReloadConfig() + conf.Reload() assert.Eventually(t, func() bool { coll.mutex.RLock() @@ -525,7 +525,7 @@ func TestCacheSizeReload(t *testing.T) { conf.Mux.Lock() conf.GetCollectionConfigVal.CacheCapacity = 1 conf.Mux.Unlock() - conf.ReloadConfig() + conf.Reload() expectedEvents = 2 assert.Eventually(t, check, 60*wait, wait, "expected another trace evicted and sent") @@ -574,7 +574,7 @@ func TestSampleConfigReload(t *testing.T) { return ok }, conf.GetTraceTimeoutVal*2, conf.SendTickerVal) - conf.ReloadConfig() + conf.Reload() assert.Eventually(t, func() bool { coll.mutex.Lock() diff --git a/config/config.go b/config/config.go index db7685ccb4..eca4add9a9 100644 --- a/config/config.go +++ b/config/config.go @@ -10,19 +10,25 @@ const ( // Config defines the interface the rest of the code uses to get items from the // config. There are different implementations of the config using different -// backends to store the config. FileConfig is the default and uses a -// TOML-formatted config file. RedisPeerFileConfig uses a redis cluster to store -// the list of peers and then falls back to a filesystem config file for all -// other config elements. +// backends to store the config. type Config interface { // RegisterReloadCallback takes a name and a function that will be called - // when the configuration is reloaded. This will happen infrequently. If + // whenever the configuration is reloaded. This will happen infrequently. If // consumers of configuration set config values on startup, they should // check their values haven't changed and re-start anything that needs - // restarting with the new values. + // restarting with the new values. The callback is passed the two hashes + // for config and rules so that the caller can decide if they need to + // reconfigure anything. RegisterReloadCallback(callback ConfigReloadCallback) + // Reload forces the config to attempt to reload its values. If the config + // checksum has changed, the reload callbacks will be called. + Reload() + + // GetHashes returns the current config and rule hashes + GetHashes() (cfg string, rules string) + // GetListenAddr returns the address and port on which to listen for // incoming events GetListenAddr() (string, error) diff --git a/config/file_config.go b/config/file_config.go index 4d6f027917..ce36a3ef27 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -430,6 +430,42 @@ func NewConfig(opts *CmdEnv, errorCallback func(error)) (Config, error) { return cfg, err } +// Reload attempts to reload the configuration; if it has changed, it stores the +// new data and calls the reload callbacks. +func (f *fileConfig) Reload() { + // reread the configs + cfg, err := newFileConfig(f.opts) + if err != nil { + f.errorCallback(err) + return + } + + // if nothing's changed, we're fine + if f.mainHash == cfg.mainHash && f.rulesHash == cfg.rulesHash { + return + } + + // otherwise, update our state and call the callbacks + f.mux.Lock() + f.mainConfig = cfg.mainConfig + f.mainHash = cfg.mainHash + f.rulesConfig = cfg.rulesConfig + f.rulesHash = cfg.rulesHash + f.mux.Unlock() // can't defer -- we don't want callbacks to deadlock + + for _, cb := range f.callbacks { + cb(cfg.mainHash, cfg.rulesHash) + } +} + +// GetHashes returns the current hash values for the main and rules configs. +func (f *fileConfig) GetHashes() (cfg string, rules string) { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.mainHash, f.rulesHash +} + func (f *fileConfig) monitor() { f.done = make(chan struct{}) // adjust the time by +/- 10% to avoid everyone reloading at the same time @@ -440,29 +476,7 @@ func (f *fileConfig) monitor() { case <-f.done: return case <-f.ticker.C: - // reread the configs - cfg, err := newFileConfig(f.opts) - if err != nil { - f.errorCallback(err) - continue - } - - // if nothing's changed, we're fine - if f.mainHash == cfg.mainHash && f.rulesHash == cfg.rulesHash { - continue - } - - // otherwise, update our state and call the callbacks - f.mux.Lock() - f.mainConfig = cfg.mainConfig - f.mainHash = cfg.mainHash - f.rulesConfig = cfg.rulesConfig - f.rulesHash = cfg.rulesHash - f.mux.Unlock() // can't defer -- routine never ends, and callbacks will deadlock - - for _, cb := range f.callbacks { - cb(cfg.mainHash, cfg.rulesHash) - } + f.Reload() } } } diff --git a/config/mock.go b/config/mock.go index 07ffe70b47..36dcf77463 100644 --- a/config/mock.go +++ b/config/mock.go @@ -91,11 +91,13 @@ type MockConfig struct { TraceIdFieldNames []string ParentIdFieldNames []string CfgMetadata []ConfigMetadata + CfgHash string + RulesHash string Mux sync.RWMutex } -func (m *MockConfig) ReloadConfig() { +func (m *MockConfig) Reload() { m.Mux.RLock() defer m.Mux.RUnlock() @@ -110,6 +112,13 @@ func (m *MockConfig) RegisterReloadCallback(callback ConfigReloadCallback) { m.Mux.Unlock() } +func (m *MockConfig) GetHashes() (string, string) { + m.Mux.RLock() + defer m.Mux.RUnlock() + + return m.CfgHash, m.RulesHash +} + func (m *MockConfig) IsAPIKeyValid(key string) bool { m.Mux.RLock() defer m.Mux.RUnlock() diff --git a/internal/configwatcher/watcher.go b/internal/configwatcher/watcher.go new file mode 100644 index 0000000000..e97bdb1a38 --- /dev/null +++ b/internal/configwatcher/watcher.go @@ -0,0 +1,86 @@ +package configwatcher + +import ( + "context" + "strings" + + "github.com/facebookgo/startstop" + "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/otelutil" + "github.com/honeycombio/refinery/pubsub" + "go.opentelemetry.io/otel/trace" +) + +const ConfigPubsubTopic = "cfg_update" + +// This exists in internal because it depends on both config and pubsub; it +// should be part of config, but it also needs to be able to use pubsub, which +// depends on config, which would create a circular dependency. +// So we have to create it after creating pubsub and let dependency injection work. + +// ConfigWatcher listens to configuration changes and publishes notice of them. +// It avoids sending duplicate messages by comparing the hash of the configs. +type ConfigWatcher struct { + Config config.Config + PubSub pubsub.PubSub + Tracer trace.Tracer `inject:"tracer"` + subscr pubsub.Subscription + lastmsg string + startstop.Starter + startstop.Stopper +} + +// ReloadCallback is used to tell others that the config has changed. +// This gets called whenever it has actually changed, but it might have +// changed because we were told about it in pubsub, so we don't publish +// a message if the hashes are the same. +func (cw *ConfigWatcher) ReloadCallback(cfgHash, rulesHash string) { + ctx := context.Background() + ctx, span := otelutil.StartSpanMulti(ctx, cw.Tracer, "ConfigWatcher.ReloadCallback", map[string]any{ + "new_config_hash": cfgHash, + "new_rules_hash": rulesHash, + }) + defer span.End() + + message := cfgHash + ":" + rulesHash + // don't republish if we got this message from the subscription + if message != cw.lastmsg { + cw.PubSub.Publish(ctx, ConfigPubsubTopic, message) + } +} + +// SubscriptionListener listens for messages on the config pubsub topic and reloads the config +// if a new set of hashes is received. +func (cw *ConfigWatcher) SubscriptionListener(ctx context.Context, msg string) { + _, span := otelutil.StartSpanWith(ctx, cw.Tracer, "ConfigWatcher.SubscriptionListener", "message", msg) + defer span.End() + + cw.lastmsg = msg + + parts := strings.Split(msg, ":") + if len(parts) != 2 { + return + } + newCfg, newRules := parts[0], parts[1] + // we have been told about a new config, but do we already have it? + currentCfg, currentRules := cw.Config.GetHashes() + if newCfg == currentCfg && newRules == currentRules { + // we already have this config, no need to reload + otelutil.AddSpanField(span, "loading_config", false) + return + } + // it's new, so reload + otelutil.AddSpanField(span, "loading_config", true) + cw.Config.Reload() +} + +func (cw *ConfigWatcher) Start() error { + cw.subscr = cw.PubSub.Subscribe(context.Background(), ConfigPubsubTopic, cw.SubscriptionListener) + cw.Config.RegisterReloadCallback(cw.ReloadCallback) + return nil +} + +func (cw *ConfigWatcher) Stop() error { + cw.subscr.Close() + return nil +} diff --git a/sharder/deterministic_test.go b/sharder/deterministic_test.go index 73ea093fd4..74bc1b3fb3 100644 --- a/sharder/deterministic_test.go +++ b/sharder/deterministic_test.go @@ -45,7 +45,7 @@ func TestWhichShard(t *testing.T) { "should select a peer for a trace") config.GetPeersVal = []string{} - config.ReloadConfig() + config.Reload() assert.Equal(t, shard.GetAddress(), sharder.WhichShard(traceID).GetAddress(), "should select the same peer if peer list becomes empty") } @@ -87,7 +87,7 @@ func TestWhichShardAtEdge(t *testing.T) { "should select a peer for a trace") config.GetPeersVal = []string{} - config.ReloadConfig() + config.Reload() assert.Equal(t, shard.GetAddress(), sharder.WhichShard(traceID).GetAddress(), "should select the same peer if peer list becomes empty") } diff --git a/test/EMAThroughput_rules.yaml b/test/EMAThroughput_rules.yaml new file mode 100644 index 0000000000..596f13f68b --- /dev/null +++ b/test/EMAThroughput_rules.yaml @@ -0,0 +1,12 @@ +RulesVersion: 2 +Samplers: + __default__: + DeterministicSampler: + SampleRate: 1 + + TheNewWorld: + EMAThroughputSampler: + GoalThroughputPerSec: 50 + AdjustmentInterval: 5s + FieldList: + - title diff --git a/test/TotalThroughput_rules.yaml b/test/TotalThroughput_rules.yaml new file mode 100644 index 0000000000..596f13f68b --- /dev/null +++ b/test/TotalThroughput_rules.yaml @@ -0,0 +1,12 @@ +RulesVersion: 2 +Samplers: + __default__: + DeterministicSampler: + SampleRate: 1 + + TheNewWorld: + EMAThroughputSampler: + GoalThroughputPerSec: 50 + AdjustmentInterval: 5s + FieldList: + - title diff --git a/test/WindowedThroughput_rules.yaml b/test/WindowedThroughput_rules.yaml new file mode 100644 index 0000000000..596f13f68b --- /dev/null +++ b/test/WindowedThroughput_rules.yaml @@ -0,0 +1,12 @@ +RulesVersion: 2 +Samplers: + __default__: + DeterministicSampler: + SampleRate: 1 + + TheNewWorld: + EMAThroughputSampler: + GoalThroughputPerSec: 50 + AdjustmentInterval: 5s + FieldList: + - title diff --git a/test/config.yaml b/test/config.yaml new file mode 100644 index 0000000000..9af3be2a65 --- /dev/null +++ b/test/config.yaml @@ -0,0 +1,28 @@ +General: + ConfigurationVersion: 2 + MinRefineryVersion: v2.0 + ConfigReloadInterval: 50s + +Network: + HoneycombAPI: https://api-dogfood.honeycomb.io + +Logger: + Type: stdout + Level: debug + +LegacyMetrics: + Enabled: true + Dataset: refinery_metrics + APIKey: HTab42OXw838zcJDH3eJEH + APIHost: https://api-dogfood.honeycomb.io + +OTelMetrics: + Enabled: true + Dataset: refinery_metrics_otel + APIKey: HTab42OXw838zcJDH3eJEH + APIHost: https://api-dogfood.honeycomb.io + +RefineryTelemetry: + AddRuleReasonToTrace: true + AddSpanCountToRoot: true + AddHostMetadataToTrace: true diff --git a/test/rules.yaml b/test/rules.yaml new file mode 100644 index 0000000000..1bff2bd112 --- /dev/null +++ b/test/rules.yaml @@ -0,0 +1,18 @@ + +RulesVersion: 2 +Samplers: + __default__: + DeterministicSampler: + SampleRate: 1 + + TheNewWorld: + + + TotalThroughputSampler: + GoalThroughputPerSec: 50 + ClearFrequency: 5s + + + FieldList: + - title + From bb28bd012cef56be2313eb58f747427f6698b45b Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 23 Jul 2024 17:53:30 -0400 Subject: [PATCH 2/8] Move monitor into watcher, fix things --- config/config.go | 3 + config/config_test.go | 192 ++++++++---------------------- config/file_config.go | 40 ++----- config/memorysize.go | 8 +- config/metadata/configMeta.yaml | 6 +- config/mock.go | 8 ++ internal/configwatcher/watcher.go | 67 +++++++---- 7 files changed, 121 insertions(+), 203 deletions(-) diff --git a/config/config.go b/config/config.go index eca4add9a9..f762ae4b78 100644 --- a/config/config.go +++ b/config/config.go @@ -134,6 +134,9 @@ type Config interface { // GetAllSamplerRules returns all rules in a single map, including the default rules GetAllSamplerRules() (*V2SamplerConfig, error) + // GetGeneralConfig returns the config specific to General + GetGeneralConfig() GeneralConfig + // GetLegacyMetricsConfig returns the config specific to LegacyMetrics GetLegacyMetricsConfig() LegacyMetricsConfig diff --git a/config/config_test.go b/config/config_test.go index 6a25730e1f..2079a5d471 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1,23 +1,23 @@ -package config +package config_test import ( "fmt" "os" "strings" - "sync" "testing" "time" + "github.com/honeycombio/refinery/config" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" ) -func getConfig(args []string) (Config, error) { - opts, err := NewCmdEnvOptions(args) +func getConfig(args []string) (config.Config, error) { + opts, err := config.NewCmdEnvOptions(args) if err != nil { return nil, err } - return NewConfig(opts, func(err error) {}) + return config.NewConfig(opts, func(err error) {}) } // creates two temporary yaml files from the strings passed in and returns their filenames @@ -195,94 +195,6 @@ func TestMetricsAPIKeyFallbackEnvVar(t *testing.T) { } } -func TestReload(t *testing.T) { - cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", Duration(1*time.Second), "Network.ListenAddr", "0.0.0.0:8080") - rm := makeYAML("ConfigVersion", 2) - config, rules := createTempConfigs(t, cm, rm) - defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules}) - assert.NoError(t, err) - - if d, _ := c.GetListenAddr(); d != "0.0.0.0:8080" { - t.Error("received", d, "expected", "0.0.0.0:8080") - } - - wg := &sync.WaitGroup{} - - ch := make(chan interface{}, 1) - - c.RegisterReloadCallback(func(cfgHash, ruleHash string) { - close(ch) - }) - - // Hey race detector, we're doing some concurrent config reads. - // That's cool, right? - go func() { - tick := time.NewTicker(time.Millisecond) - defer tick.Stop() - for { - c.GetListenAddr() - select { - case <-ch: - return - case <-tick.C: - } - } - }() - - wg.Add(1) - - go func() { - defer wg.Done() - select { - case <-ch: - case <-time.After(5 * time.Second): - t.Error("No callback") - } - }() - - if file, err := os.OpenFile(config, os.O_RDWR, 0644); err == nil { - cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", Duration(1*time.Second), "Network.ListenAddr", "0.0.0.0:9000") - file.WriteString(cm) - file.Close() - } - - wg.Wait() - - if d, _ := c.GetListenAddr(); d != "0.0.0.0:9000" { - t.Error("received", d, "expected", "0.0.0.0:9000") - } - -} - -func TestReloadDisabled(t *testing.T) { - cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", Duration(0*time.Second), "Network.ListenAddr", "0.0.0.0:8080") - rm := makeYAML("ConfigVersion", 2) - config, rules := createTempConfigs(t, cm, rm) - defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules}) - assert.NoError(t, err) - - if d, _ := c.GetListenAddr(); d != "0.0.0.0:8080" { - t.Error("received", d, "expected", "0.0.0.0:8080") - } - - if file, err := os.OpenFile(config, os.O_RDWR, 0644); err == nil { - // Since we disabled reload checking this should not change anything - cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", Duration(0*time.Second), "Network.ListenAddr", "0.0.0.0:9000") - file.WriteString(cm) - file.Close() - } - - time.Sleep(5 * time.Second) - - if d, _ := c.GetListenAddr(); d != "0.0.0.0:8080" { - t.Error("received", d, "expected", "0.0.0.0:8080") - } -} - func TestReadDefaults(t *testing.T) { c, err := getConfig([]string{"--no-validate", "--config", "../config.yaml", "--rules_config", "../rules.yaml"}) assert.NoError(t, err) @@ -321,7 +233,7 @@ func TestReadDefaults(t *testing.T) { d, name, err := c.GetSamplerConfigForDestName("dataset-doesnt-exist") assert.NoError(t, err) - assert.IsType(t, &DeterministicSamplerConfig{}, d) + assert.IsType(t, &config.DeterministicSamplerConfig{}, d) assert.Equal(t, "DeterministicSampler", name) } @@ -331,21 +243,21 @@ func TestReadRulesConfig(t *testing.T) { d, name, err := c.GetSamplerConfigForDestName("doesnt-exist") assert.NoError(t, err) - assert.IsType(t, &DeterministicSamplerConfig{}, d) + assert.IsType(t, &config.DeterministicSamplerConfig{}, d) assert.Equal(t, "DeterministicSampler", name) d, name, err = c.GetSamplerConfigForDestName("env1") assert.NoError(t, err) - assert.IsType(t, &DynamicSamplerConfig{}, d) + assert.IsType(t, &config.DynamicSamplerConfig{}, d) assert.Equal(t, "DynamicSampler", name) d, name, err = c.GetSamplerConfigForDestName("env4") assert.NoError(t, err) switch r := d.(type) { - case *RulesBasedSamplerConfig: + case *config.RulesBasedSamplerConfig: assert.Len(t, r.Rules, 6) - var rule *RulesBasedSamplerRule + var rule *config.RulesBasedSamplerRule rule = r.Rules[0] assert.True(t, rule.Drop) @@ -445,13 +357,13 @@ func TestDryRun(t *testing.T) { func TestMaxAlloc(t *testing.T) { cm := makeYAML("General.ConfigurationVersion", 2, "Collection.CacheCapacity", 1000, "Collection.MaxAlloc", 17179869184) rm := makeYAML("ConfigVersion", 2) - config, rules := createTempConfigs(t, cm, rm) + cfg, rules := createTempConfigs(t, cm, rm) defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules}) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules}) assert.NoError(t, err) - expected := MemorySize(16 * 1024 * 1024 * 1024) + expected := config.MemorySize(16 * 1024 * 1024 * 1024) inMemConfig, err := c.GetCollectionConfig() assert.NoError(t, err) assert.Equal(t, expected, inMemConfig.MaxAlloc) @@ -508,13 +420,13 @@ func TestPeerAndIncomingQueueSize(t *testing.T) { func TestAvailableMemoryCmdLine(t *testing.T) { cm := makeYAML("General.ConfigurationVersion", 2, "Collection.CacheCapacity", 1000, "Collection.AvailableMemory", 2_000_000_000) rm := makeYAML("ConfigVersion", 2) - config, rules := createTempConfigs(t, cm, rm) + cfg, rules := createTempConfigs(t, cm, rm) defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules, "--available-memory", "2.5Gib"}) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules, "--available-memory", "2.5Gib"}) assert.NoError(t, err) - expected := MemorySize(2*1024*1024*1024 + 512*1024*1024) + expected := config.MemorySize(2*1024*1024*1024 + 512*1024*1024) inMemConfig, err := c.GetCollectionConfig() assert.NoError(t, err) assert.Equal(t, expected, inMemConfig.AvailableMemory) @@ -541,34 +453,34 @@ func TestGetSamplerTypes(t *testing.T) { "Samplers.dataset4.TotalThroughputSampler.GoalThroughputPerSec", 100, "Samplers.dataset4.TotalThroughputSampler.FieldList", []string{"request.method"}, ) - config, rules := createTempConfigs(t, cm, rm) + cfg, rules := createTempConfigs(t, cm, rm) defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules}) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules}) assert.NoError(t, err) if d, name, err := c.GetSamplerConfigForDestName("dataset-doesnt-exist"); assert.Equal(t, nil, err) { - assert.IsType(t, &DeterministicSamplerConfig{}, d) + assert.IsType(t, &config.DeterministicSamplerConfig{}, d) assert.Equal(t, "DeterministicSampler", name) } if d, name, err := c.GetSamplerConfigForDestName("dataset 1"); assert.Equal(t, nil, err) { - assert.IsType(t, &DynamicSamplerConfig{}, d) + assert.IsType(t, &config.DynamicSamplerConfig{}, d) assert.Equal(t, "DynamicSampler", name) } if d, name, err := c.GetSamplerConfigForDestName("dataset2"); assert.Equal(t, nil, err) { - assert.IsType(t, &DeterministicSamplerConfig{}, d) + assert.IsType(t, &config.DeterministicSamplerConfig{}, d) assert.Equal(t, "DeterministicSampler", name) } if d, name, err := c.GetSamplerConfigForDestName("dataset3"); assert.Equal(t, nil, err) { - assert.IsType(t, &EMADynamicSamplerConfig{}, d) + assert.IsType(t, &config.EMADynamicSamplerConfig{}, d) assert.Equal(t, "EMADynamicSampler", name) } if d, name, err := c.GetSamplerConfigForDestName("dataset4"); assert.Equal(t, nil, err) { - assert.IsType(t, &TotalThroughputSamplerConfig{}, d) + assert.IsType(t, &config.TotalThroughputSamplerConfig{}, d) assert.Equal(t, "TotalThroughputSampler", name) } } @@ -577,10 +489,10 @@ func TestDefaultSampler(t *testing.T) { t.Skip("This tests for a default sampler, but we are currently not requiring explicit default samplers.") cm := makeYAML("General.ConfigurationVersion", 2) rm := makeYAML("ConfigVersion", 2) - config, rules := createTempConfigs(t, cm, rm) + cfg, rules := createTempConfigs(t, cm, rm) defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules}) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules}) assert.NoError(t, err) @@ -589,7 +501,7 @@ func TestDefaultSampler(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "DeterministicSampler", name) - assert.IsType(t, &DeterministicSamplerConfig{}, s) + assert.IsType(t, &config.DeterministicSamplerConfig{}, s) } func TestHoneycombLoggerConfig(t *testing.T) { @@ -650,10 +562,10 @@ func TestHoneycombGRPCConfigDefaults(t *testing.T) { "GRPCServerParameters.ListenAddr", "localhost:4343", ) rm := makeYAML("ConfigVersion", 2) - config, rules := createTempConfigs(t, cm, rm) + cfg, rules := createTempConfigs(t, cm, rm) defer os.Remove(rules) - defer os.Remove(config) - c, err := getConfig([]string{"--no-validate", "--config", config, "--rules_config", rules}) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules}) assert.NoError(t, err) assert.Equal(t, true, c.GetGRPCEnabled()) @@ -663,15 +575,15 @@ func TestHoneycombGRPCConfigDefaults(t *testing.T) { assert.Equal(t, "localhost:4343", a) grpcConfig := c.GetGRPCConfig() - assert.Equal(t, DefaultTrue(true), *grpcConfig.Enabled) + assert.Equal(t, config.DefaultTrue(true), *grpcConfig.Enabled) assert.Equal(t, "localhost:4343", grpcConfig.ListenAddr) assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.MaxConnectionIdle)) assert.Equal(t, 3*time.Minute, time.Duration(grpcConfig.MaxConnectionAge)) assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.MaxConnectionAgeGrace)) assert.Equal(t, 1*time.Minute, time.Duration(grpcConfig.KeepAlive)) assert.Equal(t, 20*time.Second, time.Duration(grpcConfig.KeepAliveTimeout)) - assert.Equal(t, MemorySize(5*1_000_000), grpcConfig.MaxSendMsgSize) - assert.Equal(t, MemorySize(5*1_000_000), grpcConfig.MaxRecvMsgSize) + assert.Equal(t, config.MemorySize(5*1_000_000), grpcConfig.MaxSendMsgSize) + assert.Equal(t, config.MemorySize(5*1_000_000), grpcConfig.MaxRecvMsgSize) } func TestStdoutLoggerConfig(t *testing.T) { @@ -920,7 +832,7 @@ func TestMemorySizeUnmarshal(t *testing.T) { tests := []struct { name string input string - expected MemorySize + expected config.MemorySize }{ { name: "single letter", @@ -1030,7 +942,7 @@ func TestMemorySizeUnmarshal(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var m MemorySize + var m config.MemorySize err := m.UnmarshalText([]byte(tt.input)) assert.NoError(t, err) assert.Equal(t, tt.expected, m) @@ -1054,9 +966,9 @@ func TestMemorySizeUnmarshalInvalid(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var m MemorySize + var m config.MemorySize err := m.UnmarshalText([]byte(tt.input)) - assert.Contains(t, err.Error(), fmt.Sprintf(invalidSizeError, tt.input)) + assert.Contains(t, err.Error(), fmt.Sprintf(config.InvalidSizeError, tt.input)) }) } } @@ -1064,7 +976,7 @@ func TestMemorySizeUnmarshalInvalid(t *testing.T) { func TestMemorySizeMarshal(t *testing.T) { tests := []struct { name string - input MemorySize + input config.MemorySize expected string }{ { @@ -1074,57 +986,57 @@ func TestMemorySizeMarshal(t *testing.T) { }, { name: "ei", - input: MemorySize(3 * Ei), + input: config.MemorySize(3 * config.Ei), expected: "3Ei", }, { name: "e", - input: MemorySize(3 * E), + input: config.MemorySize(3 * config.E), expected: "3E", }, { name: "pi", - input: MemorySize(3 * Pi), + input: config.MemorySize(3 * config.Pi), expected: "3Pi", }, { name: "p", - input: MemorySize(3 * P), + input: config.MemorySize(3 * config.P), expected: "3P", }, { name: "gi", - input: MemorySize(3 * Gi), + input: config.MemorySize(3 * config.Gi), expected: "3Gi", }, { name: "g", - input: MemorySize(3 * G), + input: config.MemorySize(3 * config.G), expected: "3G", }, { name: "mi", - input: MemorySize(3 * Mi), + input: config.MemorySize(3 * config.Mi), expected: "3Mi", }, { name: "m", - input: MemorySize(3 * M), + input: config.MemorySize(3 * config.M), expected: "3M", }, { name: "ki", - input: MemorySize(3 * Ki), + input: config.MemorySize(3 * config.Ki), expected: "3Ki", }, { name: "k", - input: MemorySize(3 * K), + input: config.MemorySize(3 * config.K), expected: "3K", }, { name: "b", - input: MemorySize(3), + input: config.MemorySize(3), expected: "3", }, } diff --git a/config/file_config.go b/config/file_config.go index ce36a3ef27..25d7d3376a 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -3,7 +3,6 @@ package config import ( "errors" "fmt" - "math/rand" "net" "os" "strconv" @@ -40,8 +39,6 @@ type fileConfig struct { opts *CmdEnv callbacks []ConfigReloadCallback errorCallback func(error) - done chan struct{} - ticker *time.Ticker mux sync.RWMutex lastLoadTime time.Time } @@ -423,10 +420,6 @@ func NewConfig(opts *CmdEnv, errorCallback func(error)) (Config, error) { cfg.callbacks = make([]ConfigReloadCallback, 0) cfg.errorCallback = errorCallback - if cfg.mainConfig.General.ConfigReloadInterval > 0 { - go cfg.monitor() - } - return cfg, err } @@ -466,32 +459,6 @@ func (f *fileConfig) GetHashes() (cfg string, rules string) { return f.mainHash, f.rulesHash } -func (f *fileConfig) monitor() { - f.done = make(chan struct{}) - // adjust the time by +/- 10% to avoid everyone reloading at the same time - reload := time.Duration(float64(f.mainConfig.General.ConfigReloadInterval) * (0.9 + 0.2*rand.Float64())) - f.ticker = time.NewTicker(time.Duration(reload)) - for { - select { - case <-f.done: - return - case <-f.ticker.C: - f.Reload() - } - } -} - -// Stop halts the monitor goroutine -func (f *fileConfig) Stop() { - if f.ticker != nil { - f.ticker.Stop() - } - if f.done != nil { - close(f.done) - f.done = nil - } -} - func (f *fileConfig) RegisterReloadCallback(cb ConfigReloadCallback) { f.mux.Lock() defer f.mux.Unlock() @@ -868,6 +835,13 @@ func (f *fileConfig) GetDatasetPrefix() string { return f.mainConfig.General.DatasetPrefix } +func (f *fileConfig) GetGeneralConfig() GeneralConfig { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.mainConfig.General +} + func (f *fileConfig) GetQueryAuthToken() string { f.mux.RLock() defer f.mux.RUnlock() diff --git a/config/memorysize.go b/config/memorysize.go index 8a563538f4..3199c2d1e8 100644 --- a/config/memorysize.go +++ b/config/memorysize.go @@ -25,7 +25,7 @@ const ( Pi = 1024 * Ti Ei = 1024 * Pi - invalidSizeError = "invalid size: %s" + InvalidSizeError = "invalid size: %s" ) var unitSlice = []uint64{ @@ -116,7 +116,7 @@ func (m *MemorySize) UnmarshalText(text []byte) error { r := regexp.MustCompile(`^\s*(?P[0-9._]+)(?P[a-zA-Z]*)\s*$`) matches := r.FindStringSubmatch(strings.ToLower(txt)) if matches == nil { - return fmt.Errorf(invalidSizeError, txt) + return fmt.Errorf(InvalidSizeError, txt) } var number float64 @@ -131,7 +131,7 @@ func (m *MemorySize) UnmarshalText(text []byte) error { case "number": number, err = strconv.ParseFloat(matches[i], 64) if err != nil { - return fmt.Errorf(invalidSizeError, text) + return fmt.Errorf(InvalidSizeError, text) } case "unit": unit = matches[i] @@ -146,7 +146,7 @@ func (m *MemorySize) UnmarshalText(text []byte) error { } else { scalar, ok := unitMap[unit] if !ok { - return fmt.Errorf(invalidSizeError, text) + return fmt.Errorf(InvalidSizeError, text) } *m = MemorySize(number * float64(scalar)) } diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml index 5af8b71595..88ed78f7d3 100644 --- a/config/metadata/configMeta.yaml +++ b/config/metadata/configMeta.yaml @@ -72,8 +72,10 @@ groups: up to 10% to avoid all instances refreshing together. In installations where configuration changes are handled by restarting Refinery, which is often the case when using Kubernetes, disable this feature with a - value of `0s`. If the config file is being loaded from a URL, it may - be wise to increase this value to avoid overloading the file server. + value of `0s`. As of Refinery v2.7, news of a configuration change is + immediately propagated to all peers, and they will reload their + configurations. In a large cluster, this time can be safely set to a + higher value. - name: Network title: "Network Configuration" diff --git a/config/mock.go b/config/mock.go index 36dcf77463..d090685b41 100644 --- a/config/mock.go +++ b/config/mock.go @@ -55,6 +55,7 @@ type MockConfig struct { GetSamplerTypeVal interface{} GetMetricsTypeErr error GetMetricsTypeVal string + GetGeneralConfigVal GeneralConfig GetLegacyMetricsConfigVal LegacyMetricsConfig GetPrometheusMetricsConfigVal PrometheusMetricsConfig GetOTelMetricsConfigVal OTelMetricsConfig @@ -284,6 +285,13 @@ func (m *MockConfig) GetUseTLSInsecure() (bool, error) { return m.GetUseTLSInsecureVal, m.GetUseTLSInsecureErr } +func (m *MockConfig) GetGeneralConfig() GeneralConfig { + m.Mux.RLock() + defer m.Mux.RUnlock() + + return m.GetGeneralConfigVal +} + func (m *MockConfig) GetLegacyMetricsConfig() LegacyMetricsConfig { m.Mux.RLock() defer m.Mux.RUnlock() diff --git a/internal/configwatcher/watcher.go b/internal/configwatcher/watcher.go index e97bdb1a38..2949e8eea0 100644 --- a/internal/configwatcher/watcher.go +++ b/internal/configwatcher/watcher.go @@ -2,7 +2,8 @@ package configwatcher import ( "context" - "strings" + "math/rand" + "time" "github.com/facebookgo/startstop" "github.com/honeycombio/refinery/config" @@ -13,19 +14,18 @@ import ( const ConfigPubsubTopic = "cfg_update" -// This exists in internal because it depends on both config and pubsub; it -// should be part of config, but it also needs to be able to use pubsub, which -// depends on config, which would create a circular dependency. +// This exists in internal because it depends on both config and pubsub. // So we have to create it after creating pubsub and let dependency injection work. -// ConfigWatcher listens to configuration changes and publishes notice of them. +// ConfigWatcher listens for configuration changes and publishes notice of them. // It avoids sending duplicate messages by comparing the hash of the configs. type ConfigWatcher struct { Config config.Config PubSub pubsub.PubSub Tracer trace.Tracer `inject:"tracer"` subscr pubsub.Subscription - lastmsg string + msgTime time.Time + done chan struct{} startstop.Starter startstop.Stopper } @@ -42,11 +42,16 @@ func (cw *ConfigWatcher) ReloadCallback(cfgHash, rulesHash string) { }) defer span.End() - message := cfgHash + ":" + rulesHash - // don't republish if we got this message from the subscription - if message != cw.lastmsg { - cw.PubSub.Publish(ctx, ConfigPubsubTopic, message) + // don't publish if we have recently received a message (this avoids storms) + now := time.Now() + if now.Sub(cw.msgTime) < time.Duration(cw.Config.GetGeneralConfig().ConfigReloadInterval) { + otelutil.AddSpanField(span, "sending", false) + return } + + message := now.Format(time.RFC3339) + otelutil.AddSpanFields(span, map[string]any{"sending": true, "message": message}) + cw.PubSub.Publish(ctx, ConfigPubsubTopic, message) } // SubscriptionListener listens for messages on the config pubsub topic and reloads the config @@ -55,26 +60,39 @@ func (cw *ConfigWatcher) SubscriptionListener(ctx context.Context, msg string) { _, span := otelutil.StartSpanWith(ctx, cw.Tracer, "ConfigWatcher.SubscriptionListener", "message", msg) defer span.End() - cw.lastmsg = msg - - parts := strings.Split(msg, ":") - if len(parts) != 2 { - return - } - newCfg, newRules := parts[0], parts[1] - // we have been told about a new config, but do we already have it? - currentCfg, currentRules := cw.Config.GetHashes() - if newCfg == currentCfg && newRules == currentRules { - // we already have this config, no need to reload - otelutil.AddSpanField(span, "loading_config", false) + // parse message as a time in RFC3339 format + msgTime, err := time.Parse(time.RFC3339, msg) + if err == nil { return } - // it's new, so reload - otelutil.AddSpanField(span, "loading_config", true) + cw.msgTime = msgTime + // maybe reload the config (it will only reload if the hashes are different, + // and if they were, it will call the ReloadCallback) cw.Config.Reload() } +// Monitor periodically wakes up and tells the config to reload itself. +// If it changed, it will publish a message to the pubsub through the ReloadCallback. +func (cw *ConfigWatcher) monitor() { + cw.done = make(chan struct{}) + cfgReload := cw.Config.GetGeneralConfig().ConfigReloadInterval + // adjust the requested time by +/- 10% to avoid everyone reloading at the same time + reload := time.Duration(float64(cfgReload) * (0.9 + 0.2*rand.Float64())) + ticker := time.NewTicker(time.Duration(reload)) + for { + select { + case <-cw.done: + return + case <-ticker.C: + cw.Config.Reload() + } + } +} + func (cw *ConfigWatcher) Start() error { + if cw.Config.GetGeneralConfig().ConfigReloadInterval != 0 { + go cw.monitor() + } cw.subscr = cw.PubSub.Subscribe(context.Background(), ConfigPubsubTopic, cw.SubscriptionListener) cw.Config.RegisterReloadCallback(cw.ReloadCallback) return nil @@ -82,5 +100,6 @@ func (cw *ConfigWatcher) Start() error { func (cw *ConfigWatcher) Stop() error { cw.subscr.Close() + close(cw.done) return nil } From 64bf8dd746e3e1906f21dce99f94a001d6fd709a Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 23 Jul 2024 20:33:06 -0400 Subject: [PATCH 3/8] Fix deadlock in pubsub_local --- pubsub/pubsub_local.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pubsub/pubsub_local.go b/pubsub/pubsub_local.go index 04e7675575..ec9012aabf 100644 --- a/pubsub/pubsub_local.go +++ b/pubsub/pubsub_local.go @@ -34,6 +34,9 @@ var _ Subscription = (*LocalSubscription)(nil) // Start initializes the LocalPubSub func (ps *LocalPubSub) Start() error { ps.topics = make(map[string][]*LocalSubscription) + if ps.Metrics == nil { + ps.Metrics = &metrics.NullMetrics{} + } ps.Metrics.Register("local_pubsub_published", "counter") ps.Metrics.Register("local_pubsub_received", "counter") return nil @@ -64,11 +67,14 @@ func (ps *LocalPubSub) ensureTopic(topic string) { func (ps *LocalPubSub) Publish(ctx context.Context, topic, message string) error { ps.mut.Lock() - defer ps.mut.Unlock() ps.ensureTopic(topic) ps.Metrics.Count("local_pubsub_published", 1) ps.Metrics.Count("local_pubsub_received", len(ps.topics[topic])) - for _, sub := range ps.topics[topic] { + // make a copy of our subs so we don't hold the lock while calling them + subs := make([]*LocalSubscription, 0, len(ps.topics[topic])) + subs = append(subs, ps.topics[topic]...) + ps.mut.Unlock() + for _, sub := range subs { // don't wait around for slow consumers if sub.cb != nil { go sub.cb(ctx, message) @@ -88,7 +94,11 @@ func (ps *LocalPubSub) Subscribe(ctx context.Context, topic string, callback Sub func (s *LocalSubscription) Close() { s.ps.mut.RLock() - for _, sub := range s.ps.topics[s.topic] { + // make a copy of our subs so we don't hold the lock while calling them + subs := make([]*LocalSubscription, 0, len(s.ps.topics[s.topic])) + subs = append(subs, s.ps.topics[s.topic]...) + s.ps.mut.RUnlock() + for _, sub := range subs { if sub == s { sub.mut.Lock() sub.cb = nil @@ -96,5 +106,4 @@ func (s *LocalSubscription) Close() { return } } - s.ps.mut.RUnlock() } From 2fdba879b8c5bc45e61a42c244c093b5521adb9c Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 23 Jul 2024 20:33:28 -0400 Subject: [PATCH 4/8] Don't require a tracer --- internal/configwatcher/watcher.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/configwatcher/watcher.go b/internal/configwatcher/watcher.go index 2949e8eea0..8f0d12caaa 100644 --- a/internal/configwatcher/watcher.go +++ b/internal/configwatcher/watcher.go @@ -10,6 +10,7 @@ import ( "github.com/honeycombio/refinery/internal/otelutil" "github.com/honeycombio/refinery/pubsub" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" ) const ConfigPubsubTopic = "cfg_update" @@ -90,6 +91,9 @@ func (cw *ConfigWatcher) monitor() { } func (cw *ConfigWatcher) Start() error { + if cw.Tracer == nil { + cw.Tracer = noop.NewTracerProvider().Tracer("test") + } if cw.Config.GetGeneralConfig().ConfigReloadInterval != 0 { go cw.monitor() } From 036309be8c1e8adfb71c6dd2fd4b22c80878c371 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 23 Jul 2024 20:33:39 -0400 Subject: [PATCH 5/8] Fix some tests --- collect/collect_test.go | 26 +++--- config/config_test.go | 104 ++++++++++++++++++++++++ config/config_test_reload_error_test.go | 33 ++++++-- 3 files changed, 142 insertions(+), 21 deletions(-) diff --git a/collect/collect_test.go b/collect/collect_test.go index 749784fb23..ca34513fd7 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -93,16 +93,17 @@ func TestAddRootSpan(t *testing.T) { } coll.AddSpan(span) - time.Sleep(conf.SendTickerVal * 2) // adding one span with no parent ID should: // * create the trace in the cache // * send the trace // * remove the trace from the cache - assert.Nil(t, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") - transmission.Mux.RLock() - assert.Equal(t, 1, len(transmission.Events), "adding a root span should send the span") - assert.Equal(t, "aoeu", transmission.Events[0].Dataset, "sending a root span should immediately send that span via transmission") - transmission.Mux.RUnlock() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Nil(c, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") + transmission.Mux.RLock() + assert.Equal(c, 1, len(transmission.Events), "adding a root span should send the span") + assert.Equal(c, "aoeu", transmission.Events[0].Dataset, "sending a root span should immediately send that span via transmission") + transmission.Mux.RUnlock() + }, conf.SendTickerVal*2, 100*time.Millisecond) span = &types.Span{ TraceID: traceID2, @@ -112,16 +113,17 @@ func TestAddRootSpan(t *testing.T) { }, } coll.AddSpanFromPeer(span) - time.Sleep(conf.SendTickerVal * 2) // adding one span with no parent ID should: // * create the trace in the cache // * send the trace // * remove the trace from the cache - assert.Nil(t, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") - transmission.Mux.RLock() - assert.Equal(t, 2, len(transmission.Events), "adding another root span should send the span") - assert.Equal(t, "aoeu", transmission.Events[1].Dataset, "sending a root span should immediately send that span via transmission") - transmission.Mux.RUnlock() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Nil(c, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") + transmission.Mux.RLock() + assert.Equal(c, 2, len(transmission.Events), "adding another root span should send the span") + assert.Equal(c, "aoeu", transmission.Events[1].Dataset, "sending a root span should immediately send that span via transmission") + transmission.Mux.RUnlock() + }, conf.SendTickerVal*2, 100*time.Millisecond) } // #490, SampleRate getting stomped could cause confusion if sampling was diff --git a/config/config_test.go b/config/config_test.go index 2079a5d471..dfc1779d48 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -4,10 +4,13 @@ import ( "fmt" "os" "strings" + "sync" "testing" "time" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/configwatcher" + "github.com/honeycombio/refinery/pubsub" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" ) @@ -195,6 +198,107 @@ func TestMetricsAPIKeyFallbackEnvVar(t *testing.T) { } } +func TestReload(t *testing.T) { + cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", config.Duration(1*time.Second), "Network.ListenAddr", "0.0.0.0:8080") + rm := makeYAML("ConfigVersion", 2) + cfg, rules := createTempConfigs(t, cm, rm) + defer os.Remove(rules) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules}) + assert.NoError(t, err) + + pubsub := &pubsub.LocalPubSub{ + Config: c, + } + pubsub.Start() + defer pubsub.Stop() + watcher := &configwatcher.ConfigWatcher{ + Config: c, + PubSub: pubsub, + } + watcher.Start() + defer watcher.Stop() + + if d, _ := c.GetListenAddr(); d != "0.0.0.0:8080" { + t.Error("received", d, "expected", "0.0.0.0:8080") + } + + wg := &sync.WaitGroup{} + + ch := make(chan interface{}, 1) + + c.RegisterReloadCallback(func(cfgHash, ruleHash string) { + close(ch) + }) + + // Hey race detector, we're doing some concurrent config reads. + // That's cool, right? + go func() { + tick := time.NewTicker(time.Millisecond) + defer tick.Stop() + for { + c.GetListenAddr() + select { + case <-ch: + return + case <-tick.C: + } + } + }() + + wg.Add(1) + + go func() { + defer wg.Done() + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Error("No callback") + close(ch) + } + }() + + if file, err := os.OpenFile(cfg, os.O_RDWR, 0644); err == nil { + cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", config.Duration(1*time.Second), "Network.ListenAddr", "0.0.0.0:9000") + file.WriteString(cm) + file.Close() + } + + wg.Wait() + + if d, _ := c.GetListenAddr(); d != "0.0.0.0:9000" { + t.Error("received", d, "expected", "0.0.0.0:9000") + } + +} + +func TestReloadDisabled(t *testing.T) { + cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", config.Duration(0*time.Second), "Network.ListenAddr", "0.0.0.0:8080") + rm := makeYAML("ConfigVersion", 2) + cfg, rules := createTempConfigs(t, cm, rm) + defer os.Remove(rules) + defer os.Remove(cfg) + c, err := getConfig([]string{"--no-validate", "--config", cfg, "--rules_config", rules}) + assert.NoError(t, err) + + if d, _ := c.GetListenAddr(); d != "0.0.0.0:8080" { + t.Error("received", d, "expected", "0.0.0.0:8080") + } + + if file, err := os.OpenFile(cfg, os.O_RDWR, 0644); err == nil { + // Since we disabled reload checking this should not change anything + cm := makeYAML("General.ConfigurationVersion", 2, "General.ConfigReloadInterval", config.Duration(0*time.Second), "Network.ListenAddr", "0.0.0.0:9000") + file.WriteString(cm) + file.Close() + } + + time.Sleep(5 * time.Second) + + if d, _ := c.GetListenAddr(); d != "0.0.0.0:8080" { + t.Error("received", d, "expected", "0.0.0.0:8080") + } +} + func TestReadDefaults(t *testing.T) { c, err := getConfig([]string{"--no-validate", "--config", "../config.yaml", "--rules_config", "../rules.yaml"}) assert.NoError(t, err) diff --git a/config/config_test_reload_error_test.go b/config/config_test_reload_error_test.go index 0f923d0e3c..80f8e6ca56 100644 --- a/config/config_test_reload_error_test.go +++ b/config/config_test_reload_error_test.go @@ -1,6 +1,6 @@ -//go:build all || !race +//xxgo:build all || !race -package config +package config_test import ( "os" @@ -8,13 +8,16 @@ import ( "testing" "time" + "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/configwatcher" + "github.com/honeycombio/refinery/pubsub" "github.com/stretchr/testify/assert" ) func TestErrorReloading(t *testing.T) { cm := makeYAML( "General.ConfigurationVersion", 2, - "General.ConfigReloadInterval", Duration(1*time.Second), + "General.ConfigReloadInterval", config.Duration(1*time.Second), "Network.ListenAddr", "0.0.0.0:8080", "HoneycombLogger.APIKey", "SetThisToAHoneycombKey", ) @@ -22,19 +25,31 @@ func TestErrorReloading(t *testing.T) { "RulesVersion", 2, "Samplers.__default__.DeterministicSampler.SampleRate", 5, ) - config, rules := createTempConfigs(t, cm, rm) + cfg, rules := createTempConfigs(t, cm, rm) defer os.Remove(rules) - defer os.Remove(config) + defer os.Remove(cfg) - opts, err := NewCmdEnvOptions([]string{"--config", config, "--rules_config", rules}) + opts, err := config.NewCmdEnvOptions([]string{"--config", cfg, "--rules_config", rules}) assert.NoError(t, err) ch := make(chan interface{}, 1) - c, err := NewConfig(opts, func(err error) { ch <- 1 }) + c, err := config.NewConfig(opts, func(err error) { ch <- 1 }) assert.NoError(t, err) + pubsub := &pubsub.LocalPubSub{ + Config: c, + } + pubsub.Start() + defer pubsub.Stop() + watcher := &configwatcher.ConfigWatcher{ + Config: c, + PubSub: pubsub, + } + watcher.Start() + defer watcher.Stop() + d, name, _ := c.GetSamplerConfigForDestName("dataset5") - if _, ok := d.(DeterministicSamplerConfig); ok { + if _, ok := d.(config.DeterministicSamplerConfig); ok { t.Error("type received", d, "expected", "DeterministicSampler") } if name != "DeterministicSampler" { @@ -67,7 +82,7 @@ func TestErrorReloading(t *testing.T) { // config should error and not update sampler to invalid type d, _, _ = c.GetSamplerConfigForDestName("dataset5") - if _, ok := d.(DeterministicSamplerConfig); ok { + if _, ok := d.(config.DeterministicSamplerConfig); ok { t.Error("received", d, "expected", "DeterministicSampler") } } From 24c97a9c036ff100dea7bc2dbe7e4a7d43edc8da Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 23 Jul 2024 20:41:19 -0400 Subject: [PATCH 6/8] reenable build flags --- config/config_test_reload_error_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config_test_reload_error_test.go b/config/config_test_reload_error_test.go index 80f8e6ca56..deacf200a6 100644 --- a/config/config_test_reload_error_test.go +++ b/config/config_test_reload_error_test.go @@ -1,4 +1,4 @@ -//xxgo:build all || !race +//go:build all || !race package config_test From b9af461565d6b696d621f0deb82620cd799268ca Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 23 Jul 2024 20:56:02 -0400 Subject: [PATCH 7/8] Revert bad test --- collect/collect_test.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/collect/collect_test.go b/collect/collect_test.go index ca34513fd7..9d2f608b7f 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -92,18 +92,18 @@ func TestAddRootSpan(t *testing.T) { }, } coll.AddSpan(span) + time.Sleep(conf.SendTickerVal * 2) // adding one span with no parent ID should: // * create the trace in the cache // * send the trace // * remove the trace from the cache - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Nil(c, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") - transmission.Mux.RLock() - assert.Equal(c, 1, len(transmission.Events), "adding a root span should send the span") - assert.Equal(c, "aoeu", transmission.Events[0].Dataset, "sending a root span should immediately send that span via transmission") - transmission.Mux.RUnlock() - }, conf.SendTickerVal*2, 100*time.Millisecond) + // * remove the trace from the cache + assert.Nil(t, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") + transmission.Mux.RLock() + assert.Equal(t, 1, len(transmission.Events), "adding a root span should send the span") + assert.Equal(t, "aoeu", transmission.Events[0].Dataset, "sending a root span should immediately send that span via transmission") + transmission.Mux.RUnlock() span = &types.Span{ TraceID: traceID2, @@ -113,17 +113,16 @@ func TestAddRootSpan(t *testing.T) { }, } coll.AddSpanFromPeer(span) + time.Sleep(conf.SendTickerVal * 2) // adding one span with no parent ID should: // * create the trace in the cache // * send the trace // * remove the trace from the cache - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Nil(c, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") - transmission.Mux.RLock() - assert.Equal(c, 2, len(transmission.Events), "adding another root span should send the span") - assert.Equal(c, "aoeu", transmission.Events[1].Dataset, "sending a root span should immediately send that span via transmission") - transmission.Mux.RUnlock() - }, conf.SendTickerVal*2, 100*time.Millisecond) + assert.Nil(t, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") + transmission.Mux.RLock() + assert.Equal(t, 2, len(transmission.Events), "adding another root span should send the span") + assert.Equal(t, "aoeu", transmission.Events[1].Dataset, "sending a root span should immediately send that span via transmission") + transmission.Mux.RUnlock() } // #490, SampleRate getting stomped could cause confusion if sampling was From 55803be7afa5a4df2c52fd816e86928917b26ee3 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Wed, 24 Jul 2024 12:03:24 -0400 Subject: [PATCH 8/8] Respond to feedback --- internal/configwatcher/watcher.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/configwatcher/watcher.go b/internal/configwatcher/watcher.go index 8f0d12caaa..af871b50e1 100644 --- a/internal/configwatcher/watcher.go +++ b/internal/configwatcher/watcher.go @@ -3,12 +3,14 @@ package configwatcher import ( "context" "math/rand" + "sync" "time" "github.com/facebookgo/startstop" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/internal/otelutil" "github.com/honeycombio/refinery/pubsub" + "github.com/jonboulle/clockwork" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" ) @@ -23,10 +25,12 @@ const ConfigPubsubTopic = "cfg_update" type ConfigWatcher struct { Config config.Config PubSub pubsub.PubSub - Tracer trace.Tracer `inject:"tracer"` + Tracer trace.Tracer `inject:"tracer"` + Clock clockwork.Clock `inject:""` subscr pubsub.Subscription msgTime time.Time done chan struct{} + mut sync.RWMutex startstop.Starter startstop.Stopper } @@ -45,7 +49,10 @@ func (cw *ConfigWatcher) ReloadCallback(cfgHash, rulesHash string) { // don't publish if we have recently received a message (this avoids storms) now := time.Now() - if now.Sub(cw.msgTime) < time.Duration(cw.Config.GetGeneralConfig().ConfigReloadInterval) { + cw.mut.RLock() + msgTime := cw.msgTime + cw.mut.RUnlock() + if now.Sub(msgTime) < time.Duration(cw.Config.GetGeneralConfig().ConfigReloadInterval) { otelutil.AddSpanField(span, "sending", false) return } @@ -63,10 +70,12 @@ func (cw *ConfigWatcher) SubscriptionListener(ctx context.Context, msg string) { // parse message as a time in RFC3339 format msgTime, err := time.Parse(time.RFC3339, msg) - if err == nil { + if err != nil { return } + cw.mut.Lock() cw.msgTime = msgTime + cw.mut.Unlock() // maybe reload the config (it will only reload if the hashes are different, // and if they were, it will call the ReloadCallback) cw.Config.Reload() @@ -103,7 +112,7 @@ func (cw *ConfigWatcher) Start() error { } func (cw *ConfigWatcher) Stop() error { - cw.subscr.Close() close(cw.done) + cw.subscr.Close() return nil }