From 551b0f85108f254aecc69db3381b7fc8a34c0d46 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 10 Aug 2016 15:23:57 -0600 Subject: [PATCH 1/7] add API endpoint to refresh subscriptions --- CHANGELOG.md | 1 + server/server.go | 1 + services/influxdb/service.go | 38 ++++++++++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b07f1cff..e7e6a7b55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ The corresponding alert states are: - [#740](https://github.com/influxdata/kapacitor/pull/740): Support reset expressions to prevent an alert from being lowered in severity. Thanks @minhdanh! - [#670](https://github.com/influxdata/kapacitor/issues/670): Add ability to supress OK recovery alert events. +- [#](https://github.com/influxdata/kapacitor/pull/): Add API endpoint for refreshing subscriptions. ### Bugfixes diff --git a/server/server.go b/server/server.go index 751882ef7..8b3e240d6 100644 --- a/server/server.go +++ b/server/server.go @@ -217,6 +217,7 @@ func (s *Server) appendInfluxDBService() error { return errors.Wrap(err, "failed to get http port") } srv := influxdb.NewService(c, s.config.defaultInfluxDB, httpPort, s.config.Hostname, s.config.HTTP.AuthEnabled, l) + srv.HTTPDService = s.HTTPDService srv.PointsWriter = s.TaskMaster srv.LogService = s.LogService srv.AuthService = s.AuthService diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 591ae23b5..4c0bd7da0 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "log" "net" + "net/http" "net/url" "strconv" "strings" @@ -34,12 +35,17 @@ const ( // Size in bytes of a token for subscription authentication tokenSize = 64 + + // API endpoint paths + subscriptionsPath = "/subscriptions" + subscriptionsPathAnchored = "/subscriptions/" ) // Handles requests to write or read from an InfluxDB cluster type Service struct { defaultInfluxDB string clusters map[string]*influxdbCluster + routes []httpd.Route PointsWriter interface { WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error @@ -47,6 +53,10 @@ type Service struct { LogService interface { NewLogger(string, int) *log.Logger } + HTTPDService interface { + AddRoutes([]httpd.Route) error + DelRoutes([]httpd.Route) + } ClientCreator interface { Create(influxdb.HTTPConfig) (influxdb.Client, error) } @@ -146,10 +156,38 @@ func (s *Service) Open() error { return err } } + + // Define API routes + s.routes = []httpd.Route{ + { + Name: "subscriptions", + Method: "POST", + Pattern: subscriptionsPath, + HandlerFunc: s.handleSubscriptions, + }, + } + + err := s.HTTPDService.AddRoutes(s.routes) + if err != nil { + return errors.Wrap(err, "adding API routes") + } return nil } +// Refresh the subscriptions linking for all clusters. +func (s *Service) handleSubscriptions(w http.ResponseWriter, r *http.Request) { + for _, cluster := range s.clusters { + err := cluster.linkSubscriptions() + if err != nil { + httpd.HttpError(w, fmt.Sprintf("failed to link subscriptions: %s", err.Error()), true, http.StatusInternalServerError) + return + } + } + w.WriteHeader(http.StatusNoContent) +} + func (s *Service) Close() error { + s.HTTPDService.DelRoutes(s.routes) var lastErr error for _, cluster := range s.clusters { err := cluster.Close() From 632b3a75da4f158bb1f3f62901c35d5e7efc7c90 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 10 Aug 2016 15:27:22 -0600 Subject: [PATCH 2/7] better logging --- services/influxdb/service.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 4c0bd7da0..46d6b796d 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -116,6 +116,7 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string } runningSubs := make(map[subEntry]bool, len(c.Subscriptions)) clusters[c.Name] = &influxdbCluster{ + name: c.Name, configs: urls, configSubs: subs, exConfigSubs: exSubs, @@ -212,6 +213,7 @@ func (s *Service) NewNamedClient(name string) (influxdb.Client, error) { } type influxdbCluster struct { + name string configs []influxdb.HTTPConfig i int configSubs map[subEntry]bool @@ -329,7 +331,7 @@ func (s *influxdbCluster) NewClient() (c influxdb.Client, err error) { } func (s *influxdbCluster) linkSubscriptions() error { - s.logger.Println("D! linking subscriptions") + s.logger.Println("D! linking subscriptions for cluster", s.name) b := backoff.NewExponentialBackOff() b.MaxElapsedTime = s.startupTimeout ticker := backoff.NewTicker(b) From d2a262715bb466093a44f7931a5630f3c4e92d0e Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 10 Aug 2016 17:12:23 -0600 Subject: [PATCH 3/7] fix issues with removing dropped subs, and adding deleted subs --- services/influxdb/service.go | 83 +++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 46d6b796d..c44773df2 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -115,6 +115,7 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string } } runningSubs := make(map[subEntry]bool, len(c.Subscriptions)) + services := make(map[subEntry]openCloser, len(c.Subscriptions)) clusters[c.Name] = &influxdbCluster{ name: c.Name, configs: urls, @@ -133,7 +134,9 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string disableSubs: c.DisableSubscriptions, protocol: c.SubscriptionProtocol, runningSubs: runningSubs, - useTokens: useTokens, + services: services, + // Do not use tokens for non http protocols + useTokens: useTokens && (c.SubscriptionProtocol == "http" || c.SubscriptionProtocol == "https"), } if defaultInfluxDB == i { defaultInfluxDBName = c.Name @@ -250,14 +253,16 @@ type influxdbCluster struct { RevokeSubscriptionAccess(token string) error } - services []interface { - Open() error - Close() error - } + services map[subEntry]openCloser mu sync.Mutex } +type openCloser interface { + Open() error + Close() error +} + type subEntry struct { cluster string rp string @@ -273,7 +278,6 @@ func (s *influxdbCluster) Open() error { s.mu.Lock() defer s.mu.Unlock() if !s.disableSubs { - err := s.linkSubscriptions() if s.subscriptionSyncInterval != 0 { s.subSyncTicker = time.NewTicker(s.subscriptionSyncInterval) go func() { @@ -282,9 +286,8 @@ func (s *influxdbCluster) Open() error { } }() } - return err } - return nil + return s.linkSubscriptions() } func (s *influxdbCluster) Close() error { @@ -331,6 +334,9 @@ func (s *influxdbCluster) NewClient() (c influxdb.Client, err error) { } func (s *influxdbCluster) linkSubscriptions() error { + if s.disableSubs { + return nil + } s.logger.Println("D! linking subscriptions for cluster", s.name) b := backoff.NewExponentialBackOff() b.MaxElapsedTime = s.startupTimeout @@ -471,8 +477,9 @@ func (s *influxdbCluster) linkSubscriptions() error { // Check if the hostname, port or protocol have changed if host != s.hostname || u.Scheme != s.protocol || - ((u.Scheme == "http" || u.Scheme == "https") && int(pn) != s.httpPort) || - (s.useTokens && (u.User == nil || u.User.Username() != httpd.SubscriptionUser)) { + ((u.Scheme == "http" || u.Scheme == "https") && + (int(pn) != s.httpPort || + (s.useTokens && (u.User == nil || u.User.Username() != httpd.SubscriptionUser)))) { // Remove access for changing subscriptions. if u.User != nil { if p, ok := u.User.Password(); ok { @@ -481,6 +488,7 @@ func (s *influxdbCluster) linkSubscriptions() error { } // Something changed, drop the sub and let it get recreated s.dropSub(cli, se.name, se.cluster, se.rp) + s.runningSubs[se] = false } else { existingSubs[se] = si // Do not revoke tokens that are still in use @@ -496,7 +504,6 @@ func (s *influxdbCluster) linkSubscriptions() error { } // Compare to configured list - startedSubs := make(map[subEntry]bool) all := len(s.configSubs) == 0 for se, si := range existingSubs { if (s.configSubs[se] || all) && !s.exConfigSubs[se] && !s.runningSubs[se] { @@ -511,12 +518,11 @@ func (s *influxdbCluster) linkSubscriptions() error { if host == s.hostname { numSubscriptions++ if u.Scheme == "udp" { - _, err := s.startUDPListener(se.cluster, se.rp, port) + _, err := s.startUDPListener(se, port) if err != nil { s.logger.Println("E! failed to start UDP listener:", err) } } - startedSubs[se] = true s.runningSubs[se] = true break } @@ -524,9 +530,11 @@ func (s *influxdbCluster) linkSubscriptions() error { } } // create and start any new subscriptions + // stop any removed subscriptions for _, se := range allSubs { + _, exists := existingSubs[se] // If we have been configured to subscribe and the subscription is not started yet. - if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] && !s.runningSubs[se] { + if (s.configSubs[se] || all) && !s.exConfigSubs[se] && !(s.runningSubs[se] && exists) { var destination string switch s.protocol { case "http", "https": @@ -554,7 +562,7 @@ func (s *influxdbCluster) linkSubscriptions() error { destination = u.String() } case "udp": - addr, err := s.startUDPListener(se.cluster, se.rp, "0") + addr, err := s.startUDPListener(se, "0") if err != nil { s.logger.Println("E! failed to start UDP listener:", err) } @@ -563,11 +571,36 @@ func (s *influxdbCluster) linkSubscriptions() error { numSubscriptions++ - err = s.createSub(cli, se.name, se.cluster, se.rp, "ANY", []string{destination}) + mode := "ANY" + destinations := []string{destination} + err = s.createSub(cli, se.name, se.cluster, se.rp, mode, destinations) if err != nil { return err } + // Mark as running s.runningSubs[se] = true + // Add info to exiting set + existingSubs[se] = subInfo{ + Mode: mode, + Destinations: destinations, + } + } + } + for se, running := range s.runningSubs { + if !running { + continue + } + if _, exists := existingSubs[se]; !exists { + // Close the service and stop tracking it. + if service, ok := s.services[se]; ok { + s.logger.Println("D! deleting service for", se) + err := service.Close() + if err != nil { + s.logger.Printf("E! failed to close service for %v: %s", se, err) + } + } + delete(s.runningSubs, se) + delete(s.services, se) } } @@ -590,7 +623,7 @@ func (s *influxdbCluster) generateRandomToken() (string, error) { return base64.RawURLEncoding.EncodeToString(tokenBytes), nil } -func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode string, destinations []string) (err error) { +func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode string, destinations []string) error { var buf bytes.Buffer for i, dst := range destinations { if i != 0 { @@ -600,7 +633,7 @@ func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode buf.Write([]byte(dst)) buf.Write([]byte("'")) } - _, err = s.execQuery( + _, err := s.execQuery( cli, &influxql.CreateSubscriptionStatement{ Name: name, @@ -610,7 +643,7 @@ func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode Mode: strings.ToUpper(mode), }, ) - return + return errors.Wrapf(err, "creating sub %s for db %q and rp %q", name, cluster, rp) } func (s *influxdbCluster) dropSub(cli influxdb.Client, name, cluster, rp string) (err error) { @@ -625,24 +658,24 @@ func (s *influxdbCluster) dropSub(cli influxdb.Client, name, cluster, rp string) return } -func (s *influxdbCluster) startUDPListener(cluster, rp, port string) (*net.UDPAddr, error) { +func (s *influxdbCluster) startUDPListener(se subEntry, port string) (*net.UDPAddr, error) { c := udp.Config{} c.Enabled = true c.BindAddress = fmt.Sprintf("%s:%s", s.udpBind, port) - c.Database = cluster - c.RetentionPolicy = rp + c.Database = se.cluster + c.RetentionPolicy = se.rp c.Buffer = s.udpBuffer c.ReadBuffer = s.udpReadBuffer - l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", cluster, rp), log.LstdFlags) + l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", se.cluster, se.rp), log.LstdFlags) service := udp.NewService(c, l) service.PointsWriter = s.PointsWriter err := service.Open() if err != nil { return nil, err } - s.services = append(s.services, service) - s.logger.Println("I! started UDP listener for", cluster, rp) + s.services[se] = service + s.logger.Println("I! started UDP listener for", se.cluster, se.rp) return service.Addr(), nil } From d07a8cdcada9c74512f1703e0c5af587430c4d5c Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 10 Aug 2016 17:15:32 -0600 Subject: [PATCH 4/7] CHANGELOG.md --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7e6a7b55..ea234c5f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,9 @@ The corresponding alert states are: - [#740](https://github.com/influxdata/kapacitor/pull/740): Support reset expressions to prevent an alert from being lowered in severity. Thanks @minhdanh! - [#670](https://github.com/influxdata/kapacitor/issues/670): Add ability to supress OK recovery alert events. -- [#](https://github.com/influxdata/kapacitor/pull/): Add API endpoint for refreshing subscriptions. +- [#804](https://github.com/influxdata/kapacitor/pull/804): Add API endpoint for refreshing subscriptions. + Also fixes issue where subs were not relinked if the sub was deleted. + UDP listen ports are closed when a database is dropped. ### Bugfixes From f951c3c7c367eab42ad37aa0bf9d23c4845d79e8 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 10 Aug 2016 17:20:15 -0600 Subject: [PATCH 5/7] add helper method for closing subs --- services/influxdb/service.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/services/influxdb/service.go b/services/influxdb/service.go index c44773df2..6f23afe6f 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -488,7 +488,7 @@ func (s *influxdbCluster) linkSubscriptions() error { } // Something changed, drop the sub and let it get recreated s.dropSub(cli, se.name, se.cluster, se.rp) - s.runningSubs[se] = false + s.closeSub(se) } else { existingSubs[se] = si // Do not revoke tokens that are still in use @@ -591,16 +591,10 @@ func (s *influxdbCluster) linkSubscriptions() error { continue } if _, exists := existingSubs[se]; !exists { - // Close the service and stop tracking it. - if service, ok := s.services[se]; ok { - s.logger.Println("D! deleting service for", se) - err := service.Close() - if err != nil { - s.logger.Printf("E! failed to close service for %v: %s", se, err) - } + err := s.closeSub(se) + if err != nil { + s.logger.Printf("E! failed to close service for %v: %s", se, err) } - delete(s.runningSubs, se) - delete(s.services, se) } } @@ -615,6 +609,17 @@ func (s *influxdbCluster) linkSubscriptions() error { return nil } +// Close the service and stop tracking it. +func (s *influxdbCluster) closeSub(se subEntry) (err error) { + if service, ok := s.services[se]; ok { + s.logger.Println("D! closing service for", se) + err = service.Close() + } + delete(s.runningSubs, se) + delete(s.services, se) + return +} + func (s *influxdbCluster) generateRandomToken() (string, error) { tokenBytes := make([]byte, tokenSize) if _, err := io.ReadFull(rand.Reader, tokenBytes); err != nil { From 19f18084abcc68ef938d6ff666441f817dea65fa Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 10 Aug 2016 17:23:05 -0600 Subject: [PATCH 6/7] add helper method for closing subs, lock linkSubs method --- services/influxdb/service.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 6f23afe6f..879e5b1df 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -337,6 +337,8 @@ func (s *influxdbCluster) linkSubscriptions() error { if s.disableSubs { return nil } + s.mu.Lock() + defer s.mu.Unlock() s.logger.Println("D! linking subscriptions for cluster", s.name) b := backoff.NewExponentialBackOff() b.MaxElapsedTime = s.startupTimeout @@ -586,6 +588,7 @@ func (s *influxdbCluster) linkSubscriptions() error { } } } + // Close any subs for dbs that have been dropped for se, running := range s.runningSubs { if !running { continue From 26964def8f8a05db2ddd208d77ccd81eb8814c92 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 11 Aug 2016 09:51:37 -0600 Subject: [PATCH 7/7] fix locking issue --- services/influxdb/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 879e5b1df..3545e8b17 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -276,7 +276,6 @@ type subInfo struct { func (s *influxdbCluster) Open() error { s.mu.Lock() - defer s.mu.Unlock() if !s.disableSubs { if s.subscriptionSyncInterval != 0 { s.subSyncTicker = time.NewTicker(s.subscriptionSyncInterval) @@ -287,6 +286,8 @@ func (s *influxdbCluster) Open() error { }() } } + // Release lock so we can call linkSubscriptions. + s.mu.Unlock() return s.linkSubscriptions() }