Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #21 from IzabellaRaulin/switch-to-lib
Browse files Browse the repository at this point in the history
Switched dependency to snap-plugin-lib-go
  • Loading branch information
IzabellaRaulin authored Jan 19, 2017
2 parents 8677242 + 20e25ac commit 9c38485
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 322 deletions.
71 changes: 41 additions & 30 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package: github.com/intelsdi-x/snap-plugin-collector-influxdb
import:
- package: github.com/intelsdi-x/snap-plugin-utilities
- package: github.com/Sirupsen/logrus
version: ^0.11.0
- package: github.com/intelsdi-x/snap-plugin-lib-go
subpackages:
- config
- package: github.com/intelsdi-x/snap
subpackages:
- control/plugin
- control/plugin/cpolicy
- core
- v1/plugin
testImport:
- package: github.com/smartystreets/goconvey
version: ^1.6.2
subpackages:
- convey
- package: github.com/stretchr/testify
version: ^1.1.4
subpackages:
- mock
174 changes: 106 additions & 68 deletions influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,22 @@ package influxdb
import (
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
log "github.com/Sirupsen/logrus"

"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin"

"github.com/intelsdi-x/snap-plugin-collector-influxdb/influxdb/dtype"
"github.com/intelsdi-x/snap-plugin-collector-influxdb/influxdb/monitor"
"github.com/intelsdi-x/snap-plugin-utilities/config"
)

