Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock if Append happens at same time as Commit #1392

Merged
merged 3 commits into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ For more details on the alerting system see the full documentation [here](https:
- [#1369](https://github.com/influxdata/kapacitor/issues/1369): Fix panic with concurrent writes to same points in state tracking nodes.
- [#1387](https://github.com/influxdata/kapacitor/pull/1387): static-discovery configuration simplified
- [#1378](https://github.com/influxdata/kapacitor/issues/1378): Fix panic in InfluxQL node with missing field.
- [#1392](https://github.com/influxdata/kapacitor/pull/1392): Fix possible deadlock for scraper configuration updating.

## v1.3.0-rc2 [2017-05-11]

Expand Down
53 changes: 29 additions & 24 deletions services/scraper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"math"
"sync"
"sync/atomic"

"github.com/influxdata/influxdb/models"
"github.com/prometheus/common/model"
Expand All @@ -31,7 +32,7 @@ type Service struct {
closing chan struct{}
updating chan *config.Config

configs []Config
configs atomic.Value // []Config

logger *log.Logger

Expand All @@ -49,9 +50,9 @@ type Service struct {
// NewService creates a new scraper service
func NewService(c []Config, l *log.Logger) *Service {
s := &Service{
configs: c,
logger: l,
logger: l,
}
s.storeConfigs(c)
s.mgr = retrieval.NewTargetManager(s, NewLogger(l))
return s
}
Expand Down Expand Up @@ -91,6 +92,14 @@ func (s *Service) Close() error {
return nil
}

func (s *Service) loadConfigs() []Config {
return s.configs.Load().([]Config)
}

func (s *Service) storeConfigs(c []Config) {
s.configs.Store(c)
}

func (s *Service) scrape() {
s.wg.Add(1)
go func() {
Expand Down Expand Up @@ -153,25 +162,17 @@ func (s *Service) Append(sample *model.Sample) error {
tags[string(name)] = string(value)
}

blacklist := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
// If instance is blacklisted then do not send to PointsWriter
if instance, ok := tags["instance"]; ok {
for i := range s.configs {
if s.configs[i].Name == job {
for _, listed := range s.configs[i].Blacklist {
if instance == listed {
return true
}
// If instance is blacklisted then do not send to PointsWriter
if instance, ok := tags["instance"]; ok {
for _, c := range s.loadConfigs() {
if c.Name == job {
for _, listed := range c.Blacklist {
if instance == listed {
return nil
}
}
}
}
return false
}
if blacklist() {
return nil
}

fields := models.Fields{
Expand Down Expand Up @@ -220,7 +221,7 @@ func (s *Service) Update(newConfigs []interface{}) error {
}
}

s.configs = configs
s.storeConfigs(configs)
if s.open {
pairs := s.pairs()
conf := s.prom(pairs)
Expand Down Expand Up @@ -302,7 +303,9 @@ func (s *Service) AddScrapers(scrapers []Config) {
return
}

s.configs = append(s.configs, scrapers...)
configs := s.loadConfigs()
configs = append(configs, scrapers...)
s.storeConfigs(configs)
}

// RemoveScrapers removes scrapers from the registry
Expand All @@ -313,19 +316,21 @@ func (s *Service) RemoveScrapers(scrapers []Config) {
return
}

configs := s.loadConfigs()[0:0]
for _, rm := range scrapers {
for i, c := range s.configs {
if c.Name == rm.Name {
s.configs = append(s.configs[:i], s.configs[i+1:]...)
for _, c := range configs {
if c.Name != rm.Name {
configs = append(configs, c)
}
}
}
s.storeConfigs(configs)
}

// pairs returns all named pairs of scrapers and discoverers from registry must be locked
func (s *Service) pairs() []Pair {
pairs := []Pair{}
for _, scr := range s.configs {
for _, scr := range s.loadConfigs() {
if !scr.Enabled {
continue
}
Expand Down