Skip to content

Commit

Permalink
[Metricbeat] [Statsd] Add support for Graphite series 1.1.0+ tags (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
tehbooom authored May 30, 2024
1 parent f7888f3 commit 61622ac
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
*Metricbeat*

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]

*Osquerybeat*

Expand Down
17 changes: 17 additions & 0 deletions metricbeat/docs/modules/statsd.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ The module supports the following types of metrics:

*Set (s)*:: Measurement which counts unique occurrences until flushed (value set to 0).

[float]
=== Supported tag extensions

Example of tag styles supported by the `statsd` module:

https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD]

`<metric name>:<value>|<type>|@samplerate|#<k>:<v>,<k>:<v>`

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB]

`<metric name>,<k>=<v>,<k>=<v>:<value>|<type>|@samplerate`

https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite_1.1.x]

`<metric name>;<k>=<v>;<k>=<v>:<value>|<type>|@samplerate`

[float]
=== Module-specific configuration notes

Expand Down
17 changes: 17 additions & 0 deletions x-pack/metricbeat/module/statsd/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ The module supports the following types of metrics:

*Set (s)*:: Measurement which counts unique occurrences until flushed (value set to 0).

[float]
=== Supported tag extensions

Example of tag styles supported by the `statsd` module:

https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD]

`<metric name>:<value>|<type>|@samplerate|#<k>:<v>,<k>:<v>`

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB]

`<metric name>,<k>=<v>,<k>=<v>:<value>|<type>|@samplerate`

https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite_1.1.x]

`<metric name>;<k>=<v>;<k>=<v>:<value>|<type>|@samplerate`

[float]
=== Module-specific configuration notes

Expand Down
21 changes: 19 additions & 2 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ type statsdMetric struct {

func splitTags(rawTags, kvSep []byte) map[string]string {
tags := map[string]string{}
for _, kv := range bytes.Split(rawTags, []byte(",")) {
var tagSplit [][]byte

if bytes.Contains(rawTags, []byte(",")) {
tagSplit = bytes.Split(rawTags, []byte(","))
} else {
tagSplit = bytes.Split(rawTags, []byte(";"))
}

for _, kv := range tagSplit {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
Expand All @@ -47,6 +55,7 @@ func splitTags(rawTags, kvSep []byte) map[string]string {
func parseSingle(b []byte) (statsdMetric, error) {
// format: <metric name>:<value>|<type>[|@samplerate][|#<k>:<v>,<k>:<v>]
// alternative: <metric name>[,<k>=<v>,<k>=<v>]:<value>|<type>[|@samplerate]
// alternative: <metric name>[;<k>=<v>;<k>=<v>]:<value>|<type>[|@samplerate]
s := statsdMetric{}

parts := bytes.SplitN(b, []byte("|"), 4)
Expand All @@ -73,7 +82,15 @@ func parseSingle(b []byte) (statsdMetric, error) {
return s, errInvalidPacket
}

nameTagsSplit := bytes.SplitN(nameSplit[0], []byte(","), 2)
// Metric tags could be separated by `,` or `;`
// We split here based on the separator
var nameTagsSplit [][]byte
if bytes.Contains(nameSplit[0], []byte(",")) {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(","), 2)
} else {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(";"), 2)
}

s.name = string(nameTagsSplit[0])
if len(nameTagsSplit) > 1 {
s.tags = splitTags(nameTagsSplit[1], []byte("="))
Expand Down
41 changes: 38 additions & 3 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,20 @@ func TestParseMetrics(t *testing.T) {
},
},
},
{ // Graphite 1.1.x Tags
input: "tags3;k1=v1;k2=v2:1|c",
expected: []statsdMetric{
{
name: "tags3",
metricType: "c",
value: "1",
tags: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
},
},
/// errors
{
input: "meter1-1.4|m",
Expand Down Expand Up @@ -1064,6 +1078,17 @@ func TestParseSingle(t *testing.T) {
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: counter with Graphite tags": {
input: "tags2;k1=v1;k2=v2:1|c",
err: nil,
want: statsdMetric{
name: "tags2",
metricType: "c",
sampleRate: "",
value: "1",
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: gauge": {
input: "gauge1:1.0|g",
err: nil,
Expand Down Expand Up @@ -1124,13 +1149,14 @@ func TestTagsGrouping(t *testing.T) {

"metric3:3|c|@0.1|#k1:v2,k2:v3",
"metric4:4|ms|#k1:v2,k2:v3",
"metric5;k1=v3;k2=v4:5|c",
}

err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 4)
assert.Len(t, events, 5)

actualTags := []mapstr.M{}
for _, e := range events {
Expand Down Expand Up @@ -1162,6 +1188,12 @@ func TestTagsGrouping(t *testing.T) {
"k2": "v3",
},
},
{
"labels": mapstr.M{
"k1": "v3",
"k2": "v4",
},
},
}

assert.ElementsMatch(t, expectedTags, actualTags)
Expand All @@ -1173,14 +1205,15 @@ func TestTagsCleanup(t *testing.T) {
"metric1:1|g|#k1:v1,k2:v2",

"metric2:3|ms|#k1:v2,k2:v3",
"metric3;k1=v3;k2=v4:5|c",
}
err := process(testData, ms)
require.NoError(t, err)

time.Sleep(1000 * time.Millisecond)

// they will be reported at least once
assert.Len(t, ms.getEvents(), 2)
assert.Len(t, ms.getEvents(), 3)

testData = []string{
"metric1:+2|g|#k1:v1,k2:v2",
Expand Down Expand Up @@ -1229,12 +1262,13 @@ func TestData(t *testing.T) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}
err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 10)
assert.Len(t, events, 11)

mbevent := mbtest.StandardizeEvent(ms, *events[0])
mbtest.WriteEventToDataJSON(t, mbevent, "")
Expand Down Expand Up @@ -1379,6 +1413,7 @@ func BenchmarkIngest(b *testing.B) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}

events := make([]*testUDPEvent, len(tests))
Expand Down

0 comments on commit 61622ac

Please sign in to comment.