From c10a0828e6c3da584f9cea15b39d6b7f9ce3578f Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 27 Jul 2016 15:20:54 -0600 Subject: [PATCH 1/2] add subscription token based auth --- auth/auth.go | 3 +++ server/server.go | 7 ++--- services/httpd/handler.go | 14 ++++++++++ services/influxdb/service.go | 52 +++++++++++++++++++++++++++++++++--- services/noauth/service.go | 14 ++++++++++ 5 files changed, 83 insertions(+), 7 deletions(-) diff --git a/auth/auth.go b/auth/auth.go index 3cf2a6440..05de6303f 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -10,6 +10,9 @@ import ( type Interface interface { Authenticate(username, password string) (User, error) User(username string) (User, error) + SubscriptionUser(token string) (User, error) + GrantSubscriptionAccess(token, db, rp string) error + RevokeSubscriptionAccess(token string) error } // ErrAuthenticate is returned when authentication fails. diff --git a/server/server.go b/server/server.go index aacd8deb2..0af9af19a 100644 --- a/server/server.go +++ b/server/server.go @@ -142,13 +142,13 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server, s.appendDeadmanService() s.appendSMTPService() s.InitHTTPDService() + s.appendStorageService() + s.appendAuthService() if err := s.appendInfluxDBService(); err != nil { return nil, errors.Wrap(err, "influxdb service") } - s.appendStorageService() s.appendTaskStoreService() s.appendReplayService() - s.appendAuthService() // Append Alert integration services s.appendOpsGenieService() @@ -215,9 +215,10 @@ func (s *Server) appendInfluxDBService() error { if err != nil { return errors.Wrap(err, "failed to get http port") } - srv := influxdb.NewService(c, s.config.defaultInfluxDB, httpPort, s.config.Hostname, l) + srv := influxdb.NewService(c, s.config.defaultInfluxDB, httpPort, s.config.Hostname, s.config.HTTP.AuthEnabled, l) srv.PointsWriter = s.TaskMaster srv.LogService = s.LogService + srv.AuthService = s.AuthService s.InfluxDBService = srv s.TaskMaster.InfluxDBService = srv diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 2d239af5e..90a9810ea 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -45,6 +45,7 @@ type AuthenticationMethod int const ( UserAuthentication AuthenticationMethod = iota BearerAuthentication + SubscriptionAuthentication ) type AuthorizationHandler func(http.ResponseWriter, *http.Request, auth.User) @@ -612,6 +613,11 @@ func authenticate(inner AuthorizationHandler, h *Handler, requireAuthentication HttpError(w, err.Error(), false, http.StatusUnauthorized) return } + case SubscriptionAuthentication: + if user, err = h.AuthService.SubscriptionUser(creds.Token); err != nil { + HttpError(w, err.Error(), false, http.StatusUnauthorized) + return + } default: HttpError(w, "unsupported authentication", false, http.StatusUnauthorized) } @@ -684,6 +690,7 @@ type credentials struct { // As params: http://127.0.0.1/query?u=username&p=password // As basic auth: http://username:password@127.0.0.1 // As Bearer token in Authorization header: Bearer +// As simple acccess token in InfluxDB-Access-Token: func parseCredentials(r *http.Request) (credentials, error) { q := r.URL.Query() @@ -706,6 +713,13 @@ func parseCredentials(r *http.Request) (credentials, error) { Password: p, }, nil } + } else if s := r.Header.Get("InfluxDB-Access-Token"); s != "" { + // Check for the HTTP InfluxDB-Access-Token header. + return credentials{ + Method: SubscriptionAuthentication, + Token: s, + }, nil + } // Check for username and password in URL params. diff --git a/services/influxdb/service.go b/services/influxdb/service.go index cd41a4cc9..08e7188a1 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -2,10 +2,12 @@ package influxdb import ( "bytes" + "crypto/rand" "crypto/tls" "crypto/x509" - "errors" + "encoding/base64" "fmt" + "io" "io/ioutil" "log" "net" @@ -21,12 +23,16 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/kapacitor" "github.com/influxdata/kapacitor/services/udp" + "github.com/pkg/errors" ) const ( // Legacy name given to all subscriptions. legacySubName = "kapacitor" subNamePrefix = "kapacitor-" + + // Size in bytes of a token for subscription authentication + tokenSize = 64 ) // Handles requests to write or read from an InfluxDB cluster @@ -40,10 +46,15 @@ type Service struct { LogService interface { NewLogger(string, int) *log.Logger } + + AuthService interface { + GrantSubscriptionAccess(token, db, rp string) error + RevokeSubscriptionAccess(token string) error + } logger *log.Logger } -func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string, l *log.Logger) *Service { +func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string, useTokens bool, l *log.Logger) *Service { clusterID := kapacitor.ClusterIDVar.StringValue() subName := subNamePrefix + clusterID clusters := make(map[string]*influxdb, len(configs)) @@ -100,6 +111,7 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string disableSubs: c.DisableSubscriptions, protocol: c.SubscriptionProtocol, runningSubs: runningSubs, + useTokens: useTokens, } if defaultInfluxDB == i { defaultInfluxDBName = c.Name @@ -116,6 +128,7 @@ func (s *Service) Open() error { for _, cluster := range s.clusters { cluster.PointsWriter = s.PointsWriter cluster.LogService = s.LogService + cluster.AuthService = s.AuthService err := cluster.Open() if err != nil { return err @@ -164,6 +177,7 @@ type influxdb struct { subscriptionSyncInterval time.Duration disableSubs bool runningSubs map[subEntry]bool + useTokens bool clusterID string subName string @@ -175,6 +189,10 @@ type influxdb struct { LogService interface { NewLogger(string, int) *log.Logger } + AuthService interface { + GrantSubscriptionAccess(token, db, rp string) error + RevokeSubscriptionAccess(token string) error + } services []interface { Open() error @@ -388,7 +406,12 @@ func (s *influxdb) linkSubscriptions() error { // Check if the hostname, port or protocol have changed if host != s.hostname || u.Scheme != s.protocol || - ((u.Scheme == "http" || u.Scheme == "https") && int(pn) != s.httpPort) { + ((u.Scheme == "http" || u.Scheme == "https") && int(pn) != s.httpPort) || + (s.useTokens && u.User == nil) { + // Remove access for changing subscriptions. + if u.User != nil { + s.AuthService.RevokeSubscriptionAccess(u.User.Username()) + } // Something changed, drop the sub and let it get recreated s.dropSub(cli, se.name, se.cluster, se.rp) } else { @@ -434,7 +457,20 @@ func (s *influxdb) linkSubscriptions() error { var destination string switch s.protocol { case "http", "https": - destination = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, s.httpPort) + if s.useTokens { + // Generate token + token, err := s.generateRandomToken() + if err != nil { + return errors.Wrap(err, "generating token") + } + err = s.AuthService.GrantSubscriptionAccess(token, se.cluster, se.rp) + if err != nil { + return err + } + destination = fmt.Sprintf("%s://%s@%s:%d", s.protocol, token, s.hostname, s.httpPort) + } else { + destination = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, s.httpPort) + } case "udp": addr, err := s.startUDPListener(se.cluster, se.rp, "0") if err != nil { @@ -457,6 +493,14 @@ func (s *influxdb) linkSubscriptions() error { return nil } +func (s *influxdb) generateRandomToken() (string, error) { + tokenBytes := make([]byte, tokenSize) + if _, err := io.ReadFull(rand.Reader, tokenBytes); err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(tokenBytes), nil +} + func (s *influxdb) createSub(cli client.Client, name, cluster, rp, mode string, destinations []string) (err error) { var buf bytes.Buffer for i, dst := range destinations { diff --git a/services/noauth/service.go b/services/noauth/service.go index b41e39156..cbe69124f 100644 --- a/services/noauth/service.go +++ b/services/noauth/service.go @@ -37,3 +37,17 @@ func (s *Service) User(username string) (auth.User, error) { s.logger.Println("W! using noauth auth backend. Faked authentication for user", username) return auth.NewUser(username, nil, true, nil), nil } + +// Return a user will all privileges. +func (s *Service) SubscriptionUser(token string) (auth.User, error) { + s.logger.Println("W! using noauth auth backend. Faked authentication for subscription user token") + return auth.NewUser("subscription-user", nil, true, nil), nil +} + +func (s *Service) GrantSubscriptionAccess(token, db, rp string) error { + return nil +} + +func (s *Service) RevokeSubscriptionAccess(token string) error { + return nil +} From 085d0fae24b386959fce2c312d07c5746cd5ecd8 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 27 Jul 2016 16:15:00 -0600 Subject: [PATCH 2/2] CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e146644c1..c3f442ee0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ stream - [#624](https://github.com/influxdata/kapacitor/issue/624): BREAKING: Add ability to GroupBy fields. First use EvalNode to create a tag from a field and then group by the new tag. Also allows for grouping by measurement. The breaking change is that the group ID format has changed to allow for the measurement name. +- [#759](https://github.com/influxdata/kapacitor/pull/759): Add mechanism for token based subscription auth. ### Bugfixes