Skip to content

Commit

Permalink
Add k8s scraper service
Browse files Browse the repository at this point in the history
  • Loading branch information
goller committed May 3, 2017
1 parent 5406b07 commit ab83de7
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 40 deletions.
13 changes: 7 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,
s.appendReplayService()

// Append third-party integrations
if err := s.appendK8sService(); err != nil {
return nil, errors.Wrap(err, "kubernetes service")
}

// Append extra input services
s.appendCollectdService()
s.appendUDPServices()
Expand All @@ -254,6 +250,11 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,

// Append Scraper and discovery services
s.appendScraperService()

if err := s.appendK8sService(s.ScraperService); err != nil {
return nil, errors.Wrap(err, "kubernetes service")
}

s.appendAzureService(s.ScraperService)
s.appendConsulService(s.ScraperService)
s.appendDNSService(s.ScraperService)
Expand Down Expand Up @@ -415,10 +416,10 @@ func (s *Server) appendReplayService() {
s.AppendService("replay", srv)
}

func (s *Server) appendK8sService() error {
func (s *Server) appendK8sService(r scraper.Registry) error {
c := s.config.Kubernetes
l := s.LogService.NewLogger("[kubernetes] ", log.LstdFlags)
srv, err := k8s.NewService(c, l)
srv, err := k8s.NewService(c, r, l)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6154,6 +6154,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"in-cluster": false,
"namespace": "",
"token": false,
"resource": "",
},
Redacted: []string{
"token",
Expand All @@ -6170,6 +6171,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"in-cluster": false,
"namespace": "",
"token": false,
"resource": "",
},
Redacted: []string{
"token",
Expand All @@ -6194,6 +6196,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"in-cluster": false,
"namespace": "",
"token": true,
"resource": "",
},
Redacted: []string{
"token",
Expand All @@ -6210,6 +6213,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"in-cluster": false,
"namespace": "",
"token": true,
"resource": "",
},
Redacted: []string{
"token",
Expand Down
60 changes: 60 additions & 0 deletions services/k8s/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/url"

"github.com/influxdata/kapacitor/listmap"
"github.com/influxdata/kapacitor/services/k8s/client"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/config"
)

type Config struct {
Expand All @@ -18,6 +20,7 @@ type Config struct {
Token string `toml:"token" override:"token,redact"`
CAPath string `toml:"ca-path" override:"ca-path"`
Namespace string `toml:"namespace" override:"namespace"`
Resource string `toml:"resource" override:"resource"`
}

func NewConfig() Config {
Expand All @@ -43,6 +46,21 @@ func (c Config) Validate() error {
}
} else if len(c.APIServers) == 0 {
return errors.New("no api-servers specified, must provide at least one server URL")
} else {
for _, s := range c.APIServers {
_, err := url.Parse(s)
if err != nil {
return err
}
}
}
if c.Resource != "" {
switch c.Resource {
case "node", "pod", "service", "endpoint":
default:
return errors.New("Resource must be one of node, pod, service, or endpoints")
}

}
return nil
}
Expand Down Expand Up @@ -92,3 +110,45 @@ func (cs Configs) Validate() error {
}
return nil
}

// Prom writes the prometheus configuration for discoverer into ScrapeConfig
func (c Config) Prom(conf *config.ScrapeConfig) {
if len(c.APIServers) == 0 {
conf.ServiceDiscoveryConfig.KubernetesSDConfigs = []*config.KubernetesSDConfig{
&config.KubernetesSDConfig{
Role: config.KubernetesRole(c.Resource),
BearerToken: c.Token,
TLSConfig: config.TLSConfig{
CAFile: c.CAPath,
},
},
}
return
}

sds := make([]*config.KubernetesSDConfig, len(c.APIServers))
for i, srv := range c.APIServers {
url, _ := url.Parse(srv)
sds[i] = &config.KubernetesSDConfig{
APIServer: config.URL{
URL: url,
},
Role: config.KubernetesRole(c.Resource),
BearerToken: c.Token,
TLSConfig: config.TLSConfig{
CAFile: c.CAPath,
},
}
}
conf.ServiceDiscoveryConfig.KubernetesSDConfigs = sds
}

// Service return discoverer type
func (c Config) Service() string {
return "kubernetes"
}

// ServiceID returns the discoverers name
func (c Config) ServiceID() string {
return c.ID
}
38 changes: 35 additions & 3 deletions services/k8s/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ import (
"sync"

"github.com/influxdata/kapacitor/services/k8s/client"
"github.com/influxdata/kapacitor/services/scraper"
)

// Service is the kubernetes discovery and autoscale service
type Service struct {
mu sync.Mutex
configs []Config
clusters map[string]*Cluster
registry scraper.Registry
logger *log.Logger
}

func NewService(c []Config, l *log.Logger) (*Service, error) {
// NewService creates a new unopened k8s service
func NewService(c []Config, r scraper.Registry, l *log.Logger) (*Service, error) {
l.Println("D! k8s configs", c)

clusters := make(map[string]*Cluster, len(c))
Expand All @@ -28,10 +33,13 @@ func NewService(c []Config, l *log.Logger) (*Service, error) {

return &Service{
clusters: clusters,
configs: c,
logger: l,
registry: r,
}, nil
}

// Open starts the kubernetes service
func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -40,26 +48,47 @@ func (s *Service) Open() error {
return err
}
}
return nil
s.register()
return s.registry.Commit()
}

func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
for _, c := range s.clusters {
c.Close()
}
return nil
s.deregister()
return s.registry.Commit()
}

func (s *Service) deregister() {
// Remove all the configurations in the registry
for _, d := range s.configs {
s.registry.RemoveDiscoverer(&d)
}
}

func (s *Service) register() {
// Add all configurations to registry
for _, d := range s.configs {
if d.Enabled {
s.registry.AddDiscoverer(&d)
}
}
}

func (s *Service) Update(newConfigs []interface{}) error {
s.mu.Lock()
defer s.mu.Unlock()
configs := make([]Config, len(newConfigs))
existingClusters := make(map[string]bool, len(newConfigs))
for i := range newConfigs {
c, ok := newConfigs[i].(Config)
if !ok {
return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfigs[i])
}
configs[i] = c
cluster, ok := s.clusters[c.ID]
if !ok {
var err error
Expand All @@ -86,6 +115,9 @@ func (s *Service) Update(newConfigs []interface{}) error {
delete(s.clusters, id)
}
}
s.deregister()
s.configs = configs
s.register()
return nil
}

Expand Down
31 changes: 0 additions & 31 deletions services/scraper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,34 +131,3 @@ func decodeJobName(job string) (string, string, string, error) {
}
return split[0], split[1], split[2], nil
}

type KubernetesRole string

const (
KubernetesRoleNode = "node"
KubernetesRolePod = "pod"
KubernetesRoleService = "service"
KubernetesRoleEndpoint = "endpoints"
)

// Kubernetes is Kubernetes service discovery configuration
type Kubernetes struct {
APIServer url.URL `toml:"api_server" override:"api_server"`
Role KubernetesRole `toml:"role" override:"role"`
}

func (k Kubernetes) Prom(c *config.ScrapeConfig) {
// TODO: auth token tls
c.ServiceDiscoveryConfig.KubernetesSDConfigs = []*config.KubernetesSDConfig{
&config.KubernetesSDConfig{
APIServer: config.URL{
URL: &k.APIServer,
},
Role: config.KubernetesRole(k.Role),
},
}
}

func NewKubernetes() Kubernetes {
return Kubernetes{}
}

0 comments on commit ab83de7

Please sign in to comment.