diff --git a/Gopkg.lock b/Gopkg.lock index 282256d2be..89b422cd7e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,4 +1,4 @@ -memo = "06a3a52907020306e440b21b8c0f10f03e60b56b704491b8bac8305456fd84ec" +memo = "0323eb5f9e25a377fbe1e582475afb7ca5945b434cb770f2dad8ea1fd51854e1" [[projects]] name = "cloud.google.com/go" @@ -378,10 +378,11 @@ memo = "06a3a52907020306e440b21b8c0f10f03e60b56b704491b8bac8305456fd84ec" revision = "6ac8c5d890d415025dd5aae7595bcb2a6e7e2fad" [[projects]] + branch = "go-discovery-logger" name = "github.com/prometheus/prometheus" packages = ["config","discovery","discovery/azure","discovery/consul","discovery/dns","discovery/ec2","discovery/file","discovery/gce","discovery/kubernetes","discovery/marathon","discovery/triton","discovery/zookeeper","relabel","retrieval","storage","storage/local","storage/local/chunk","storage/local/codable","storage/local/index","storage/metric","util/flock","util/httputil","util/strutil","util/testutil","util/treecache"] - revision = "4666df502c0e239ed4aa1d80abbbfb54f61b23c3" - version = "v1.6.1" + revision = "1d19e25cd38579f5b007906a119ddf76e3cd160b" + source = "github.com/goller/prometheus" [[projects]] name = "github.com/russross/blackfriday" diff --git a/Gopkg.toml b/Gopkg.toml index 0645e22a6d..87cba77754 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -28,6 +28,11 @@ required = ["github.com/benbjohnson/tmpl","github.com/golang/protobuf/protoc-gen branch = "master" name = "github.com/mitchellh/mapstructure" +[[dependencies]] + branch = "go-discovery-logger" + name = "github.com/prometheus/prometheus" + source = "github.com/goller/prometheus" + [[dependencies]] branch = "master" name = "github.com/shurcooL/markdownfmt" diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index ba4a50e261..a9415d4e81 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -452,3 +452,128 @@ default-retention-policy = "" batch-size = 1000 batch-pending = 5 batch-timeout = "1s" + +#[[scrapers]] +# enabled = false +# name = "myscraper" +# discoverer-id = "" +# discoverer-service = "" +# db = "mydb" +# rp = "myrp" +# type = "prometheus" +# scheme = "http" +# metrics-path = "/metrics" +# scrape-interval = "1m0s" +# scrape-timeout = "10s" +# username = "" +# password = "" +# bearer-token = "" +# ssl-ca = "" +# ssl-cert = "" +# ssl-key = "" +# ssl-server-name = "" +# insecure-skip-verify = false +# +#[[azure]] +# enabled = false +# id = "myazure" +# port = 80 +# subscription-id = "" +# tenant-id = "" +# client-id = "" +# client-secret = "" +# refresh-interval = "5m0s" +# +#[[consul]] +# enabled = false +# id = "myconsul" +# address = "127.0.0.1:8500" +# token = "" +# datacenter = "" +# tag-separator = "," +# scheme = "http" +# username = "" +# password = "" +# ssl-ca = "" +# ssl-cert = "" +# ssl-key = "" +# ssl-server-name = "" +# insecure-skip-verify = false +# +#[[dns]] +# enabled = false +# id = "mydns" +# refresh-interval = "30s" +# ## Type can be SRV, A, or AAAA +# type = "SRV" +# ## Port is the port to scrape for records returned by A or AAAA types +# port = 80 +# +#[[ec2]] +# enabled = false +# id = "myec2" +# region = "us-east-1" +# access-key = "" +# secret-key = "" +# profile = "" +# refresh-interval = "1m0s" +# port = 80 +# +#[[files]] +# enabled = false +# id = "myfile" +# refresh-interval = "5m0s" +# +#[[gce]] +# enabled = false +# id = "mygce" +# project = "" +# zone = "" +# filter = "" +# refresh-interval = "1m0s" +# port = 80 +# tag-separator = "," +# +#[[marathon]] +# enabled = 
false +# id = "mymarathon" +# timeout = "30s" +# refresh-interval = "30s" +# bearer-token = "" +# ssl-ca = "" +# ssl-cert = "" +# ssl-key = "" +# ssl-server-name = "" +# insecure-skip-verify = false +# +#[[nerve]] +# enabled = false +# id = "mynerve" +# timeout = "10s" +# +#[[serverset]] +# enabled = false +# id = "myserverset" +# timeout = "10s" +# +#[[static]] +# enabled = false +# id = "mystatic" +# targets = [] +# [static.labels] +# +#[[triton]] +# enabled = false +# id = "mytriton" +# account = "" +# dns-suffix = "" +# endpoint = "" +# port = 9163 +# refresh-interval = "1m0s" +# version = 1 +# ssl-ca = "" +# ssl-cert = "" +# ssl-key = "" +# ssl-server-name = "" +# insecure-skip-verify = false +# diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go index 6339a7ba57..27cfcc1f39 100644 --- a/integrations/helpers_test.go +++ b/integrations/helpers_test.go @@ -183,7 +183,7 @@ type k8sScales struct { func (k k8sAutoscale) Versions() (k8s.APIVersions, error) { return k8s.APIVersions{}, nil } -func (k k8sAutoscale) Client() (k8s.Client, error) { +func (k k8sAutoscale) Client(string) (k8s.Client, error) { return k, nil } func (k k8sAutoscale) Scales(namespace string) k8s.ScalesInterface { diff --git a/k8s_autoscale.go b/k8s_autoscale.go index ac71686efe..12868706e4 100644 --- a/k8s_autoscale.go +++ b/k8s_autoscale.go @@ -44,7 +44,7 @@ type K8sAutoscaleNode struct { // Create a new K8sAutoscaleNode which can trigger autoscale event for a Kubernetes cluster. func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log.Logger) (*K8sAutoscaleNode, error) { - client, err := et.tm.K8sService.Client() + client, err := et.tm.K8sService.Client(n.Cluster) if err != nil { return nil, fmt.Errorf("cannot use the k8sAutoscale node, could not create kubernetes client: %v", err) } diff --git a/listmap/listmap.go b/listmap/listmap.go new file mode 100644 index 0000000000..397f78c7a5 --- /dev/null +++ b/listmap/listmap.go @@ -0,0 +1,58 @@ +package listmap + +import ( + "bytes" + "fmt" + "reflect" + + "github.com/BurntSushi/toml" + "github.com/pkg/errors" +) + +// DoUnmarshalTOML unmarshals either a list of maps or just a single map into dst. +// The argument dst must be a pointer to a slice. +func DoUnmarshalTOML(dst, src interface{}) error { + dstV := reflect.Indirect(reflect.ValueOf(dst)) + if !dstV.CanSet() { + return errors.New("dst must be settable") + } + dstK := dstV.Kind() + if dstK != reflect.Slice { + return errors.New("dst must be a slice") + } + + srcV := reflect.ValueOf(src) + srcK := srcV.Kind() + + var srvValues []reflect.Value + switch srcK { + case reflect.Slice: + l := srcV.Len() + srvValues = make([]reflect.Value, l) + for i := 0; i < l; i++ { + srvValues[i] = srcV.Index(i) + } + case reflect.Map: + srvValues = []reflect.Value{srcV} + default: + return fmt.Errorf("src must be a slice or map, got %v", srcK) + } + + // We want to preserve the TOML decoding behavior exactly, + // so we first re-encode the src data and then decode again, + // only this time directly into the element of the slice. 
+ var buf bytes.Buffer + dstV.Set(reflect.MakeSlice(dstV.Type(), len(srvValues), len(srvValues))) + for i, v := range srvValues { + if err := toml.NewEncoder(&buf).Encode(v.Interface()); err != nil { + return errors.Wrap(err, "failed to reencode toml data") + } + newValue := reflect.New(dstV.Type().Elem()) + if _, err := toml.Decode(buf.String(), newValue.Interface()); err != nil { + return err + } + dstV.Index(i).Set(reflect.Indirect(newValue)) + buf.Reset() + } + return nil +} diff --git a/pipeline/k8s_autoscale.go b/pipeline/k8s_autoscale.go index acddf352ee..63f2dcde8a 100644 --- a/pipeline/k8s_autoscale.go +++ b/pipeline/k8s_autoscale.go @@ -79,6 +79,9 @@ const ( type K8sAutoscaleNode struct { chainnode + // Cluster is the name of the Kubernetes cluster to use. + Cluster string + // Namespace is the namespace of the resource, if empty the default namespace will be used. Namespace string @@ -141,6 +144,7 @@ type K8sAutoscaleNode struct { func newK8sAutoscaleNode(e EdgeType) *K8sAutoscaleNode { k := &K8sAutoscaleNode{ chainnode: newBasicChainNode("k8s_autoscale", e, StreamEdge), + Cluster: "default", Min: 1, Kind: client.DeploymentsKind, NamespaceTag: DefaultNamespaceTag, diff --git a/server/config.go b/server/config.go index 175d4094b4..035781ae75 100644 --- a/server/config.go +++ b/server/config.go @@ -13,27 +13,39 @@ import ( "github.com/influxdata/kapacitor/command" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/azure" "github.com/influxdata/kapacitor/services/config" + "github.com/influxdata/kapacitor/services/consul" "github.com/influxdata/kapacitor/services/deadman" + "github.com/influxdata/kapacitor/services/dns" + "github.com/influxdata/kapacitor/services/ec2" + "github.com/influxdata/kapacitor/services/file" + "github.com/influxdata/kapacitor/services/gce" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/influxdb" "github.com/influxdata/kapacitor/services/k8s" "github.com/influxdata/kapacitor/services/logging" + "github.com/influxdata/kapacitor/services/marathon" + "github.com/influxdata/kapacitor/services/nerve" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/replay" "github.com/influxdata/kapacitor/services/reporting" + "github.com/influxdata/kapacitor/services/scraper" "github.com/influxdata/kapacitor/services/sensu" + "github.com/influxdata/kapacitor/services/serverset" "github.com/influxdata/kapacitor/services/slack" "github.com/influxdata/kapacitor/services/smtp" "github.com/influxdata/kapacitor/services/snmptrap" + "github.com/influxdata/kapacitor/services/static" "github.com/influxdata/kapacitor/services/stats" "github.com/influxdata/kapacitor/services/storage" "github.com/influxdata/kapacitor/services/talk" "github.com/influxdata/kapacitor/services/task_store" "github.com/influxdata/kapacitor/services/telegram" + "github.com/influxdata/kapacitor/services/triton" "github.com/influxdata/kapacitor/services/udf" "github.com/influxdata/kapacitor/services/udp" "github.com/influxdata/kapacitor/services/victorops" @@ -73,8 +85,22 @@ type Config struct { Telegram telegram.Config `toml:"telegram" override:"telegram"` VictorOps victorops.Config `toml:"victorops" 
override:"victorops"` + // Discovery for scraping + Scrapers []scraper.Config `toml:"scrapers" override:"scrapers,element-key=name"` + Azure []azure.Config `toml:"azure" override:"azure,element-key=id"` + Consul []consul.Config `toml:"consul" override:"consul,element-key=id"` + DNS []dns.Config `toml:"dns" override:"dns,element-key=id"` + EC2 []ec2.Config `toml:"ec2" override:"ec2,element-key=id"` + Files []file.Config `toml:"files" override:"files,element-key=id"` + GCE []gce.Config `toml:"gce" override:"gce,element-key=id"` + Marathon []marathon.Config `toml:"marathon" override:"marathon,element-key=id"` + Nerve []nerve.Config `toml:"nerve" override:"nerve,element-key=id"` + Serverset []serverset.Config `toml:"serverset" override:"serverset,element-key=id"` + Static []static.Config `toml:"static" override:"static,element-key=id"` + Triton []triton.Config `toml:"triton" override:"triton,element-key=id"` + // Third-party integrations - Kubernetes k8s.Config `toml:"kubernetes" override:"kubernetes"` + Kubernetes k8s.Configs `toml:"kubernetes" override:"kubernetes,element-key=id"` Reporting reporting.Config `toml:"reporting"` Stats stats.Config `toml:"stats"` @@ -102,7 +128,6 @@ func NewConfig() *Config { c.Task = task_store.NewConfig() c.InfluxDB = []influxdb.Config{influxdb.NewConfig()} c.Logging = logging.NewConfig() - c.Kubernetes = k8s.NewConfig() c.ConfigOverride = config.NewConfig() c.Collectd = collectd.NewConfig() @@ -250,6 +275,84 @@ func (c *Config) Validate() error { if err := c.UDF.Validate(); err != nil { return err } + + // Validate scrapers + for i := range c.Scrapers { + if err := c.Scrapers[i].Validate(); err != nil { + return err + } + } + + for i := range c.Azure { + if err := c.Azure[i].Validate(); err != nil { + return err + } + } + + for i := range c.Consul { + if err := c.Consul[i].Validate(); err != nil { + return err + } + } + + for i := range c.DNS { + if err := c.DNS[i].Validate(); err != nil { + return err + } + } + + for i := range c.EC2 { + if err := c.EC2[i].Validate(); err != nil { + return err + } + } + + for i := range c.Files { + if err := c.Files[i].Validate(); err != nil { + return err + } + } + + for i := range c.GCE { + if err := c.GCE[i].Validate(); err != nil { + return err + } + } + + if err := c.Kubernetes.Validate(); err != nil { + return err + } + + for i := range c.Marathon { + if err := c.Marathon[i].Validate(); err != nil { + return err + } + } + + for i := range c.Nerve { + if err := c.Nerve[i].Validate(); err != nil { + return err + } + } + + for i := range c.Serverset { + if err := c.Serverset[i].Validate(); err != nil { + return err + } + } + + for i := range c.Static { + if err := c.Static[i].Validate(); err != nil { + return err + } + } + + for i := range c.Triton { + if err := c.Triton[i].Validate(); err != nil { + return err + } + } + return nil } diff --git a/server/server.go b/server/server.go index 9944e3a5bf..fef0e80210 100644 --- a/server/server.go +++ b/server/server.go @@ -22,29 +22,41 @@ import ( iclient "github.com/influxdata/kapacitor/influxdb" "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/azure" "github.com/influxdata/kapacitor/services/config" + "github.com/influxdata/kapacitor/services/consul" "github.com/influxdata/kapacitor/services/deadman" + "github.com/influxdata/kapacitor/services/dns" + "github.com/influxdata/kapacitor/services/ec2" + "github.com/influxdata/kapacitor/services/file" + 
"github.com/influxdata/kapacitor/services/gce" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/influxdb" "github.com/influxdata/kapacitor/services/k8s" "github.com/influxdata/kapacitor/services/logging" + "github.com/influxdata/kapacitor/services/marathon" + "github.com/influxdata/kapacitor/services/nerve" "github.com/influxdata/kapacitor/services/noauth" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/replay" "github.com/influxdata/kapacitor/services/reporting" + "github.com/influxdata/kapacitor/services/scraper" "github.com/influxdata/kapacitor/services/sensu" + "github.com/influxdata/kapacitor/services/serverset" "github.com/influxdata/kapacitor/services/servicetest" "github.com/influxdata/kapacitor/services/slack" "github.com/influxdata/kapacitor/services/smtp" "github.com/influxdata/kapacitor/services/snmptrap" + "github.com/influxdata/kapacitor/services/static" "github.com/influxdata/kapacitor/services/stats" "github.com/influxdata/kapacitor/services/storage" "github.com/influxdata/kapacitor/services/talk" "github.com/influxdata/kapacitor/services/task_store" "github.com/influxdata/kapacitor/services/telegram" + "github.com/influxdata/kapacitor/services/triton" "github.com/influxdata/kapacitor/services/udf" "github.com/influxdata/kapacitor/services/udp" "github.com/influxdata/kapacitor/services/victorops" @@ -90,6 +102,19 @@ type Server struct { TesterService *servicetest.Service StatsService *stats.Service + ScraperService *scraper.Service + AzureService *azure.Service + ConsulService *consul.Service + DNSService *dns.Service + EC2Service *ec2.Service + FileService *file.Service + GCEService *gce.Service + MarathonService *marathon.Service + NerveService *nerve.Service + ServersetService *serverset.Service + StaticService *static.Service + TritonService *triton.Service + MetaClient *kapacitor.NoopMetaClient QueryExecutor *Queryexecutor @@ -225,6 +250,20 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server, s.appendStatsService() s.appendReportingService() + // Append Scraper and discovery services + s.appendScraperService() + s.appendAzureService(s.ScraperService) + s.appendConsulService(s.ScraperService) + s.appendDNSService(s.ScraperService) + s.appendEC2Service(s.ScraperService) + s.appendFileService(s.ScraperService) + s.appendGCEService(s.ScraperService) + s.appendMarathonService(s.ScraperService) + s.appendNerveService(s.ScraperService) + s.appendServersetService(s.ScraperService) + s.appendStaticService(s.ScraperService) + s.appendTritonService(s.ScraperService) + // Append HTTPD Service last so that the API is not listening till everything else succeeded. 
s.appendHTTPDService() @@ -636,6 +675,115 @@ func (s *Server) appendReportingService() { } } +func (s *Server) appendScraperService() { + c := s.config.Scrapers + l := s.LogService.NewLogger("[scrapers] ", log.LstdFlags) + srv := scraper.NewService(c, l) + srv.PointsWriter = s.TaskMaster + s.ScraperService = srv + s.SetDynamicService("scrapers", srv) + s.AppendService("scrapers", srv) +} + +func (s *Server) appendAzureService(r scraper.Registry) { + c := s.config.Azure + l := s.LogService.NewLogger("[azure] ", log.LstdFlags) + srv := azure.NewService(c, r, l) + s.AzureService = srv + s.SetDynamicService("azure", srv) + s.AppendService("azure", srv) +} + +func (s *Server) appendConsulService(r scraper.Registry) { + c := s.config.Consul + l := s.LogService.NewLogger("[consul] ", log.LstdFlags) + srv := consul.NewService(c, r, l) + s.ConsulService = srv + s.SetDynamicService("consul", srv) + s.AppendService("consul", srv) +} + +func (s *Server) appendDNSService(r scraper.Registry) { + c := s.config.DNS + l := s.LogService.NewLogger("[dns] ", log.LstdFlags) + srv := dns.NewService(c, r, l) + s.DNSService = srv + s.SetDynamicService("dns", srv) + s.AppendService("dns", srv) +} + +func (s *Server) appendEC2Service(r scraper.Registry) { + c := s.config.EC2 + l := s.LogService.NewLogger("[ec2] ", log.LstdFlags) + srv := ec2.NewService(c, r, l) + s.EC2Service = srv + s.SetDynamicService("ec2", srv) + s.AppendService("ec2", srv) +} + +func (s *Server) appendFileService(r scraper.Registry) { + c := s.config.Files + l := s.LogService.NewLogger("[files] ", log.LstdFlags) + srv := file.NewService(c, r, l) + s.FileService = srv + s.SetDynamicService("files", srv) + s.AppendService("files", srv) +} + +func (s *Server) appendGCEService(r scraper.Registry) { + c := s.config.GCE + l := s.LogService.NewLogger("[gce] ", log.LstdFlags) + srv := gce.NewService(c, r, l) + s.GCEService = srv + s.SetDynamicService("gce", srv) + s.AppendService("gce", srv) +} + +func (s *Server) appendMarathonService(r scraper.Registry) { + c := s.config.Marathon + l := s.LogService.NewLogger("[marathon] ", log.LstdFlags) + srv := marathon.NewService(c, r, l) + s.MarathonService = srv + s.SetDynamicService("marathon", srv) + s.AppendService("marathon", srv) +} + +func (s *Server) appendNerveService(r scraper.Registry) { + c := s.config.Nerve + l := s.LogService.NewLogger("[nerve] ", log.LstdFlags) + srv := nerve.NewService(c, r, l) + s.NerveService = srv + s.SetDynamicService("nerve", srv) + s.AppendService("nerve", srv) +} + +func (s *Server) appendServersetService(r scraper.Registry) { + c := s.config.Serverset + l := s.LogService.NewLogger("[serverset] ", log.LstdFlags) + srv := serverset.NewService(c, r, l) + s.ServersetService = srv + s.SetDynamicService("serverset", srv) + s.AppendService("serverset", srv) +} + +func (s *Server) appendStaticService(r scraper.Registry) { + c := s.config.Static + l := s.LogService.NewLogger("[static] ", log.LstdFlags) + srv := static.NewService(c, r, l) + s.StaticService = srv + s.SetDynamicService("static", srv) + s.AppendService("static", srv) +} + +func (s *Server) appendTritonService(r scraper.Registry) { + c := s.config.Triton + l := s.LogService.NewLogger("[triton] ", log.LstdFlags) + srv := triton.NewService(c, r, l) + s.TritonService = srv + s.SetDynamicService("triton", srv) + s.AppendService("triton", srv) +} + // Err returns an error channel that multiplexes all out of band errors received from all services. 
func (s *Server) Err() <-chan error { return s.err } @@ -667,7 +815,7 @@ func (s *Server) startServices() error { s.Logger.Printf("D! opened service: %T", service) // Apply config overrides after the config override service has been opened and before any dynamic services. - if service == s.ConfigOverrideService && !s.config.SkipConfigOverrides { + if service == s.ConfigOverrideService && !s.config.SkipConfigOverrides && s.config.ConfigOverride.Enabled { // Apply initial config updates s.Logger.Println("D! applying configuration overrides") configs, err := s.ConfigOverrideService.Config() @@ -685,7 +833,6 @@ func (s *Server) startServices() error { } } } - } return nil } diff --git a/server/server_test.go b/server/server_test.go index 153c262de6..db920838c5 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6571,6 +6571,47 @@ func TestServer_ListServiceTests(t *testing.T) { }, }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/azure"}, + Name: "azure", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/consul"}, + Name: "consul", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/dns"}, + Name: "dns", + Options: client.ServiceTestOptions{ + "id": ""}, + }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/ec2"}, + Name: "ec2", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/files"}, + Name: "files", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/gce"}, + Name: "gce", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/hipchat"}, Name: "hipchat", @@ -6592,6 +6633,20 @@ func TestServer_ListServiceTests(t *testing.T) { Name: "kubernetes", Options: nil, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/marathon"}, + Name: "marathon", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/nerve"}, + Name: "nerve", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/opsgenie"}, Name: "opsgenie", @@ -6626,6 +6681,13 @@ func TestServer_ListServiceTests(t *testing.T) { "level": "CRITICAL", }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/scrapers"}, + Name: "scrapers", + Options: client.ServiceTestOptions{ + "name": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/sensu"}, Name: "sensu", @@ -6636,6 +6698,13 @@ func TestServer_ListServiceTests(t *testing.T) { "level": "CRITICAL", }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/serverset"}, + Name: "serverset", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/slack"}, Name: "slack", @@ -6670,6 +6739,13 @@ func TestServer_ListServiceTests(t *testing.T) { }, }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/static"}, + Name: "static", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, { Link: client.Link{Relation: 
client.Self, Href: "/kapacitor/v1/service-tests/talk"}, Name: "talk", @@ -6689,6 +6765,13 @@ func TestServer_ListServiceTests(t *testing.T) { "disable-notification": false, }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/triton"}, + Name: "triton", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/victorops"}, Name: "victorops", @@ -6728,6 +6811,13 @@ func TestServer_ListServiceTests_WithPattern(t *testing.T) { expServiceTests := client.ServiceTests{ Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests"}, Services: []client.ServiceTest{ + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/scrapers"}, + Name: "scrapers", + Options: client.ServiceTestOptions{ + "name": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/sensu"}, Name: "sensu", @@ -6738,6 +6828,13 @@ func TestServer_ListServiceTests_WithPattern(t *testing.T) { "level": "CRITICAL", }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/serverset"}, + Name: "serverset", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/slack"}, Name: "slack", @@ -6772,6 +6869,13 @@ func TestServer_ListServiceTests_WithPattern(t *testing.T) { }, }, }, + { + Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/static"}, + Name: "static", + Options: client.ServiceTestOptions{ + "id": "", + }, + }, }, } if got, exp := serviceTests.Link.Href, expServiceTests.Link.Href; got != exp { @@ -6779,6 +6883,7 @@ func TestServer_ListServiceTests_WithPattern(t *testing.T) { } if got, exp := len(serviceTests.Services), len(expServiceTests.Services); got != exp { t.Fatalf("unexpected length of services: got %d exp %d", got, exp) + } for i := range expServiceTests.Services { exp := expServiceTests.Services[i] diff --git a/services/azure/config.go b/services/azure/config.go new file mode 100644 index 0000000000..ac57ad45ba --- /dev/null +++ b/services/azure/config.go @@ -0,0 +1,65 @@ +package azure + +import ( + "fmt" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a Azure service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Port int `toml:"port" override:"port"` + SubscriptionID string `toml:"subscription-id" override:"subscription-id"` + TenantID string `toml:"tenant-id" override:"tenant-id"` + ClientID string `toml:"client-id" override:"client-id"` + ClientSecret string `toml:"client-secret" override:"client-secret,redact"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` +} + +// Init adds the default values for uninitialized configurations +func (a *Config) Init() { + a.Port = 80 + a.RefreshInterval = toml.Duration(5 * time.Minute) +} + +// Validate checks the azure configuration +func (a Config) Validate() error { + if a.ID == "" { + return fmt.Errorf("azure discovery must be given a ID") + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (a Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.AzureSDConfigs = []*config.AzureSDConfig{ + a.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for 
this discoverer +func (a Config) PromConfig() *config.AzureSDConfig { + return &config.AzureSDConfig{ + Port: a.Port, + SubscriptionID: a.SubscriptionID, + TenantID: a.TenantID, + ClientID: a.ClientID, + ClientSecret: a.ClientSecret, + RefreshInterval: model.Duration(a.RefreshInterval), + } +} + +// Service return discoverer type +func (a Config) Service() string { + return "azure" +} + +// ServiceID returns the discoverers name +func (a Config) ServiceID() string { + return a.ID +} diff --git a/services/azure/service.go b/services/azure/service.go new file mode 100644 index 0000000000..385611e7b4 --- /dev/null +++ b/services/azure/service.go @@ -0,0 +1,153 @@ +package azure + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pazure "github.com/prometheus/prometheus/discovery/azure" +) + +// Service is the azure discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened Azure service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the Azure service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the Azure service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration of Azure while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. 
+func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := pazure.NewDiscovery(sd, scraper.NewLogger(s.logger)) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/config/override/override.go b/services/config/override/override.go index 1bbcef8aa1..7c9e2fc50a 100644 --- a/services/config/override/override.go +++ b/services/config/override/override.go @@ -496,7 +496,7 @@ func (e Element) Redacted() (map[string]interface{}, []string, error) { return walker.optionsMap(), walker.redactedList(), nil } -// getElementKey returns the name of the field taht is used to uniquely identify elements of a list. +// getElementKey returns the name of the field that is used to uniquely identify elements of a list. func getElementKey(f reflect.StructField) string { parts := strings.Split(f.Tag.Get(structTagKey), ",") if len(parts) > 1 { @@ -633,12 +633,14 @@ type sectionWalker struct { sections map[string]Section currentSectionName string elementKeys map[string]string + uniqueElements map[string]bool } func newSectionWalker() *sectionWalker { return §ionWalker{ - sections: make(map[string]Section), - elementKeys: make(map[string]string), + sections: make(map[string]Section), + elementKeys: make(map[string]string), + uniqueElements: make(map[string]bool), } } @@ -707,6 +709,11 @@ func (w *sectionWalker) SliceElem(idx int, v reflect.Value) error { } else { return fmt.Errorf("could not find field with the name of the element key %q on element object", elementKey) } + + if w.uniqueElements[element] { + return fmt.Errorf("section %q has duplicate elements: %q", w.currentSectionName, element) + } + w.uniqueElements[element] = true w.sections[w.currentSectionName] = append(w.sections[w.currentSectionName], Element{ value: v.Interface(), element: element, diff --git a/services/consul/config.go b/services/consul/config.go new file mode 100644 index 0000000000..970425883c --- /dev/null +++ b/services/consul/config.go @@ -0,0 +1,88 @@ +package consul + +import ( + "fmt" + "strings" + + "github.com/prometheus/prometheus/config" +) + +// Config is a Consul service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Address string `toml:"address" override:"address"` + Token string `toml:"token" override:"token,redact"` + Datacenter string `toml:"datacenter" override:"datacenter"` + TagSeparator string `toml:"tag-separator" override:"tag-separator"` + Scheme string `toml:"scheme" override:"scheme"` + Username string `toml:"username" override:"username"` + Password string `toml:"password" override:"password,redact"` + Services []string `toml:"services" override:"services"` + // Path 
to CA file + SSLCA string `toml:"ssl-ca" override:"ssl-ca"` + // Path to host cert file + SSLCert string `toml:"ssl-cert" override:"ssl-cert"` + // Path to cert key file + SSLKey string `toml:"ssl-key" override:"ssl-key"` + // SSLServerName is used to verify the hostname for the targets. + SSLServerName string `toml:"ssl-server-name" override:"ssl-server-name"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` +} + +// Init adds defaults to Consul +func (c *Config) Init() { + c.Address = "127.0.0.1:8500" + c.TagSeparator = "," + c.Scheme = "http" +} + +// Validate validates the consul configuration +func (c Config) Validate() error { + if c.ID == "" { + return fmt.Errorf("consul discovery must be given a ID") + } + if strings.TrimSpace(c.Address) == "" { + return fmt.Errorf("consul discovery requires a server address") + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (c Config) Prom(conf *config.ScrapeConfig) { + conf.ServiceDiscoveryConfig.ConsulSDConfigs = []*config.ConsulSDConfig{ + c.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (c Config) PromConfig() *config.ConsulSDConfig { + return &config.ConsulSDConfig{ + Server: c.Address, + Token: c.Token, + Datacenter: c.Datacenter, + TagSeparator: c.TagSeparator, + Scheme: c.Scheme, + Username: c.Username, + Password: c.Password, + Services: c.Services, + TLSConfig: config.TLSConfig{ + CAFile: c.SSLCA, + CertFile: c.SSLCert, + KeyFile: c.SSLKey, + ServerName: c.SSLServerName, + InsecureSkipVerify: c.InsecureSkipVerify, + }, + } +} + +// Service return discoverer type +func (c Config) Service() string { + return "consul" +} + +// ServiceID returns the discoverers name +func (c Config) ServiceID() string { + return c.ID +} diff --git a/services/consul/service.go b/services/consul/service.go new file mode 100644 index 0000000000..12d433ce37 --- /dev/null +++ b/services/consul/service.go @@ -0,0 +1,155 @@ +package consul + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pconsul "github.com/prometheus/prometheus/discovery/consul" +) + +// Service is the consul discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer, err := pconsul.NewDiscovery(sd, scraper.NewLogger(s.logger)) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. 
+ if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/dns/config.go b/services/dns/config.go new file mode 100644 index 0000000000..74810f3659 --- /dev/null +++ b/services/dns/config.go @@ -0,0 +1,71 @@ +package dns + +import ( + "fmt" + "strings" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a DNS service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + RecordNames []string `toml:"record-names" override:"record-names"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` + Type string `toml:"type" override:"type"` + Port int `toml:"port" override:"port"` // Ignored for SRV records +} + +// Init adds default values to DNS configuration +func (d *Config) Init() { + d.Type = "SRV" + d.RefreshInterval = toml.Duration(30 * time.Second) +} + +// Validate validates DNS configuration's values +func (d Config) Validate() error { + if d.ID == "" { + return fmt.Errorf("dns discovery must be given a ID") + } + switch strings.ToUpper(d.Type) { + case "SRV": + case "A", "AAAA": + if d.Port == 0 { + return fmt.Errorf("Port required for dns discovery type %s", d.Type) + } + default: + return fmt.Errorf("invalid dns discovery records type %s", d.Type) + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (d Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.DNSSDConfigs = []*config.DNSSDConfig{ + d.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (d Config) PromConfig() *config.DNSSDConfig { + return &config.DNSSDConfig{ + Names: d.RecordNames, + RefreshInterval: model.Duration(d.RefreshInterval), + Type: d.Type, + Port: d.Port, + } +} + +// Service return discoverer type +func (d Config) Service() string { + return "dns" +} + +// ServiceID returns the discoverers name +func (d Config) ServiceID() string { + return d.ID +} diff --git a/services/dns/service.go b/services/dns/service.go new file mode 100644 index 0000000000..df001ea9c7 --- /dev/null +++ b/services/dns/service.go @@ -0,0 +1,153 @@ +package dns + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pdns "github.com/prometheus/prometheus/discovery/dns" +) + +// Service is the dns discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := pdns.NewDiscovery(sd, scraper.NewLogger(s.logger)) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. 
+ if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/ec2/config.go b/services/ec2/config.go new file mode 100644 index 0000000000..f50771f9ce --- /dev/null +++ b/services/ec2/config.go @@ -0,0 +1,68 @@ +package ec2 + +import ( + "fmt" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is EC2 service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Region string `toml:"region" override:"region"` + AccessKey string `toml:"access-key" override:"access-key"` + SecretKey string `toml:"secret-key" override:"secret-key,redact"` + Profile string `toml:"profile" override:"profile"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` + Port int `toml:"port" override:"port"` +} + +// Init adds default values to EC2 configuration +func (e *Config) Init() { + e.Port = 80 + e.RefreshInterval = toml.Duration(60 * time.Second) +} + +// Validate validates the EC2 configuration values +func (e Config) Validate() error { + if e.ID == "" { + return fmt.Errorf("ec2 discovery must be given a ID") + } + if e.Region == "" { + return fmt.Errorf("ec2 discovery, %s, requires a region", e.ID) + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (e Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.EC2SDConfigs = []*config.EC2SDConfig{ + e.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (e Config) PromConfig() *config.EC2SDConfig { + return &config.EC2SDConfig{ + Region: e.Region, + AccessKey: e.AccessKey, + SecretKey: e.SecretKey, + Profile: e.Profile, + RefreshInterval: model.Duration(e.RefreshInterval), + Port: e.Port, + } +} + +// Service return discoverer type +func (e Config) Service() string { + return "ec2" +} + +// ServiceID returns the discoverers name +func (e Config) ServiceID() string { + return e.ID +} diff --git a/services/ec2/service.go b/services/ec2/service.go new file mode 100644 index 0000000000..c54d31d3ca --- /dev/null +++ b/services/ec2/service.go @@ -0,0 +1,153 @@ +package ec2 + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pec2 "github.com/prometheus/prometheus/discovery/ec2" +) + +// Service is the ec2 discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := pec2.NewDiscovery(sd, scraper.NewLogger(s.logger)) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. 
+ if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/file/config.go b/services/file/config.go new file mode 100644 index 0000000000..3f88a47327 --- /dev/null +++ b/services/file/config.go @@ -0,0 +1,65 @@ +package file + +import ( + "fmt" + "regexp" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a file service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Files []string `toml:"files" override:"files"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` +} + +// Init adds defaults to an existing File configuration +func (f *Config) Init() { + f.RefreshInterval = toml.Duration(5 * time.Minute) + +} + +var fileRegex = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`) + +// Validate validates the File configuration +func (f Config) Validate() error { + if f.ID == "" { + return fmt.Errorf("file discovery must be given a ID") + } + for _, name := range f.Files { + if !fileRegex.MatchString(name) { + return fmt.Errorf("path name %q is not valid for file discovery", name) + } + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (f Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.FileSDConfigs = []*config.FileSDConfig{ + f.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (f Config) PromConfig() *config.FileSDConfig { + return &config.FileSDConfig{ + Files: f.Files, + RefreshInterval: model.Duration(f.RefreshInterval), + } +} + +// Service return discoverer type +func (f Config) Service() string { + return "files" +} + +// ServiceID returns the discoverers name +func (f Config) ServiceID() string { + return f.ID +} diff --git a/services/file/service.go b/services/file/service.go new file mode 100644 index 0000000000..0982775357 --- /dev/null +++ b/services/file/service.go @@ -0,0 +1,153 @@ +package file + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pfile "github.com/prometheus/prometheus/discovery/file" +) + +// Service is the file discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := pfile.NewDiscovery(sd, scraper.NewLogger(s.logger)) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. 
+ if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/gce/config.go b/services/gce/config.go new file mode 100644 index 0000000000..424bd8619b --- /dev/null +++ b/services/gce/config.go @@ -0,0 +1,66 @@ +package gce + +import ( + "fmt" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is GCE service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Project string `toml:"project" override:"project"` + Zone string `toml:"zone" override:"zone"` + Filter string `toml:"filter" override:"filter"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` + Port int `toml:"port" override:"port"` + TagSeparator string `toml:"tag-separator" override:"tag-separator"` +} + +// Init adds defaults to existing GCE configuration +func (g *Config) Init() { + g.Port = 80 + g.TagSeparator = "," + g.RefreshInterval = toml.Duration(60 * time.Second) +} + +// Validate validates the GCE configuration values +func (g Config) Validate() error { + if g.ID == "" { + return fmt.Errorf("gce discovery must be given a ID") + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (g Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.GCESDConfigs = []*config.GCESDConfig{ + g.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (g Config) PromConfig() *config.GCESDConfig { + return &config.GCESDConfig{ + Project: g.Project, + Zone: g.Zone, + Filter: g.Filter, + RefreshInterval: model.Duration(g.RefreshInterval), + Port: g.Port, + TagSeparator: g.TagSeparator, + } +} + +// Service return discoverer type +func (g Config) Service() string { + return "gce" +} + +// ServiceID returns the discoverers name +func (g Config) ServiceID() string { + return g.ID +} diff --git a/services/gce/service.go b/services/gce/service.go new file mode 100644 index 0000000000..55f25a8f42 --- /dev/null +++ b/services/gce/service.go @@ -0,0 +1,155 @@ +package gce + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pgce "github.com/prometheus/prometheus/discovery/gce" +) + +// Service is the gce discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer, err := pgce.NewDiscovery(sd, scraper.NewLogger(s.logger)) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. 
+ if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/k8s/config.go b/services/k8s/config.go index 854d70cf7a..64078594cb 100644 --- a/services/k8s/config.go +++ b/services/k8s/config.go @@ -5,12 +5,14 @@ import ( "crypto/x509" "io/ioutil" + "github.com/influxdata/kapacitor/listmap" "github.com/influxdata/kapacitor/services/k8s/client" "github.com/pkg/errors" ) type Config struct { Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` InCluster bool `toml:"in-cluster" override:"in-cluster"` APIServers []string `toml:"api-servers" override:"api-servers"` Token string `toml:"token" override:"token,redact"` @@ -70,3 +72,21 @@ func (c Config) ClientConfig() (client.Config, error) { TLSConfig: t, }, nil } + +type Configs []Config + +func (cs *Configs) UnmarshalTOML(data interface{}) error { + return listmap.DoUnmarshalTOML(cs, data) +} + +func (cs Configs) Validate() error { + for _, c := range cs { + if err := c.Validate(); err != nil { + return err + } + if c.ID == "" { + return errors.New("id must not be empty") + } + } + return nil +} diff --git a/services/k8s/k8s.go b/services/k8s/k8s.go new file mode 100644 index 0000000000..01ef731328 --- /dev/null +++ b/services/k8s/k8s.go @@ -0,0 +1,69 @@ +package k8s + +import ( + "log" + "sync/atomic" + + "github.com/influxdata/kapacitor/services/k8s/client" + "github.com/pkg/errors" +) + +type Cluster struct { + configValue atomic.Value // Config + client client.Client + logger *log.Logger +} + +func NewCluster(c Config, l *log.Logger) (*Cluster, error) { + clientConfig, err := c.ClientConfig() + if err != nil { + return nil, errors.Wrap(err, "failed to create k8s client config") + } + cli, err := client.New(clientConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create k8s client") + } + + s := &Cluster{ + client: cli, + logger: l, + } + s.configValue.Store(c) + return s, nil +} +func (s *Cluster) Open() error { + return nil +} + +func (s *Cluster) Close() error { + return nil +} + +func (s *Cluster) Update(c Config) error { + s.configValue.Store(c) + clientConfig, err := c.ClientConfig() + if err != nil { + return errors.Wrap(err, "failed to create k8s client config") + } + return s.client.Update(clientConfig) +} + +func (s *Cluster) Test() error { + cli, err := s.Client() + if err != nil { + return errors.Wrap(err, "failed to get client") + } + _, err = cli.Versions() + if err != nil { + return errors.Wrap(err, "failed to query server versions") + } + return nil +} + +func (s *Cluster) Client() (client.Client, error) { + config := s.configValue.Load().(Config) + if !config.Enabled { + return nil, errors.New("service is not enabled") + } + return s.client, nil +} diff --git a/services/k8s/service.go b/services/k8s/service.go index a2f03c6ede..cab3053c60 100644 --- a/services/k8s/service.go +++ b/services/k8s/service.go @@ -3,80 +3,121 @@ package k8s import ( "fmt" "log" - "sync/atomic" + "sync" "github.com/influxdata/kapacitor/services/k8s/client" "github.com/pkg/errors" ) type Service struct { - configValue atomic.Value // Config - client client.Client - logger *log.Logger + mu sync.Mutex + clusters map[string]*Cluster + logger *log.Logger } -func NewService(c Config, l *log.Logger) (*Service, error) { - clientConfig, err := c.ClientConfig() - if err != nil { - return nil, 
errors.Wrap(err, "failed to create k8s client config") - } - cli, err := client.New(clientConfig) - if err != nil { - return nil, errors.Wrap(err, "failed to create k8s client") - } +func NewService(c []Config, l *log.Logger) (*Service, error) { + l.Println("D! k8s configs", c) - s := &Service{ - client: cli, - logger: l, + clusters := make(map[string]*Cluster, len(c)) + for i := range c { + cluster, err := NewCluster(c[i], l) + if err != nil { + return nil, err + } + clusters[c[i].ID] = cluster } - s.configValue.Store(c) - return s, nil + + return &Service{ + clusters: clusters, + logger: l, + }, nil } func (s *Service) Open() error { + s.mu.Lock() + defer s.mu.Unlock() + for _, c := range s.clusters { + if err := c.Open(); err != nil { + return err + } + } return nil } func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + for _, c := range s.clusters { + c.Close() + } return nil } -func (s *Service) Update(newConfig []interface{}) error { - if l := len(newConfig); l != 1 { - return fmt.Errorf("expected only one new config object, got %d", l) - } - c, ok := newConfig[0].(Config) - if !ok { - return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + existingClusters := make(map[string]bool, len(newConfigs)) + for i := range newConfigs { + c, ok := newConfigs[i].(Config) + if !ok { + return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfigs[i]) + } + cluster, ok := s.clusters[c.ID] + if !ok { + var err error + cluster, err = NewCluster(c, s.logger) + if err != nil { + return err + } + if err := cluster.Open(); err != nil { + return err + } + s.clusters[c.ID] = cluster + } else { + if err := cluster.Update(c); err != nil { + return err + } + } + existingClusters[c.ID] = true } - s.configValue.Store(c) - clientConfig, err := c.ClientConfig() - if err != nil { - return errors.Wrap(err, "failed to create k8s client config") + // Close and delete any removed clusters + for id := range s.clusters { + if !existingClusters[id] { + s.clusters[id].Close() + delete(s.clusters, id) + } } - return s.client.Update(clientConfig) + return nil +} + +type testOptions struct { + ID string `json:"id"` } func (s *Service) TestOptions() interface{} { - return nil + return new(testOptions) } func (s *Service) Test(options interface{}) error { - cli, err := s.Client() - if err != nil { - return errors.Wrap(err, "failed to get client") + o, ok := options.(testOptions) + if !ok { + return errors.New("unexpected test options") } - _, err = cli.Versions() - if err != nil { - return errors.Wrap(err, "failed to query server versions") + s.mu.Lock() + cluster, ok := s.clusters[o.ID] + s.mu.Unlock() + if !ok { + return fmt.Errorf("unknown kubernetes cluster %q", o.ID) } - return nil + return cluster.Test() } -func (s *Service) Client() (client.Client, error) { - config := s.configValue.Load().(Config) - if !config.Enabled { - return nil, errors.New("service is not enabled") +func (s *Service) Client(id string) (client.Client, error) { + s.mu.Lock() + cluster, ok := s.clusters[id] + s.mu.Unlock() + if !ok { + return nil, fmt.Errorf("unknown kubernetes cluster %q, cannot get client", id) } - return s.client, nil + return cluster.Client() } diff --git a/services/marathon/config.go b/services/marathon/config.go new file mode 100644 index 0000000000..42bfc415a5 --- /dev/null +++ b/services/marathon/config.go @@ -0,0 +1,78 @@ +package marathon + +import ( + 
"fmt" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is Marathon service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Servers []string `toml:"servers" override:"servers"` + Timeout toml.Duration `toml:"timeout" override:"timeout"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` + BearerToken string `toml:"bearer-token" override:"bearer-token,redact"` + // Path to CA file + SSLCA string `toml:"ssl-ca" override:"ssl-ca"` + // Path to host cert file + SSLCert string `toml:"ssl-cert" override:"ssl-cert"` + // Path to cert key file + SSLKey string `toml:"ssl-key" override:"ssl-key"` + // SSLServerName is used to verify the hostname for the targets. + SSLServerName string `toml:"ssl-server-name" override:"ssl-server-name"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` +} + +// Init adds default values to existing Marathon configuration +func (m *Config) Init() { + m.Timeout = toml.Duration(30 * time.Second) + m.RefreshInterval = toml.Duration(30 * time.Second) +} + +// Validate validates Marathon configuration values +func (m Config) Validate() error { + if m.ID == "" { + return fmt.Errorf("marathon discovery must be given an ID") + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (m Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.MarathonSDConfigs = []*config.MarathonSDConfig{ + m.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (m Config) PromConfig() *config.MarathonSDConfig { + return &config.MarathonSDConfig{ + Servers: m.Servers, + Timeout: model.Duration(m.Timeout), + RefreshInterval: model.Duration(m.RefreshInterval), + BearerToken: m.BearerToken, + TLSConfig: config.TLSConfig{ + CAFile: m.SSLCA, + CertFile: m.SSLCert, + KeyFile: m.SSLKey, + ServerName: m.SSLServerName, + InsecureSkipVerify: m.InsecureSkipVerify, + }, + } +} + +// Service return discoverer type +func (m Config) Service() string { + return "marathon" +} + +// ServiceID returns the discoverers name +func (m Config) ServiceID() string { + return m.ID +} diff --git a/services/marathon/service.go b/services/marathon/service.go new file mode 100644 index 0000000000..a84aa05810 --- /dev/null +++ b/services/marathon/service.go @@ -0,0 +1,155 @@ +package marathon + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pmarathon "github.com/prometheus/prometheus/discovery/marathon" +) + +// Service is the marathon discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! 
opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer, err := pmarathon.NewDiscovery(sd, scraper.NewLogger(s.logger)) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. 
+ if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/nerve/config.go b/services/nerve/config.go new file mode 100644 index 0000000000..97f5c69f90 --- /dev/null +++ b/services/nerve/config.go @@ -0,0 +1,64 @@ +package nerve + +import ( + "fmt" + "strings" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a Nerve service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Servers []string `toml:"servers" override:"servers"` + Paths []string `toml:"paths" override:"paths"` + Timeout toml.Duration `toml:"timeout" override:"timeout"` +} + +// Init adds default values to Nerve configuration +func (n *Config) Init() { + n.Timeout = toml.Duration(10 * time.Second) +} + +// Validate validates the Nerve configuration values +func (n Config) Validate() error { + if n.ID == "" { + return fmt.Errorf("nerve discovery must be given a ID") + } + for _, path := range n.Paths { + if !strings.HasPrefix(path, "/") { + return fmt.Errorf("nerve discovery paths must begin with '/': %s", path) + } + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (n Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.NerveSDConfigs = []*config.NerveSDConfig{ + n.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (n Config) PromConfig() *config.NerveSDConfig { + return &config.NerveSDConfig{ + Servers: n.Servers, + Paths: n.Paths, + Timeout: model.Duration(n.Timeout), + } +} + +// Service return discoverer type +func (n Config) Service() string { + return "nerve" +} + +// ServiceID returns the discoverers name +func (n Config) ServiceID() string { + return n.ID +} diff --git a/services/nerve/service.go b/services/nerve/service.go new file mode 100644 index 0000000000..a3012a86ae --- /dev/null +++ b/services/nerve/service.go @@ -0,0 +1,153 @@ +package nerve + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pnerve "github.com/prometheus/prometheus/discovery/zookeeper" +) + +// Service is the nerve discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := pnerve.NewNerveDiscovery(sd, scraper.NewLogger(s.logger)) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/scraper/config.go b/services/scraper/config.go new file mode 100644 index 0000000000..d6951c825d --- /dev/null +++ b/services/scraper/config.go @@ -0,0 +1,163 @@ +package scraper + +import ( + "fmt" + "net/url" + "strings" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is the scraper configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + // The job name to which the job label is set by default. + Name string `toml:"name" override:"name"` + // Type of the scraper + Type string `toml:"type" override:"type"` + // Database this data will be associated with + Database string `toml:"db" override:"db"` + // RetentionPolicyt this data will be associated with + RetentionPolicy string `toml:"rp" override:"rp"` + // The URL scheme with which to fetch metrics from targets. + Scheme string `toml:"scheme" override:"scheme"` + // The HTTP resource path on which to fetch metrics from targets. + MetricsPath string `toml:"metrics-path" override:"metrics-path"` + // A set of query parameters with which the target is scraped. + Params url.Values `toml:"params" override:"params"` + // How frequently to scrape the targets of this scrape config. 
+ ScrapeInterval toml.Duration `toml:"scrape-interval" override:"scrape-interval"` + // The timeout for scraping targets of this config. + ScrapeTimeout toml.Duration `toml:"scrape-timeout" override:"scrape-timeout"` + // The HTTP basic authentication credentials for the targets. + Username string `toml:"username" override:"username"` + Password string `toml:"password" override:"password,redact"` + + // Path to CA file + SSLCA string `toml:"ssl-ca" override:"ssl-ca"` + // Path to host cert file + SSLCert string `toml:"ssl-cert" override:"ssl-cert"` + // Path to cert key file + SSLKey string `toml:"ssl-key" override:"ssl-key"` + // SSLServerName is used to verify the hostname for the targets. + SSLServerName string `toml:"ssl-server-name" override:"ssl-server-name"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` + + // The bearer token for the targets. + BearerToken string `toml:"bearer-token" override:"bearer-token,redact"` + // HTTP proxy server to use to connect to the targets. + ProxyURL *url.URL `toml:"proxy-url" override:"proxy-url"` + // DiscoverID is the id of the discoverer that generates hosts for the scraper + DiscoverID string `toml:"discoverer-id" override:"discoverer-id"` + // DiscoverService is the type of the discoverer that generates hosts for the scraper + DiscoverService string `toml:"discoverer-service" override:"discoverer-service"` + + // Blacklist is a list of hosts to ignore and not scrape + Blacklist []string `toml:"blacklist" override:"blacklist"` +} + +// Init adds default values to Config scraper +func (c *Config) Init() { + c.Type = "prometheus" + c.ScrapeInterval = toml.Duration(time.Minute) + c.ScrapeTimeout = toml.Duration(10 * time.Second) + c.MetricsPath = "/metrics" + c.Scheme = "http" +} + +// Validate validates the configuration of the Scraper +func (c *Config) Validate() error { + if c.Name == "" { + return fmt.Errorf("scraper config must be given a name") + } + if c.Database == "" { + return fmt.Errorf("scraper config must be given a db") + } + if c.RetentionPolicy == "" { + return fmt.Errorf("scraper config must be given an rp") + } + if c.Type != "prometheus" { + return fmt.Errorf("Unknown scraper type") + } + + return nil +} + +// Prom generates the prometheus configuration for the scraper +func (c *Config) Prom() *config.ScrapeConfig { + sc := &config.ScrapeConfig{ + JobName: encodeJobName(c.Database, c.RetentionPolicy, c.Name), + Scheme: c.Scheme, + MetricsPath: c.MetricsPath, + Params: c.Params, + ScrapeInterval: model.Duration(c.ScrapeInterval), + HTTPClientConfig: config.HTTPClientConfig{ + BasicAuth: &config.BasicAuth{ + Username: c.Username, + Password: c.Password, + }, + BearerToken: c.BearerToken, + ProxyURL: config.URL{ + URL: c.ProxyURL, + }, + TLSConfig: config.TLSConfig{ + CAFile: c.SSLCA, + CertFile: c.SSLCert, + KeyFile: c.SSLKey, + ServerName: c.SSLServerName, + InsecureSkipVerify: c.InsecureSkipVerify, + }, + }, + } + return sc +} + +func encodeJobName(db, rp, name string) string { + // Because I cannot add label information to my scraped targets + // I'm abusing the JobName by encoding database, retention policy, + // and name. 
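+	// For example, db "mydb", rp "myrp", and scraper name "myscraper"
+	// (illustrative values) encode to "mydb|myrp|myscraper", which
+	// decodeJobName below splits back into its three components.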
+ return fmt.Sprintf("%s|%s|%s", db, rp, name) +} + +func decodeJobName(job string) (string, string, string, error) { + split := strings.Split(job, "|") + if len(split) != 3 { + return "", "", "", fmt.Errorf("unable to decode job name") + } + return split[0], split[1], split[2], nil +} + +type KubernetesRole string + +const ( + KubernetesRoleNode = "node" + KubernetesRolePod = "pod" + KubernetesRoleService = "service" + KubernetesRoleEndpoint = "endpoints" +) + +// Kubernetes is Kubernetes service discovery configuration +type Kubernetes struct { + APIServer url.URL `toml:"api_server" override:"api_server"` + Role KubernetesRole `toml:"role" override:"role"` +} + +func (k Kubernetes) Prom(c *config.ScrapeConfig) { + // TODO: auth token tls + c.ServiceDiscoveryConfig.KubernetesSDConfigs = []*config.KubernetesSDConfig{ + &config.KubernetesSDConfig{ + APIServer: config.URL{ + URL: &k.APIServer, + }, + Role: config.KubernetesRole(k.Role), + }, + } +} + +func NewKubernetes() Kubernetes { + return Kubernetes{} +} diff --git a/services/scraper/discoverer.go b/services/scraper/discoverer.go new file mode 100644 index 0000000000..09a0129bf7 --- /dev/null +++ b/services/scraper/discoverer.go @@ -0,0 +1,17 @@ +package scraper + +import "github.com/prometheus/prometheus/config" + +// Discoverer represents a service that discovers hosts to scrape +type Discoverer interface { + // Service returns the service type of the Discoverer + Service() string + // ID returns the unique ID of this specific discoverer + ServiceID() string + // Prom creates a prometheus scrape configuration. + // TODO: replace when reimplement TargetManager + Prom(c *config.ScrapeConfig) + // TODO: future work + // Run(ctx context.Context) + // Find() hosts or something +} diff --git a/services/scraper/log.go b/services/scraper/log.go new file mode 100644 index 0000000000..21c218b857 --- /dev/null +++ b/services/scraper/log.go @@ -0,0 +1,99 @@ +package scraper + +import ( + "log" + + plog "github.com/prometheus/common/log" +) + +// Logger wraps kapacitor logging for prometheus +type Logger struct { + *log.Logger +} + +// NewLogger wraps a logger to be used for prometheus +func NewLogger(l *log.Logger) *Logger { + return &Logger{ + Logger: l, + } +} + +// Debug logs a message at level Debug on the standard logger. +func (l *Logger) Debug(v ...interface{}) { + l.Logger.Print("D! ", v) +} + +// Debugln logs a message at level Debug on the standard logger. +func (l *Logger) Debugln(v ...interface{}) { + l.Logger.Println("D! ", v) +} + +// Debugf logs a message at level Debug on the standard logger. +func (l *Logger) Debugf(s string, v ...interface{}) { + l.Logger.Printf("D! "+s, v) +} + +// Info logs a message at level Info on the standard logger. +func (l *Logger) Info(v ...interface{}) { + l.Logger.Print("I! ", v) +} + +// Infoln logs a message at level Info on the standard logger. +func (l *Logger) Infoln(v ...interface{}) { + l.Logger.Println("I! ", v) +} + +// Infof logs a message at level Info on the standard logger. +func (l *Logger) Infof(s string, v ...interface{}) { + l.Logger.Printf("I! "+s, v) +} + +// Warn logs a message at level Warn on the standard logger. +func (l *Logger) Warn(v ...interface{}) { + l.Logger.Print("W! ", v) +} + +// Warnln logs a message at level Warn on the standard logger. +func (l *Logger) Warnln(v ...interface{}) { + l.Logger.Println("W! ", v) +} + +// Warnf logs a message at level Warn on the standard logger. +func (l *Logger) Warnf(s string, v ...interface{}) { + l.Logger.Printf("W! 
"+s, v) +} + +// Error logs a message at level Error on the standard logger. +func (l *Logger) Error(v ...interface{}) { + l.Logger.Print("E! ", v) +} + +// Errorln logs a message at level Error on the standard logger. +func (l *Logger) Errorln(v ...interface{}) { + l.Logger.Println("E! ", v) +} + +// Errorf logs a message at level Error on the standard logger. +func (l *Logger) Errorf(s string, v ...interface{}) { + l.Logger.Printf("E! "+s, v) +} + +// Fatal logs a message at level Fatal on the standard logger. +func (l *Logger) Fatal(v ...interface{}) { + l.Logger.Fatal(v) +} + +// Fatalln logs a message at level Fatal on the standard logger. +func (l *Logger) Fatalln(v ...interface{}) { + l.Logger.Fatalln(v) +} + +// Fatalf logs a message at level Fatal on the standard logger. +func (l *Logger) Fatalf(s string, v ...interface{}) { + l.Logger.Fatalf(s, v) +} + +// With adds a field to the logger. +func (l *Logger) With(key string, value interface{}) plog.Logger { + return l +} diff --git a/services/scraper/registry.go b/services/scraper/registry.go new file mode 100644 index 0000000000..025a5be7d8 --- /dev/null +++ b/services/scraper/registry.go @@ -0,0 +1,23 @@ +package scraper + +// Pair is the linked discovery/scraper pair +type Pair struct { + Discoverer Discoverer + Scraper Config +} + +// Registry represents the combined configuration state of discoverers and scrapers +type Registry interface { + // Commit finishes the update to the registry configuration + Commit() error + // AddDiscoverer adds discoverers to the registry + AddDiscoverer(Discoverer) + // RemoveDiscoverer removes discoverers from the registry + RemoveDiscoverer(Discoverer) + // AddScrapers adds scrapers to the registry + AddScrapers([]Config) + // RemoveScrapers removes scrapers from the registry + RemoveScrapers([]Config) + // Pairs returns the linked scraper/discovery combinations + Pairs() []Pair +} diff --git a/services/scraper/service.go b/services/scraper/service.go new file mode 100644 index 0000000000..33c07e47bd --- /dev/null +++ b/services/scraper/service.go @@ -0,0 +1,327 @@ +package scraper + +import ( + "fmt" + "log" + "sync" + + "github.com/influxdata/influxdb/models" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval" + "github.com/prometheus/prometheus/storage" +) + +var ( + _ Registry = &Service{} + _ storage.SampleAppender = &Service{} +) + +// Service represents the scraper manager +type Service struct { + PointsWriter interface { + WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + } + mu sync.Mutex + wg sync.WaitGroup + + open bool + closing chan struct{} + updating chan *config.Config + + configs []Config + + logger *log.Logger + + discoverers []Discoverer + + // TargetManager represents a scraping/discovery manager + mgr interface { + ApplyConfig(cfg *config.Config) error + Stop() + Run() + } +} + +// NewService creates a new scraper service +func NewService(c []Config, l *log.Logger) *Service { + s := &Service{ + configs: c, + logger: l, + } + s.mgr = retrieval.NewTargetManager(s, NewLogger(l)) + return s +} + +// Open starts the scraper service +func (s *Service) Open() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.open { + return nil + } + + s.open = true + s.updating = make(chan *config.Config) + s.closing = make(chan struct{}) + + go s.scrape() + s.logger.Println("I! 
opened service") + + s.open = true + return nil +} + +// Close stops the scraper service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + close(s.closing) + + s.wg.Wait() + + s.logger.Println("I! closed service") + return nil +} + +func (s *Service) scrape() { + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.mu.Lock() + pairs := s.pairs() + conf := s.prom(pairs) + s.mu.Unlock() + + s.mgr.ApplyConfig(conf) + + s.mgr.Run() + }() + + for { + select { + case <-s.closing: + s.mgr.Stop() + return + case conf := <-s.updating: + s.logger.Println("I! updating scraper service") + s.mgr.ApplyConfig(conf) + } + } +} + +// Append tranforms prometheus samples and inserts data into the tasks pipeline +func (s *Service) Append(sample *model.Sample) error { + var err error + db := "" + rp := "" + job := "" + + tags := map[string]string{} + for name, value := range sample.Metric { + if name == "job" { + db, rp, job, err = decodeJobName(string(value)) + continue + } + tags[string(name)] = string(value) + } + + blacklist := func() bool { + s.mu.Lock() + defer s.mu.Unlock() + // If instance is blacklisted then do not send to PointsWriter + if instance, ok := tags["instance"]; ok { + for i := range s.configs { + if s.configs[i].Name == job { + for _, listed := range s.configs[i].Blacklist { + if instance == listed { + return true + } + } + } + } + } + return false + } + if blacklist() { + return nil + } + + fields := models.Fields{ + "value": float64(sample.Value), + } + + pt, err := models.NewPoint(job, models.NewTags(tags), fields, sample.Timestamp.Time()) + if err != nil { + return err + } + + return s.PointsWriter.WritePoints(db, rp, models.ConsistencyLevelAny, []models.Point{pt}) +} + +// NeedsThrottling conforms to SampleAppender and never returns true currently. 
+func (s *Service) NeedsThrottling() bool { + return false +} + +type testOptions struct { + Name string `json:"name"` +} + +// TestOptions returns options that are allowed for the Test +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test tests the options for the scrapers +func (s *Service) Test(options interface{}) error { + return nil +} + +// Update will replace all scraper configurations and apply the configuration +// to the target manager +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.configs = configs + if s.open { + pairs := s.pairs() + conf := s.prom(pairs) + select { + case <-s.closing: + return fmt.Errorf("error writing configuration to closed scraper") + case s.updating <- conf: + } + } + return nil +} + +// prom assumes service is locked +func (s *Service) prom(pairs []Pair) *config.Config { + conf := &config.Config{ + ScrapeConfigs: make([]*config.ScrapeConfig, len(pairs)), + } + + for i, pair := range pairs { + sc := pair.Scraper.Prom() + pair.Discoverer.Prom(sc) + conf.ScrapeConfigs[i] = sc + } + return conf +} + +// Commit applies the configuration to the scraper +func (s *Service) Commit() error { + s.mu.Lock() + defer s.mu.Unlock() + if !s.open { + return nil + } + + pairs := s.pairs() + conf := s.prom(pairs) + select { + case <-s.closing: + return fmt.Errorf("error writing configuration to closed registry") + case s.updating <- conf: + } + + return nil +} + +// AddDiscoverer adds discoverer to the registry +func (s *Service) AddDiscoverer(discoverer Discoverer) { + s.mu.Lock() + defer s.mu.Unlock() + if !s.open { + return + } + + s.discoverers = append(s.discoverers, discoverer) +} + +// RemoveDiscoverer removes discoverer from the registry +func (s *Service) RemoveDiscoverer(rm Discoverer) { + s.mu.Lock() + defer s.mu.Unlock() + if !s.open { + return + } + + for i, d := range s.discoverers { + if d.ServiceID() == rm.ServiceID() && d.Service() == rm.Service() { + s.discoverers = append(s.discoverers[:i], s.discoverers[i+1:]...) + } + } +} + +// AddScrapers adds scrapers to the registry +func (s *Service) AddScrapers(scrapers []Config) { + s.mu.Lock() + defer s.mu.Unlock() + if !s.open { + return + } + + s.configs = append(s.configs, scrapers...) +} + +// RemoveScrapers removes scrapers from the registry +func (s *Service) RemoveScrapers(scrapers []Config) { + s.mu.Lock() + defer s.mu.Unlock() + if !s.open { + return + } + + for _, rm := range scrapers { + for i, c := range s.configs { + if c.Name == rm.Name { + s.configs = append(s.configs[:i], s.configs[i+1:]...) 
+ } + } + } +} + +// pairs returns all named pairs of scrapers and discoverers from registry must be locked +func (s *Service) pairs() []Pair { + pairs := []Pair{} + for _, scr := range s.configs { + if !scr.Enabled { + continue + } + for _, d := range s.discoverers { + if scr.DiscoverID == d.ServiceID() && scr.DiscoverService == d.Service() { + pairs = append(pairs, Pair{ + Scraper: scr, + Discoverer: d, + }) + } + } + } + return pairs +} + +// Pairs returns all named pairs of scrapers and discoverers from registry must be locked +func (s *Service) Pairs() []Pair { + s.mu.Lock() + defer s.mu.Unlock() + + return s.pairs() +} diff --git a/services/serverset/config.go b/services/serverset/config.go new file mode 100644 index 0000000000..6ea8149350 --- /dev/null +++ b/services/serverset/config.go @@ -0,0 +1,64 @@ +package serverset + +import ( + "fmt" + "strings" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a Serverset service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Servers []string `toml:"servers" override:"servers"` + Paths []string `toml:"paths" override:"paths"` + Timeout toml.Duration `toml:"timeout" override:"timeout"` +} + +// Init adds default values to Serverset configuration +func (s *Config) Init() { + s.Timeout = toml.Duration(10 * time.Second) +} + +// Validate validates Serverset configuration +func (s Config) Validate() error { + if s.ID == "" { + return fmt.Errorf("server set discovery must be given a ID") + } + for _, path := range s.Paths { + if !strings.HasPrefix(path, "/") { + return fmt.Errorf("serverset discovery config paths must begin with '/': %s", path) + } + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (s Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.ServersetSDConfigs = []*config.ServersetSDConfig{ + s.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (s Config) PromConfig() *config.ServersetSDConfig { + return &config.ServersetSDConfig{ + Servers: s.Servers, + Paths: s.Paths, + Timeout: model.Duration(s.Timeout), + } +} + +// Service return discoverer type +func (s Config) Service() string { + return "serverset" +} + +// ServiceID returns the discoverers name +func (s Config) ServiceID() string { + return s.ID +} diff --git a/services/serverset/service.go b/services/serverset/service.go new file mode 100644 index 0000000000..24d00ec289 --- /dev/null +++ b/services/serverset/service.go @@ -0,0 +1,153 @@ +package serverset + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + pzookeeper "github.com/prometheus/prometheus/discovery/zookeeper" +) + +// Service is the serverset discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! 
opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := pzookeeper.NewServersetDiscovery(sd, scraper.NewLogger(s.logger)) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/static/config.go b/services/static/config.go new file mode 100644 index 0000000000..60a2c47be2 --- /dev/null +++ b/services/static/config.go @@ -0,0 +1,73 @@ +package static + +import ( + "fmt" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a static list of of labeled target groups +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + // Targets is a list of targets identified by a label set. Each target is + // uniquely identifiable in the group by its address label. + Targets []map[string]string `toml:"targets" override:"targets"` + // Labels is a set of labels that is common across all targets in the group. 
+ Labels map[string]string `toml:"labels" override:"labels"` +} + +// Init the static configuration to an empty set of structures +func (s *Config) Init() { + s.Targets = []map[string]string{} + s.Labels = map[string]string{} +} + +// Validate validates Static configuration values +func (s Config) Validate() error { + if s.ID == "" { + return fmt.Errorf("azure discovery must be given a ID") + } + return nil +} + +// Prom writes the prometheus configuration for discoverer into ScrapeConfig +func (s Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.StaticConfigs = s.PromConfig() +} + +// PromConfig returns the prometheus configuration for this discoverer +func (s Config) PromConfig() []*config.TargetGroup { + set := func(l map[string]string) model.LabelSet { + res := make(model.LabelSet) + for k, v := range l { + res[model.LabelName(k)] = model.LabelValue(v) + } + return res + } + target := func(t []map[string]string) []model.LabelSet { + res := make([]model.LabelSet, len(t)) + for i, l := range t { + res[i] = set(l) + } + return res + } + return []*config.TargetGroup{ + &config.TargetGroup{ + Targets: target(s.Targets), + Labels: set(s.Labels), + Source: s.ID, + }, + } +} + +// Service return discoverer type +func (s Config) Service() string { + return "static" +} + +// ServiceID returns the discoverers name +func (s Config) ServiceID() string { + return s.ID +} diff --git a/services/static/service.go b/services/static/service.go new file mode 100644 index 0000000000..f32a031ae8 --- /dev/null +++ b/services/static/service.go @@ -0,0 +1,153 @@ +package static + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" +) + +// Service is the static discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer := discovery.NewStaticProvider(sd) + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + var err error + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/services/triton/config.go b/services/triton/config.go new file mode 100644 index 0000000000..d246bd4336 --- /dev/null +++ b/services/triton/config.go @@ -0,0 +1,83 @@ +package triton + +import ( + "fmt" + "time" + + "github.com/influxdata/influxdb/toml" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Config is a Triton service discovery configuration +type Config struct { + Enabled bool `toml:"enabled" override:"enabled"` + ID string `toml:"id" override:"id"` + Account string `toml:"account" override:"account"` + DNSSuffix string `toml:"dns-suffix" override:"dns-suffix"` + Endpoint string `toml:"endpoint" override:"endpoint"` + Port int `toml:"port" override:"port"` + RefreshInterval toml.Duration `toml:"refresh-interval" override:"refresh-interval"` + Version int `toml:"version" override:"version"` + // Path to CA file + SSLCA string `toml:"ssl-ca" override:"ssl-ca"` + // Path to host cert file + SSLCert string `toml:"ssl-cert" override:"ssl-cert"` + // Path to cert key file + SSLKey string `toml:"ssl-key" override:"ssl-key"` + // SSLServerName is used to verify the hostname for the targets. 
+ SSLServerName string `toml:"ssl-server-name" override:"ssl-server-name"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` +} + +// Init adds default values to Triton configuration +func (t *Config) Init() { + t.Port = 9163 + t.RefreshInterval = toml.Duration(60 * time.Second) + t.Version = 1 +} + +// Validate validates Triton configuration values +func (t Config) Validate() error { + if t.ID == "" { + return fmt.Errorf("triton discovery must be given an ID") + } + return nil +} + +// Prom creates a prometheus configuration for Triton +func (t Config) Prom(c *config.ScrapeConfig) { + c.ServiceDiscoveryConfig.TritonSDConfigs = []*config.TritonSDConfig{ + t.PromConfig(), + } +} + +// PromConfig returns the prometheus configuration for this discoverer +func (t Config) PromConfig() *config.TritonSDConfig { + return &config.TritonSDConfig{ + Account: t.Account, + DNSSuffix: t.DNSSuffix, + Endpoint: t.Endpoint, + Port: t.Port, + RefreshInterval: model.Duration(t.RefreshInterval), + Version: t.Version, + TLSConfig: config.TLSConfig{ + CAFile: t.SSLCA, + CertFile: t.SSLCert, + KeyFile: t.SSLKey, + ServerName: t.SSLServerName, + InsecureSkipVerify: t.InsecureSkipVerify, + }, + } +} + +// Service return discoverer type +func (t Config) Service() string { + return "triton" +} + +// ServiceID returns the discoverers name +func (t Config) ServiceID() string { + return t.ID +} diff --git a/services/triton/service.go b/services/triton/service.go new file mode 100644 index 0000000000..3c1b756ecc --- /dev/null +++ b/services/triton/service.go @@ -0,0 +1,155 @@ +package triton + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/kapacitor/services/scraper" + "github.com/prometheus/prometheus/config" + ptriton "github.com/prometheus/prometheus/discovery/triton" +) + +// Service is the triton discovery service +type Service struct { + Configs []Config + mu sync.Mutex + + registry scraper.Registry + + logger *log.Logger + open bool +} + +// NewService creates a new unopened service +func NewService(c []Config, r scraper.Registry, l *log.Logger) *Service { + return &Service{ + Configs: c, + registry: r, + logger: l, + } +} + +// Open starts the service +func (s *Service) Open() error { + if s.open { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.open = true + s.register() + + s.logger.Println("I! opened service") + return s.registry.Commit() +} + +// Close stops the service +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.open { + return nil + } + + s.open = false + s.deregister() + + s.logger.Println("I! 
closed service") + return s.registry.Commit() +} + +func (s *Service) deregister() { + // Remove all the configurations in the registry + for _, d := range s.Configs { + s.registry.RemoveDiscoverer(&d) + } +} + +func (s *Service) register() { + // Add all configurations to registry + for _, d := range s.Configs { + if d.Enabled { + s.registry.AddDiscoverer(&d) + } + } +} + +// Update updates configuration while running +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + configs := make([]Config, len(newConfigs)) + for i, c := range newConfigs { + if config, ok := c.(Config); ok { + configs[i] = config + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", c, config) + } + } + + s.deregister() + s.Configs = configs + s.register() + + return s.registry.Commit() +} + +type testOptions struct { + ID string `json:"id"` +} + +// TestOptions returns an object that is in turn passed to Test. +func (s *Service) TestOptions() interface{} { + return &testOptions{} +} + +// Test a service with the provided options. +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + + found := -1 + for i := range s.Configs { + if s.Configs[i].ID == o.ID && s.Configs[i].Enabled { + found = i + } + } + if found < 0 { + return fmt.Errorf("discoverer %q is not enabled or does not exist", o.ID) + } + + sd := s.Configs[found].PromConfig() + discoverer, err := ptriton.New(scraper.NewLogger(s.logger), sd) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + updates := make(chan []*config.TargetGroup) + go discoverer.Run(ctx, updates) + + select { + case _, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + err = fmt.Errorf("discoverer %q exited ", o.ID) + } + break + case <-time.After(30 * time.Second): + err = fmt.Errorf("timeout waiting for discoverer %q to return", o.ID) + break + } + cancel() + + return err +} diff --git a/task_master.go b/task_master.go index 6f032f0e02..957af6f8b6 100644 --- a/task_master.go +++ b/task_master.go @@ -134,7 +134,7 @@ type TaskMaster struct { NewTimer(timer.Setter) timer.Timer } K8sService interface { - Client() (k8s.Client, error) + Client(string) (k8s.Client, error) } LogService LogService diff --git a/test.sh b/test.sh index 83b952dbc6..4f84d34bad 100755 --- a/test.sh +++ b/test.sh @@ -140,24 +140,24 @@ fi case $ENVIRONMENT_INDEX in 0) # 64 bit tests - run_test_docker Dockerfile_build_ubuntu64 test_64bit --debug --test --generate $no_uncommitted_arg + run_test_docker Dockerfile_build_ubuntu64 test_64bit --test --generate $no_uncommitted_arg rc=$? ;; 1) # 64 bit race tests GORACE="halt_on_error=1" - run_test_docker Dockerfile_build_ubuntu64 test_64bit_race --debug --test --generate $no_uncommitted_arg --race + run_test_docker Dockerfile_build_ubuntu64 test_64bit_race --test --generate $no_uncommitted_arg --race rc=$? ;; 2) # 32 bit tests - run_test_docker Dockerfile_build_ubuntu32 test_32bit --debug --test --generate $no_uncommitted_arg --arch=i386 + run_test_docker Dockerfile_build_ubuntu32 test_32bit --test --generate $no_uncommitted_arg --arch=i386 rc=$? 
;; #3) # # 64 bit tests on golang HEAD # GO_CHECKOUT=HEAD - # run_test_docker Dockerfile_build_ubuntu64_git test_64bit_go_tip --debug --test --generate $no_uncommitted_arg + # run_test_docker Dockerfile_build_ubuntu64_git test_64bit_go_tip --test --generate $no_uncommitted_arg # rc=$? # ;; "save") diff --git a/vendor/github.com/prometheus/prometheus/MAINTAINERS.md b/vendor/github.com/prometheus/prometheus/MAINTAINERS.md index c55b0530ef..73e24d439f 100644 --- a/vendor/github.com/prometheus/prometheus/MAINTAINERS.md +++ b/vendor/github.com/prometheus/prometheus/MAINTAINERS.md @@ -1,7 +1,7 @@ Maintainers of this repository with their focus areas: -* Björn Rabenstein : Local storage; general code-level issues. -* Brian Brazil : Console templates; semantics of PromQL, service discovery, and relabeling. -* Fabian Reinartz : PromQL parsing and evaluation; implementation of retrieval, alert notification, and service discovery. -* Julius Volz : Remote storage integrations; web UI. +* Björn Rabenstein @beorn7: Local storage; general code-level issues. +* Brian Brazil @brian-brazil: Console templates; semantics of PromQL, service discovery, and relabeling. +* Fabian Reinartz @fabxc: PromQL parsing and evaluation; implementation of retrieval, alert notification, and service discovery. +* Julius Volz @juliusv: Remote storage integrations; web UI. diff --git a/vendor/github.com/prometheus/prometheus/circle.yml b/vendor/github.com/prometheus/prometheus/circle.yml index 4f9519a0dd..7e09a2cbed 100644 --- a/vendor/github.com/prometheus/prometheus/circle.yml +++ b/vendor/github.com/prometheus/prometheus/circle.yml @@ -49,6 +49,7 @@ deployment: owner: prometheus commands: - promu crossbuild tarballs + - promu checksum .tarballs - promu release .tarballs - mkdir $CIRCLE_ARTIFACTS/releases/ && cp -a .tarballs/* $CIRCLE_ARTIFACTS/releases/ - docker login -e $DOCKER_EMAIL -u $DOCKER_LOGIN -p $DOCKER_PASSWORD diff --git a/vendor/github.com/prometheus/prometheus/discovery/azure/azure.go b/vendor/github.com/prometheus/prometheus/discovery/azure/azure.go index 34ee5de4de..27c40e0ffe 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/azure/azure.go +++ b/vendor/github.com/prometheus/prometheus/discovery/azure/azure.go @@ -66,14 +66,16 @@ type Discovery struct { cfg *config.AzureSDConfig interval time.Duration port int + logger log.Logger } // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. -func NewDiscovery(cfg *config.AzureSDConfig) *Discovery { +func NewDiscovery(cfg *config.AzureSDConfig, logger log.Logger) *Discovery { return &Discovery{ cfg: cfg, interval: time.Duration(cfg.RefreshInterval), port: cfg.Port, + logger: logger, } } @@ -91,7 +93,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { tg, err := d.refresh() if err != nil { - log.Errorf("unable to refresh during Azure discovery: %s", err) + d.logger.Errorf("unable to refresh during Azure discovery: %s", err) } else { select { case <-ctx.Done(): @@ -141,13 +143,13 @@ type azureResource struct { } // Create a new azureResource object from an ID string. -func newAzureResourceFromID(id string) (azureResource, error) { +func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error) { // Resource IDs have the following format. // /subscriptions/SUBSCRIPTION_ID/resourceGroups/RESOURCE_GROUP/providers/PROVIDER/TYPE/NAME s := strings.Split(id, "/") if len(s) != 9 { err := fmt.Errorf("invalid ID '%s'. 
Refusing to create azureResource", id) - log.Error(err) + logger.Error(err) return azureResource{}, err } return azureResource{ @@ -185,7 +187,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { } machines = append(machines, *result.Value...) } - log.Debugf("Found %d virtual machines during Azure discovery.", len(machines)) + d.logger.Debugf("Found %d virtual machines during Azure discovery.", len(machines)) // We have the slice of machines. Now turn them into targets. // Doing them in go routines because the network interface calls are slow. @@ -197,7 +199,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { ch := make(chan target, len(machines)) for i, vm := range machines { go func(i int, vm compute.VirtualMachine) { - r, err := newAzureResourceFromID(*vm.ID) + r, err := newAzureResourceFromID(*vm.ID, d.logger) if err != nil { ch <- target{labelSet: nil, err: err} return @@ -219,14 +221,14 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { // Get the IP address information via separate call to the network provider. for _, nic := range *vm.Properties.NetworkProfile.NetworkInterfaces { - r, err := newAzureResourceFromID(*nic.ID) + r, err := newAzureResourceFromID(*nic.ID, d.logger) if err != nil { ch <- target{labelSet: nil, err: err} return } networkInterface, err := client.nic.Get(r.ResourceGroup, r.Name, "") if err != nil { - log.Errorf("Unable to get network interface %s: %s", r.Name, err) + d.logger.Errorf("Unable to get network interface %s: %s", r.Name, err) ch <- target{labelSet: nil, err: err} // Get out of this routine because we cannot continue without a network interface. return @@ -237,7 +239,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { // yet support this. On deallocated machines, this value happens to be nil so it // is a cheap and easy way to determine if a machine is allocated or not. if networkInterface.Properties.Primary == nil { - log.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name) + d.logger.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name) ch <- target{} return } @@ -272,6 +274,6 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { } } - log.Debugf("Azure discovery completed.") + d.logger.Debugf("Azure discovery completed.") return tg, nil } diff --git a/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go b/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go index 562a2700d3..5b30f992e2 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go +++ b/vendor/github.com/prometheus/prometheus/discovery/consul/consul.go @@ -89,10 +89,11 @@ type Discovery struct { clientDatacenter string tagSeparator string watchedServices []string // Set of services which will be discovered. + logger log.Logger } // NewDiscovery returns a new Discovery for the given config. 
-func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { +func NewDiscovery(conf *config.ConsulSDConfig, logger log.Logger) (*Discovery, error) { tls, err := httputil.NewTLSConfig(conf.TLSConfig) if err != nil { return nil, err @@ -121,6 +122,7 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { tagSeparator: conf.TagSeparator, watchedServices: conf.Services, clientDatacenter: clientConf.Datacenter, + logger: logger, } return cd, nil } @@ -163,7 +165,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } if err != nil { - log.Errorf("Error refreshing service list: %s", err) + d.logger.Errorf("Error refreshing service list: %s", err) rpcFailuresCount.Inc() time.Sleep(retryInterval) continue @@ -179,7 +181,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { if d.clientDatacenter == "" { info, err := d.client.Agent().Self() if err != nil { - log.Errorf("Error retrieving datacenter name: %s", err) + d.logger.Errorf("Error retrieving datacenter name: %s", err) time.Sleep(retryInterval) continue } @@ -203,6 +205,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { datacenterLabel: model.LabelValue(d.clientDatacenter), }, tagSeparator: d.tagSeparator, + logger: d.logger, } wctx, cancel := context.WithCancel(ctx) @@ -235,6 +238,7 @@ type consulService struct { labels model.LabelSet client *consul.Client tagSeparator string + logger log.Logger } func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) { @@ -258,7 +262,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG } if err != nil { - log.Errorf("Error refreshing service %s: %s", srv.name, err) + srv.logger.Errorf("Error refreshing service %s: %s", srv.name, err) rpcFailuresCount.Inc() time.Sleep(retryInterval) continue diff --git a/vendor/github.com/prometheus/prometheus/discovery/consul/consul_test.go b/vendor/github.com/prometheus/prometheus/discovery/consul/consul_test.go index 2040879b90..1e7cfd5292 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/consul/consul_test.go +++ b/vendor/github.com/prometheus/prometheus/discovery/consul/consul_test.go @@ -16,13 +16,14 @@ package consul import ( "testing" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/config" ) func TestConfiguredService(t *testing.T) { conf := &config.ConsulSDConfig{ Services: []string{"configuredServiceName"}} - consulDiscovery, err := NewDiscovery(conf) + consulDiscovery, err := NewDiscovery(conf, log.Base()) if err != nil { t.Errorf("Unexpected error when initialising discovery %v", err) @@ -37,7 +38,7 @@ func TestConfiguredService(t *testing.T) { func TestNonConfiguredService(t *testing.T) { conf := &config.ConsulSDConfig{} - consulDiscovery, err := NewDiscovery(conf) + consulDiscovery, err := NewDiscovery(conf, log.Base()) if err != nil { t.Errorf("Unexpected error when initialising discovery %v", err) diff --git a/vendor/github.com/prometheus/prometheus/discovery/discovery.go b/vendor/github.com/prometheus/prometheus/discovery/discovery.go index 91d7eb7ab5..43485178a9 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/discovery.go +++ b/vendor/github.com/prometheus/prometheus/discovery/discovery.go @@ -50,7 +50,7 @@ type TargetProvider interface { } // ProvidersFromConfig returns all TargetProviders configured in cfg. 
-func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetProvider { +func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider { providers := map[string]TargetProvider{} app := func(mech string, i int, tp TargetProvider) { @@ -58,59 +58,59 @@ func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetPro } for i, c := range cfg.DNSSDConfigs { - app("dns", i, dns.NewDiscovery(c)) + app("dns", i, dns.NewDiscovery(c, logger)) } for i, c := range cfg.FileSDConfigs { - app("file", i, file.NewDiscovery(c)) + app("file", i, file.NewDiscovery(c, logger)) } for i, c := range cfg.ConsulSDConfigs { - k, err := consul.NewDiscovery(c) + k, err := consul.NewDiscovery(c, logger) if err != nil { - log.Errorf("Cannot create Consul discovery: %s", err) + logger.Errorf("Cannot create Consul discovery: %s", err) continue } app("consul", i, k) } for i, c := range cfg.MarathonSDConfigs { - m, err := marathon.NewDiscovery(c) + m, err := marathon.NewDiscovery(c, logger) if err != nil { - log.Errorf("Cannot create Marathon discovery: %s", err) + logger.Errorf("Cannot create Marathon discovery: %s", err) continue } app("marathon", i, m) } for i, c := range cfg.KubernetesSDConfigs { - k, err := kubernetes.New(log.Base(), c) + k, err := kubernetes.New(logger, c) if err != nil { - log.Errorf("Cannot create Kubernetes discovery: %s", err) + logger.Errorf("Cannot create Kubernetes discovery: %s", err) continue } app("kubernetes", i, k) } for i, c := range cfg.ServersetSDConfigs { - app("serverset", i, zookeeper.NewServersetDiscovery(c)) + app("serverset", i, zookeeper.NewServersetDiscovery(c, logger)) } for i, c := range cfg.NerveSDConfigs { - app("nerve", i, zookeeper.NewNerveDiscovery(c)) + app("nerve", i, zookeeper.NewNerveDiscovery(c, logger)) } for i, c := range cfg.EC2SDConfigs { - app("ec2", i, ec2.NewDiscovery(c)) + app("ec2", i, ec2.NewDiscovery(c, logger)) } for i, c := range cfg.GCESDConfigs { - gced, err := gce.NewDiscovery(c) + gced, err := gce.NewDiscovery(c, logger) if err != nil { - log.Errorf("Cannot initialize GCE discovery: %s", err) + logger.Errorf("Cannot initialize GCE discovery: %s", err) continue } app("gce", i, gced) } for i, c := range cfg.AzureSDConfigs { - app("azure", i, azure.NewDiscovery(c)) + app("azure", i, azure.NewDiscovery(c, logger)) } for i, c := range cfg.TritonSDConfigs { - t, err := triton.New(log.With("sd", "triton"), c) + t, err := triton.New(logger.With("sd", "triton"), c) if err != nil { - log.Errorf("Cannot create Triton discovery: %s", err) + logger.Errorf("Cannot create Triton discovery: %s", err) continue } app("triton", i, t) diff --git a/vendor/github.com/prometheus/prometheus/discovery/discovery_test.go b/vendor/github.com/prometheus/prometheus/discovery/discovery_test.go index d74b1df5e2..b98191c72f 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/discovery_test.go +++ b/vendor/github.com/prometheus/prometheus/discovery/discovery_test.go @@ -16,6 +16,7 @@ package discovery import ( "testing" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/config" "golang.org/x/net/context" yaml "gopkg.in/yaml.v2" @@ -53,7 +54,7 @@ static_configs: go ts.Run(ctx) - ts.UpdateProviders(ProvidersFromConfig(*cfg)) + ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base())) <-called verifyPresence(ts.tgroups, "static/0/0", true) @@ -67,7 +68,7 @@ static_configs: t.Fatalf("Unable to load YAML config sTwo: %s", err) } - 
ts.UpdateProviders(ProvidersFromConfig(*cfg)) + ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base())) <-called verifyPresence(ts.tgroups, "static/0/0", true) diff --git a/vendor/github.com/prometheus/prometheus/discovery/dns/dns.go b/vendor/github.com/prometheus/prometheus/discovery/dns/dns.go index d5d506dedc..61ab2dd3a3 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/dns/dns.go +++ b/vendor/github.com/prometheus/prometheus/discovery/dns/dns.go @@ -66,10 +66,11 @@ type Discovery struct { interval time.Duration port int qtype uint16 + logger log.Logger } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *config.DNSSDConfig) *Discovery { +func NewDiscovery(conf *config.DNSSDConfig, logger log.Logger) *Discovery { qtype := dns.TypeSRV switch strings.ToUpper(conf.Type) { case "A": @@ -84,6 +85,7 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery { interval: time.Duration(conf.RefreshInterval), qtype: qtype, port: conf.Port, + logger: logger, } } @@ -112,7 +114,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr for _, name := range d.names { go func(n string) { if err := d.refresh(ctx, n, ch); err != nil { - log.Errorf("Error refreshing DNS targets: %s", err) + d.logger.Errorf("Error refreshing DNS targets: %s", err) } wg.Done() }(name) @@ -122,7 +124,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr } func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error { - response, err := lookupAll(name, d.qtype) + response, err := lookupAll(name, d.qtype, d.logger) dnsSDLookupsCount.Inc() if err != nil { dnsSDLookupFailuresCount.Inc() @@ -147,7 +149,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi case *dns.AAAA: target = hostPort(addr.AAAA.String(), d.port) default: - log.Warnf("%q is not a valid SRV record", record) + d.logger.Warnf("%q is not a valid SRV record", record) continue } @@ -167,7 +169,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi return nil } -func lookupAll(name string, qtype uint16) (*dns.Msg, error) { +func lookupAll(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) { conf, err := dns.ClientConfigFromFile(resolvConf) if err != nil { return nil, fmt.Errorf("could not load resolv.conf: %s", err) @@ -181,7 +183,7 @@ func lookupAll(name string, qtype uint16) (*dns.Msg, error) { for _, lname := range conf.NameList(name) { response, err = lookup(lname, qtype, client, servAddr, false) if err != nil { - log. + logger. With("server", server). With("name", name). With("reason", err). diff --git a/vendor/github.com/prometheus/prometheus/discovery/ec2/ec2.go b/vendor/github.com/prometheus/prometheus/discovery/ec2/ec2.go index 56fa07f9a3..853fbe5d59 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/ec2/ec2.go +++ b/vendor/github.com/prometheus/prometheus/discovery/ec2/ec2.go @@ -72,10 +72,11 @@ type Discovery struct { interval time.Duration profile string port int + logger log.Logger } // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. 
-func NewDiscovery(conf *config.EC2SDConfig) *Discovery { +func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery { creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "") if conf.AccessKey == "" && conf.SecretKey == "" { creds = nil @@ -88,6 +89,7 @@ func NewDiscovery(conf *config.EC2SDConfig) *Discovery { profile: conf.Profile, interval: time.Duration(conf.RefreshInterval), port: conf.Port, + logger: logger, } } @@ -99,7 +101,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Get an initial set right away. tg, err := d.refresh() if err != nil { - log.Error(err) + d.logger.Error(err) } else { select { case ch <- []*config.TargetGroup{tg}: @@ -113,7 +115,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case <-ticker.C: tg, err := d.refresh() if err != nil { - log.Error(err) + d.logger.Error(err) continue } diff --git a/vendor/github.com/prometheus/prometheus/discovery/file/file.go b/vendor/github.com/prometheus/prometheus/discovery/file/file.go index 7e4d11eb42..b74c6e5102 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/file/file.go +++ b/vendor/github.com/prometheus/prometheus/discovery/file/file.go @@ -63,13 +63,15 @@ type Discovery struct { // and how many target groups they contained. // This is used to detect deleted target groups. lastRefresh map[string]int + logger log.Logger } // NewDiscovery returns a new file discovery for the given paths. -func NewDiscovery(conf *config.FileSDConfig) *Discovery { +func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery { return &Discovery{ paths: conf.Files, interval: time.Duration(conf.RefreshInterval), + logger: logger, } } @@ -79,7 +81,7 @@ func (d *Discovery) listFiles() []string { for _, p := range d.paths { files, err := filepath.Glob(p) if err != nil { - log.Errorf("Error expanding glob %q: %s", p, err) + d.logger.Errorf("Error expanding glob %q: %s", p, err) continue } paths = append(paths, files...) @@ -100,7 +102,7 @@ func (d *Discovery) watchFiles() { p = "./" } if err := d.watcher.Add(p); err != nil { - log.Errorf("Error adding file watch for %q: %s", p, err) + d.logger.Errorf("Error adding file watch for %q: %s", p, err) } } } @@ -111,7 +113,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { watcher, err := fsnotify.NewWatcher() if err != nil { - log.Errorf("Error creating file watcher: %s", err) + d.logger.Errorf("Error creating file watcher: %s", err) return } d.watcher = watcher @@ -149,7 +151,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case err := <-d.watcher.Errors: if err != nil { - log.Errorf("Error on file watch: %s", err) + d.logger.Errorf("Error on file watch: %s", err) } } } @@ -157,7 +159,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // stop shuts down the file watcher. 
func (d *Discovery) stop() { - log.Debugf("Stopping file discovery for %s...", d.paths) + d.logger.Debugf("Stopping file discovery for %s...", d.paths) done := make(chan struct{}) defer close(done) @@ -175,10 +177,10 @@ func (d *Discovery) stop() { } }() if err := d.watcher.Close(); err != nil { - log.Errorf("Error closing file watcher for %s: %s", d.paths, err) + d.logger.Errorf("Error closing file watcher for %s: %s", d.paths, err) } - log.Debugf("File discovery for %s stopped.", d.paths) + d.logger.Debugf("File discovery for %s stopped.", d.paths) } // refresh reads all files matching the discovery's patterns and sends the respective @@ -194,7 +196,7 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup tgroups, err := readFile(p) if err != nil { fileSDReadErrorsCount.Inc() - log.Errorf("Error reading file %q: %s", p, err) + d.logger.Errorf("Error reading file %q: %s", p, err) // Prevent deletion down below. ref[p] = d.lastRefresh[p] continue diff --git a/vendor/github.com/prometheus/prometheus/discovery/file/file_test.go b/vendor/github.com/prometheus/prometheus/discovery/file/file_test.go index 39ce91f476..3dbc6ae8a8 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/file/file_test.go +++ b/vendor/github.com/prometheus/prometheus/discovery/file/file_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" @@ -41,7 +42,7 @@ func testFileSD(t *testing.T, ext string) { conf.RefreshInterval = model.Duration(1 * time.Hour) var ( - fsd = NewDiscovery(&conf) + fsd = NewDiscovery(&conf, log.Base()) ch = make(chan []*config.TargetGroup) ctx, cancel = context.WithCancel(context.Background()) ) diff --git a/vendor/github.com/prometheus/prometheus/discovery/gce/gce.go b/vendor/github.com/prometheus/prometheus/discovery/gce/gce.go index 44ac349fea..2b6df1b916 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/gce/gce.go +++ b/vendor/github.com/prometheus/prometheus/discovery/gce/gce.go @@ -76,10 +76,11 @@ type Discovery struct { interval time.Duration port int tagSeparator string + logger log.Logger } // NewDiscovery returns a new Discovery which periodically refreshes its targets. -func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) { +func NewDiscovery(conf *config.GCESDConfig, logger log.Logger) (*Discovery, error) { gd := &Discovery{ project: conf.Project, zone: conf.Zone, @@ -87,6 +88,7 @@ func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) { interval: time.Duration(conf.RefreshInterval), port: conf.Port, tagSeparator: conf.TagSeparator, + logger: logger, } var err error gd.client, err = google.DefaultClient(oauth2.NoContext, compute.ComputeReadonlyScope) @@ -106,7 +108,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Get an initial set right away. 
tg, err := d.refresh() if err != nil { - log.Error(err) + d.logger.Error(err) } else { select { case ch <- []*config.TargetGroup{tg}: @@ -122,7 +124,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case <-ticker.C: tg, err := d.refresh() if err != nil { - log.Error(err) + d.logger.Error(err) continue } select { diff --git a/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon.go b/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon.go index 5d715d2b77..2e831ef607 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon.go +++ b/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon.go @@ -85,10 +85,11 @@ type Discovery struct { lastRefresh map[string]*config.TargetGroup appsClient AppListClient token string + logger log.Logger } // NewDiscovery returns a new Marathon Discovery. -func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { +func NewDiscovery(conf *config.MarathonSDConfig, logger log.Logger) (*Discovery, error) { tls, err := httputil.NewTLSConfig(conf.TLSConfig) if err != nil { return nil, err @@ -116,6 +117,7 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { refreshInterval: time.Duration(conf.RefreshInterval), appsClient: fetchApps, token: token, + logger: logger, }, nil } @@ -128,7 +130,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case <-time.After(d.refreshInterval): err := d.updateServices(ctx, ch) if err != nil { - log.Errorf("Error while updating services: %s", err) + d.logger.Errorf("Error while updating services: %s", err) } } } @@ -167,7 +169,7 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Targ case <-ctx.Done(): return ctx.Err() case ch <- []*config.TargetGroup{{Source: source}}: - log.Debugf("Removing group for %s", source) + d.logger.Debugf("Removing group for %s", source) } } } diff --git a/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon_test.go b/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon_test.go index 913380ad61..95e420dd88 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon_test.go +++ b/vendor/github.com/prometheus/prometheus/discovery/marathon/marathon_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" @@ -32,7 +33,7 @@ var ( ) func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error { - md, err := NewDiscovery(&conf) + md, err := NewDiscovery(&conf, log.Base()) if err != nil { return err } @@ -140,7 +141,7 @@ func TestMarathonSDSendGroup(t *testing.T) { func TestMarathonSDRemoveApp(t *testing.T) { var ch = make(chan []*config.TargetGroup, 1) - md, err := NewDiscovery(&conf) + md, err := NewDiscovery(&conf, log.Base()) if err != nil { t.Fatalf("%s", err) } @@ -176,7 +177,7 @@ func TestMarathonSDRunAndStop(t *testing.T) { ch = make(chan []*config.TargetGroup) doneCh = make(chan error) ) - md, err := NewDiscovery(&conf) + md, err := NewDiscovery(&conf, log.Base()) if err != nil { t.Fatalf("%s", err) } diff --git a/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go b/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go index 205eff7262..a0c7c5fd02 100644 --- a/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go +++ 
b/vendor/github.com/prometheus/prometheus/discovery/zookeeper/zookeeper.go @@ -24,6 +24,7 @@ import ( "github.com/samuel/go-zookeeper/zk" "golang.org/x/net/context" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/treecache" @@ -39,17 +40,18 @@ type Discovery struct { updates chan treecache.ZookeeperTreeCacheEvent treeCaches []*treecache.ZookeeperTreeCache - parse func(data []byte, path string) (model.LabelSet, error) + parse func(data []byte, path string) (model.LabelSet, error) + logger log.Logger } // NewNerveDiscovery returns a new Discovery for the given Nerve config. -func NewNerveDiscovery(conf *config.NerveSDConfig) *Discovery { - return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) +func NewNerveDiscovery(conf *config.NerveSDConfig, logger log.Logger) *Discovery { + return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember) } // NewServersetDiscovery returns a new Discovery for the given serverset config. -func NewServersetDiscovery(conf *config.ServersetSDConfig) *Discovery { - return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) +func NewServersetDiscovery(conf *config.ServersetSDConfig, logger log.Logger) *Discovery { + return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember) } // NewDiscovery returns a new discovery along Zookeeper parses with @@ -58,6 +60,7 @@ func NewDiscovery( srvs []string, timeout time.Duration, paths []string, + logger log.Logger, pf func(data []byte, path string) (model.LabelSet, error), ) *Discovery { conn, _, err := zk.Connect(srvs, timeout) @@ -71,6 +74,7 @@ func NewDiscovery( updates: updates, sources: map[string]*config.TargetGroup{}, parse: pf, + logger: logger, } for _, path := range paths { sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates)) diff --git a/vendor/github.com/prometheus/prometheus/retrieval/targetmanager.go b/vendor/github.com/prometheus/prometheus/retrieval/targetmanager.go index 1798355a5e..791d59a055 100644 --- a/vendor/github.com/prometheus/prometheus/retrieval/targetmanager.go +++ b/vendor/github.com/prometheus/prometheus/retrieval/targetmanager.go @@ -38,6 +38,7 @@ type TargetManager struct { // Set of unqiue targets by scrape configuration. targetSets map[string]*targetSet + logger log.Logger } type targetSet struct { @@ -49,16 +50,17 @@ type targetSet struct { } // NewTargetManager creates a new TargetManager. -func NewTargetManager(app storage.SampleAppender) *TargetManager { +func NewTargetManager(app storage.SampleAppender, logger log.Logger) *TargetManager { return &TargetManager{ appender: app, targetSets: map[string]*targetSet{}, + logger: logger, } } // Run starts background processing to handle target updates. func (tm *TargetManager) Run() { - log.Info("Starting target manager...") + tm.logger.Info("Starting target manager...") tm.mtx.Lock() @@ -72,7 +74,7 @@ func (tm *TargetManager) Run() { // Stop all background processing. func (tm *TargetManager) Stop() { - log.Infoln("Stopping target manager...") + tm.logger.Infoln("Stopping target manager...") tm.mtx.Lock() // Cancel the base context, this will cause all target providers to shut down @@ -84,7 +86,7 @@ func (tm *TargetManager) Stop() { // Wait for all scrape inserts to complete. 
tm.wg.Wait() - log.Debugln("Target manager stopped") + tm.logger.Debugln("Target manager stopped") } func (tm *TargetManager) reload() { @@ -118,7 +120,7 @@ func (tm *TargetManager) reload() { } else { ts.sp.reload(scfg) } - ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig)) + ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger)) } // Remove old target sets. Waiting for scrape pools to complete pending
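
Two short, hedged sketches follow to illustrate how the logger-threading in this patch is consumed; neither is part of the change itself. First, the retrieval side: `NewTargetManager` now takes the logger that it later forwards to `ProvidersFromConfig` on every reload, so construction looks roughly like this (the `example` package name, the `component` label, and the helper wrapper are assumptions for illustration; import paths are written against upstream `github.com`):

```go
package example

import (
	"github.com/prometheus/common/log"
	"github.com/prometheus/prometheus/retrieval"
	"github.com/prometheus/prometheus/storage"
)

// startTargetManager builds a TargetManager whose discovery providers will all
// share the given logger: reload() hands it to ProvidersFromConfig for every
// scrape configuration. Applying the scrape configuration itself is unchanged
// and happens elsewhere, exactly as before this patch.
func startTargetManager(app storage.SampleAppender) *retrieval.TargetManager {
	tm := retrieval.NewTargetManager(app, log.With("component", "target manager"))
	go tm.Run()
	return tm
}
```

Callers that previously passed only the appender now also decide which logger the whole discovery tree inherits.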
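Second, the discoverer side: each `NewDiscovery` constructor in this vendored branch now requires a `log.Logger`, while the `Run(ctx, ch)` contract is unchanged, so a discoverer can still be exercised the way `Service.Test` does above: start it under a cancellable context, then wait for the first batch of target groups or a timeout. A minimal sketch against the file-based discoverer (the path, refresh interval, timeout handling, and logger label are illustrative assumptions):

```go
package example

import (
	"fmt"
	"time"

	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/file"
	"golang.org/x/net/context"
)

// firstTargetGroups starts a file-based discoverer with an explicit logger and
// returns the first batch of target groups it publishes, or an error if the
// discoverer exits early or the timeout elapses.
func firstTargetGroups(path string, timeout time.Duration) ([]*config.TargetGroup, error) {
	disc := file.NewDiscovery(&config.FileSDConfig{
		Files:           []string{path},
		RefreshInterval: model.Duration(30 * time.Second),
	}, log.With("sd", "file")) // the logger is now a required constructor argument

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	updates := make(chan []*config.TargetGroup)
	go disc.Run(ctx, updates)

	select {
	case tgs, ok := <-updates:
		if !ok {
			return nil, fmt.Errorf("discoverer for %q exited before publishing targets", path)
		}
		return tgs, nil
	case <-time.After(timeout):
		return nil, fmt.Errorf("timeout waiting for targets from %q", path)
	}
}
```

The same shape should work for the consul, DNS, EC2, GCE, marathon, and zookeeper discoverers touched above, since they differ only in the config struct handed to the constructor.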