Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus parser fix, parse headers properly #1461

Merged
merged 1 commit into from
Jul 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## v1.0
## v1.0 [unreleased]

### Release Notes

Expand Down Expand Up @@ -42,6 +42,7 @@ should now look like:
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.

## v1.0 beta 2 [2016-06-21]

Expand Down
117 changes: 42 additions & 75 deletions plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"math"
"mime"
"net/http"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -19,17 +20,9 @@ import (
"github.com/prometheus/common/expfmt"
)

// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}

// Parse returns a slice of Metrics from a text representation of a
// metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
Expand All @@ -38,96 +31,70 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)

// Get format
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)

if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
metricFamily := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil {
if err == io.EOF {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[metricFamily.GetName()] = metricFamily
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// historgram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
}

// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// historgram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())

} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
// standard metric
fields = getNameAndValue(m)
t = time.Now()
}
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if err == nil {
metrics = append(metrics, metric)
}
metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
return metrics, err
}

// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}

return metrics[0], nil
}

/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
return metrics, err
}
*/

// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
Expand Down
22 changes: 6 additions & 16 deletions plugins/inputs/prometheus/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prometheus

import (
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`

func TestParseValidPrometheus(t *testing.T) {
parser := PrometheusParser{}

// Gauge value
metrics, err := parser.Parse([]byte(validUniqueGauge))
metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
Expand All @@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) {
}, metrics[0].Tags())

// Counter value
//parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"})
metrics, err = parser.Parse([]byte(validUniqueCounter))
metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
Expand All @@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{}, metrics[0].Tags())

// Summary data
//parser.SetDefaultTags(map[string]string{})
metrics, err = parser.Parse([]byte(validUniqueSummary))
//SetDefaultTags(map[string]string{})
metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
Expand All @@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())

// histogram data
metrics, err = parser.Parse([]byte(validUniqueHistogram))
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
Expand All @@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) {
metrics[0].Tags())

}

func TestParseLineInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validUniqueLine)
assert.NotNil(t, err)
assert.Nil(t, metric)

}
19 changes: 5 additions & 14 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"
)

const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`

type Prometheus struct {
Urls []string

Expand Down Expand Up @@ -86,7 +88,7 @@ var client = &http.Client{
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now()
var req, err = http.NewRequest("GET", url, nil)
req.Header = make(http.Header)
req.Header.Add("Accept", acceptHeader)
var token []byte
var resp *http.Response

Expand Down Expand Up @@ -129,20 +131,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
return fmt.Errorf("error reading body: %s", err)
}

// Headers
headers := make(map[string]string)
for key, value := range headers {
headers[key] = value
}

// Prepare Prometheus parser config
promparser := PrometheusParser{
PromFormat: headers,
}

metrics, err := promparser.Parse(body)
metrics, err := Parse(body, resp.Header)
if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s",
return fmt.Errorf("error reading metrics for %s: %s",
url, err)
}
// Add (or not) collected metrics
Expand Down