diff --git a/CHANGELOG.md b/CHANGELOG.md
index f5b69577d..fc1f50164 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -105,6 +105,7 @@ For more details on the alerting system see the full documentation [here](https:
 - [#1369](https://github.com/influxdata/kapacitor/issues/1369): Fix panic with concurrent writes to same points in state tracking nodes.
 - [#1387](https://github.com/influxdata/kapacitor/pull/1387): static-discovery configuration simplified
 - [#1378](https://github.com/influxdata/kapacitor/issues/1378): Fix panic in InfluxQL node with missing field.
+- [#1392](https://github.com/influxdata/kapacitor/pull/1392): Fix possible deadlock for scraper configuration updating.
 
 ## v1.3.0-rc2 [2017-05-11]
 
diff --git a/services/scraper/service.go b/services/scraper/service.go
index b6a114855..f2e7dfc3d 100644
--- a/services/scraper/service.go
+++ b/services/scraper/service.go
@@ -5,6 +5,7 @@ import (
 	"log"
 	"math"
 	"sync"
+	"sync/atomic"
 
 	"github.com/influxdata/influxdb/models"
 	"github.com/prometheus/common/model"
@@ -31,7 +32,7 @@ type Service struct {
 	closing  chan struct{}
 	updating chan *config.Config
 
-	configs []Config
+	configs atomic.Value // []Config
 
 	logger *log.Logger
 
@@ -49,9 +50,9 @@ type Service struct {
 // NewService creates a new scraper service
 func NewService(c []Config, l *log.Logger) *Service {
 	s := &Service{
-		configs: c,
-		logger:  l,
+		logger: l,
 	}
+	s.storeConfigs(c)
 	s.mgr = retrieval.NewTargetManager(s, NewLogger(l))
 	return s
 }
@@ -91,6 +92,14 @@ func (s *Service) Close() error {
 	return nil
 }
 
+func (s *Service) loadConfigs() []Config {
+	return s.configs.Load().([]Config)
+}
+
+func (s *Service) storeConfigs(c []Config) {
+	s.configs.Store(c)
+}
+
 func (s *Service) scrape() {
 	s.wg.Add(1)
 	go func() {
@@ -153,25 +162,17 @@ func (s *Service) Append(sample *model.Sample) error {
 		tags[string(name)] = string(value)
 	}
 
-	blacklist := func() bool {
-		s.mu.Lock()
-		defer s.mu.Unlock()
-		// If instance is blacklisted then do not send to PointsWriter
-		if instance, ok := tags["instance"]; ok {
-			for i := range s.configs {
-				if s.configs[i].Name == job {
-					for _, listed := range s.configs[i].Blacklist {
-						if instance == listed {
-							return true
-						}
+	// If instance is blacklisted then do not send to PointsWriter
+	if instance, ok := tags["instance"]; ok {
+		for _, c := range s.loadConfigs() {
+			if c.Name == job {
+				for _, listed := range c.Blacklist {
+					if instance == listed {
+						return nil
 					}
 				}
 			}
 		}
-		return false
-	}
-	if blacklist() {
-		return nil
 	}
 
 	fields := models.Fields{
@@ -220,7 +221,7 @@ func (s *Service) Update(newConfigs []interface{}) error {
 		}
 	}
 
-	s.configs = configs
+	s.storeConfigs(configs)
 	if s.open {
 		pairs := s.pairs()
 		conf := s.prom(pairs)
@@ -302,7 +303,9 @@ func (s *Service) AddScrapers(scrapers []Config) {
 		return
 	}
 
-	s.configs = append(s.configs, scrapers...)
+	configs := s.loadConfigs()
+	configs = append(configs, scrapers...)
+	s.storeConfigs(configs)
 }
 
 // RemoveScrapers removes scrapers from the registry
@@ -313,19 +316,29 @@ func (s *Service) RemoveScrapers(scrapers []Config) {
 		return
 	}
 
-	for _, rm := range scrapers {
-		for i, c := range s.configs {
-			if c.Name == rm.Name {
-				s.configs = append(s.configs[:i], s.configs[i+1:]...)
-			}
-		}
-	}
+	// Filter into a freshly allocated slice: the slice already published
+	// via the atomic value must never be mutated in place, because
+	// concurrent readers obtained from loadConfigs may still be iterating it.
+	existing := s.loadConfigs()
+	configs := make([]Config, 0, len(existing))
+	for _, c := range existing {
+		removed := false
+		for _, rm := range scrapers {
+			if c.Name == rm.Name {
+				removed = true
+				break
+			}
+		}
+		if !removed {
+			configs = append(configs, c)
+		}
+	}
+	s.storeConfigs(configs)
 }
 
 // pairs returns all named pairs of scrapers and discoverers from registry must be locked
 func (s *Service) pairs() []Pair {
 	pairs := []Pair{}
-	for _, scr := range s.configs {
+	for _, scr := range s.loadConfigs() {
 		if !scr.Enabled {
 			continue
 		}