const (
// Name of plugin
Name = "influxdb"
// Version of plugin
Version = 4
// Type of plugin
Type = plugin.CollectorPluginType
Version = 5

nsVendor = "intel"
nsClass = "influxdb"
Expand All @@ -54,7 +50,7 @@ const (
// prefix in metric namespace
var prefix = []string{nsVendor, nsClass}

// InfluxdbCollector holds data retrived from influxDB system monitoring
// InfluxdbCollector holds data retrieved from influxDB system monitoring
type InfluxdbCollector struct {
data map[string]datum
service monitor.Monitoring
Expand All @@ -67,83 +63,126 @@ type datum struct {
}

// New returns new instance of snap-plugin-collector-influxdb
func New() *InfluxdbCollector {
func New() plugin.Collector {
return &InfluxdbCollector{initialized: false, service: &monitor.Monitor{}, data: map[string]datum{}}
}

// GetConfigPolicy returns a ConfigPolicy
func (ic *InfluxdbCollector) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
return cpolicy.New(), nil
func (ic *InfluxdbCollector) GetConfigPolicy() (plugin.ConfigPolicy, error) {
policy := plugin.NewConfigPolicy()
cfgKey := []string{"intel", "influxdb"}
policy.AddNewStringRule(cfgKey, "host", false, plugin.SetDefaultString("localhost"))
policy.AddNewIntRule(cfgKey, "port", false, plugin.SetDefaultInt(8086))
policy.AddNewStringRule(cfgKey, "user", true)
policy.AddNewStringRule(cfgKey, "password", true)
return *policy, nil
}

// GetMetricTypes returns list of metrics based on influxDB system monitoring
func (ic *InfluxdbCollector) GetMetricTypes(cfg plugin.ConfigType) ([]plugin.MetricType, error) {
mts := []plugin.MetricType{}
func (ic *InfluxdbCollector) GetMetricTypes(cfg plugin.Config) ([]plugin.Metric, error) {
mts := []plugin.Metric{}
if err := ic.init(cfg); err != nil {
return nil, err
}

ic.init(cfg)
ic.getStatistics() // get statistical information about influxDB
ic.getDiagnostics() // get diagnostic information about influxDB
// get InfluxDB internal statistics
if err := ic.getStatistics(); err != nil {
return nil, fmt.Errorf("Cannot get influxdb internal statistics, err=%s", err.Error())
}

// get InfluxDB internal diagnostics info
if err := ic.getDiagnostics(); err != nil {
return nil, fmt.Errorf("Cannot get influxdb diagnostic information, err=%s", err.Error())
}

for ns, dat := range ic.data {
mts = append(mts, plugin.MetricType{Namespace_: core.NewNamespace(splitNamespace(ns)...), Tags_: dat.tags})
for key, dat := range ic.data {
mts = append(mts, plugin.Metric{
Namespace: plugin.NewNamespace(prefix...).AddStaticElements(splitKey(key)...),
Tags: dat.tags,
Version: Version,
})
}

return mts, nil
}

// CollectMetrics collects given metrics
func (ic *InfluxdbCollector) CollectMetrics(mts []plugin.MetricType) ([]plugin.MetricType, error) {
metrics := []plugin.MetricType{}

func (ic *InfluxdbCollector) CollectMetrics(mts []plugin.Metric) ([]plugin.Metric, error) {
if !ic.initialized {
ic.init(mts[0]) // if CollectMetrics() is called, mts has one item at least
ic.getDiagnostics() // get diagnostic information (once only)
// mts has one item at least if CollectMetrics() has been called
if err := ic.init(mts[0].Config); err != nil {
return nil, err
}
// get diagnostic information (once only)
if err := ic.getDiagnostics(); err != nil {
return nil, fmt.Errorf("Cannot get influxdb diagnostic information, err=%s", err.Error())
}
}

ic.getStatistics() // get statistical information

for _, m := range mts {
if dat, ok := ic.data[strings.TrimLeft(m.Namespace().String(), "/")]; ok {
metric := plugin.MetricType{
Namespace_: m.Namespace(),
Data_: dat.value,
Timestamp_: time.Now(),
Tags_: dat.tags,
}
// get statistics
if err := ic.getStatistics(); err != nil {
return nil, fmt.Errorf("Cannot get influxdb internal statistics, err=%s", err.Error())
}

metrics = append(metrics, metric)
for i := range mts {
ns := mts[i].Namespace
if dat, ok := ic.data[reflectKey(ns)]; ok {
mts[i].Data = dat.value
mts[i].Timestamp = time.Now()
mts[i].Tags = dat.tags
} else {
// only log about it
log.WithFields(log.Fields{
"function": "CollectMetrics",
"metric": ns,
}).Error("No data found")
}
}

return metrics, nil
return mts, nil
}

// init initializes InfluxdbCollector instance based on config `cfg`
func (ic *InfluxdbCollector) init(cfg interface{}) {
settings, err := config.GetConfigItems(cfg, "host", "port", "user", "password")
handleErr(err)
// init initializes InfluxdbCollector instance based on plugin config `cfg`
func (ic *InfluxdbCollector) init(cfg plugin.Config) error {
host, err := cfg.GetString("host")
if err != nil {
return fmt.Errorf("Cannot get a hostname from plugin config, err=%s", err.Error())
}

err = ic.service.InitURLs(settings)
handleErr(err)
port, err := cfg.GetInt("port")
if err != nil {
return fmt.Errorf("Cannot get a port from plugin config, err=%s", err.Error())
}

user, err := cfg.GetString("user")
if err != nil {
return fmt.Errorf("Cannot get a username from plugin config, err=%s", err.Error())
}

passwd, err := cfg.GetString("password")
if err != nil {
return fmt.Errorf("Cannot get a password from plugin config, err=%s", err.Error())
}

if err := ic.service.InitURLs(host, port, user, passwd); err != nil {
return err
}

ic.initialized = true
log.WithFields(log.Fields{
"function": "init",
}).Info("Succeeded plugin initialization")
return nil
}

// getDiagnostics executes the command "SHOW DIAGNOSTICS" (indirectly)
func (ic *InfluxdbCollector) getDiagnostics() {
err := ic.getData(typeDiagn)
if err != nil {
fmt.Fprintln(os.Stderr, "Cannot get influxdb diagnostic information, err=", err)
}
func (ic *InfluxdbCollector) getDiagnostics() error {
return ic.getData(typeDiagn)
}

// getStatistics executes the command "SHOW STATS" (indirectly)
func (ic *InfluxdbCollector) getStatistics() {

err := ic.getData(typeStats)
if err != nil {
fmt.Fprintln(os.Stderr, "Cannot get influxdb internal statistics, err=", err)
}
func (ic *InfluxdbCollector) getStatistics() error {
return ic.getData(typeStats)
}

// getData executes a command specified by given `type` of desired data
Expand Down Expand Up @@ -172,7 +211,7 @@ func (ic *InfluxdbCollector) getData(kind int) error {

for seriesName, series := range results {
for columnName := range series.Data {
key := createNamespace(nsType, seriesName, columnName)
key := createKey(nsType, seriesName, columnName)
ic.data[key] = datum{
value: series.Data[columnName],
tags: series.Tags,
Expand All @@ -184,22 +223,21 @@ func (ic *InfluxdbCollector) getData(kind int) error {
return nil
}

// handleErr handles critical error indicated with an abnormal state of plugin
func handleErr(err error) {
if err != nil {
panic(err)
}
// createKey returns a key which identify metric's key which is composed from metric's type (might equal `stats` or `diagn`)
// and component name; all elements are joined to a single string
func createKey(nsType, seriesName, columnName string) string {
seriesName = strings.Replace(seriesName, "/", "_", -1)
columnName = strings.Replace(columnName, "/", "_", -1)
return strings.Join([]string{nsType, seriesName, columnName}, "/")
}

// createNamespace returns namespace composed from prefix, type of metric and metric name's components (the elements are joined to a single string)
func createNamespace(nsType, seriesName, columnName string) string {
ns := append(prefix, nsType)
ns = append(ns, seriesName)
ns = append(ns, columnName)
return strings.Join(ns, "/")
// reflectKey returns corresponding metric's key based on metric's namespace
func reflectKey(ns plugin.Namespace) string {
// skip metric's prefix and join the rest of elements
return strings.Join(ns.Strings()[len(prefix):], "/")
}

//splitNamespace splits namespace (repesented by single string `s`) and returns a slice of the substrings between slash separator
func splitNamespace(ns string) []string {
return strings.Split(ns, "/")
// splitKey returns a slice of the substrings between slash separator
func splitKey(key string) []string {
return strings.Split(key, "/")
}
Loading

0 comments on commit 9c38485

Please sign in to comment.