From 7d9d814066d61cb3a5342881780cfb33885b7092 Mon Sep 17 00:00:00 2001 From: Jason Anderson Date: Tue, 7 Nov 2023 12:33:04 -0800 Subject: [PATCH 1/4] [statsdreceiver] support simple tags dogstatsd supports two types of tags on metrics: simple and dimensional tags[^1]. the former is just a key, the latter is a key and a value. with the assumption that many users of the statsdreceiver are enabling ingest of dogstatsd metrics, this makes the statsd parser more optimistic, so it can handle tags w/o a value. when this happens, we set an attribute that has a zero value. so far as i know, this is allowed in the opentelemetry spec. the decision of how to handle attributes w/ zero values is best left to configuration w/in the pipeline itself, as different users may have different opinions or approaches that work best with their systems. [^1]: https://www.datadoghq.com/blog/the-power-of-tagged-metrics/#whats-a-metric-tag --- .../internal/protocol/statsd_parser.go | 15 +++++++++++---- .../internal/protocol/statsd_parser_test.go | 18 +++++++++++++++--- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go index ab624dd93df4..dea891e769ef 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go @@ -410,11 +410,18 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err for _, tagSet := range tagSets { tagParts := strings.SplitN(tagSet, ":", 2) - if len(tagParts) != 2 { - return result, fmt.Errorf("invalid tag format: %s", tagParts) - } k := tagParts[0] - v := tagParts[1] + if k == "" { + return result, fmt.Errorf("invalid tag format: %q", tagSet) + } + + // support both simple tags (w/o value) and dimension tags (w/ value). + // dogstatsd notably allows simple tags. + var v string + if len(tagParts) == 2 { + v = tagParts[1] + } + kvs = append(kvs, attribute.String(k, v)) } default: diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go index 2228a29567c4..5313c0b8d9c3 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go @@ -53,8 +53,8 @@ func Test_ParseMessageToMetric(t *testing.T) { }, { name: "invalid tag format", - input: "test.metric:42|c|#key1", - err: errors.New("invalid tag format: [key1]"), + input: "test.metric:42|c|#:val1", + err: errors.New("invalid tag format: \":val1\""), }, { name: "unrecognized message part", @@ -90,7 +90,7 @@ func Test_ParseMessageToMetric(t *testing.T) { err: errors.New("unsupported metric type: unhandled_type"), }, { - name: "counter metric with sample rate and tag", + name: "counter metric with sample rate and (dimensional) tag", input: "test.metric:42|c|@0.1|#key:value", wantMetric: testStatsDMetric( "test.metric", @@ -101,6 +101,18 @@ func Test_ParseMessageToMetric(t *testing.T) { []string{"key"}, []string{"value"}), }, + { + name: "counter metric with sample rate and (simple) tag", + input: "test.metric:42|c|@0.1|#key", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{""}), + }, { name: "counter metric with sample rate(not divisible) and tag", input: "test.metric:42|c|@0.8|#key:value", From befda567cd580cc54451de764e8fed0b2c0782e8 Mon Sep 17 00:00:00 2001 From: Jason Anderson Date: Tue, 7 Nov 2023 13:20:37 -0800 Subject: [PATCH 2/4] put behind config --- receiver/statsdreceiver/config.go | 1 + .../internal/protocol/parser.go | 2 +- .../internal/protocol/statsd_parser.go | 12 ++- .../internal/protocol/statsd_parser_test.go | 97 ++++++++++++++----- receiver/statsdreceiver/receiver.go | 1 + 5 files changed, 84 insertions(+), 29 deletions(-) diff --git a/receiver/statsdreceiver/config.go b/receiver/statsdreceiver/config.go index 76dd700b6374..4e9a2d5eb75d 100644 --- a/receiver/statsdreceiver/config.go +++ b/receiver/statsdreceiver/config.go @@ -19,6 +19,7 @@ type Config struct { NetAddr confignet.NetAddr `mapstructure:",squash"` AggregationInterval time.Duration `mapstructure:"aggregation_interval"` EnableMetricType bool `mapstructure:"enable_metric_type"` + EnableSimpleTags bool `mapstructure:"enable_simple_tags"` IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"` TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"` } diff --git a/receiver/statsdreceiver/internal/protocol/parser.go b/receiver/statsdreceiver/internal/protocol/parser.go index abfea560ce24..bc72f7e3f160 100644 --- a/receiver/statsdreceiver/internal/protocol/parser.go +++ b/receiver/statsdreceiver/internal/protocol/parser.go @@ -12,7 +12,7 @@ import ( // Parser is something that can map input StatsD strings to OTLP Metric representations. type Parser interface { - Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error + Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error GetMetrics() []BatchMetrics Aggregate(line string, addr net.Addr) error } diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go index dea891e769ef..474fe03945ba 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go @@ -79,6 +79,7 @@ var defaultObserverCategory = ObserverCategory{ type StatsDParser struct { instrumentsByAddress map[netAddr]*instruments enableMetricType bool + enableSimpleTags bool isMonotonicCounter bool timerEvents ObserverCategory histogramEvents ObserverCategory @@ -156,12 +157,13 @@ func (p *StatsDParser) resetState(when time.Time) { p.instrumentsByAddress = make(map[netAddr]*instruments) } -func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { +func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { p.resetState(timeNowFunc()) p.histogramEvents = defaultObserverCategory p.timerEvents = defaultObserverCategory p.enableMetricType = enableMetricType + p.enableSimpleTags = enableSimpleTags p.isMonotonicCounter = isMonotonicCounter // Note: validation occurs in ("../".Config).validate() for _, eachMap := range sendTimerHistogram { @@ -270,7 +272,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory { // Aggregate for each metric line. func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { - parsedMetric, err := parseMessageToMetric(line, p.enableMetricType) + parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags) if err != nil { return err } @@ -349,7 +351,7 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { return nil } -func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, error) { +func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) { result := statsDMetric{} parts := strings.Split(line, "|") @@ -422,6 +424,10 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err v = tagParts[1] } + if v == "" && !enableSimpleTags { + return result, fmt.Errorf("invalid tag format: %q", tagSet) + } + kvs = append(kvs, attribute.String(k, v)) } default: diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go index 5313c0b8d9c3..d91ee7c3ed27 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go @@ -90,7 +90,7 @@ func Test_ParseMessageToMetric(t *testing.T) { err: errors.New("unsupported metric type: unhandled_type"), }, { - name: "counter metric with sample rate and (dimensional) tag", + name: "counter metric with sample rate and tag", input: "test.metric:42|c|@0.1|#key:value", wantMetric: testStatsDMetric( "test.metric", @@ -101,18 +101,6 @@ func Test_ParseMessageToMetric(t *testing.T) { []string{"key"}, []string{"value"}), }, - { - name: "counter metric with sample rate and (simple) tag", - input: "test.metric:42|c|@0.1|#key", - wantMetric: testStatsDMetric( - "test.metric", - 42, - false, - "c", - 0.1, - []string{"key"}, - []string{""}), - }, { name: "counter metric with sample rate(not divisible) and tag", input: "test.metric:42|c|@0.8|#key:value", @@ -247,7 +235,7 @@ func Test_ParseMessageToMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseMessageToMetric(tt.input, false) + got, err := parseMessageToMetric(tt.input, false, false) if tt.err != nil { assert.Equal(t, tt.err, err) @@ -445,7 +433,66 @@ func Test_ParseMessageToMetricWithMetricType(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseMessageToMetric(tt.input, true) + got, err := parseMessageToMetric(tt.input, true, false) + + if tt.err != nil { + assert.Equal(t, tt.err, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.wantMetric, got) + } + }) + } +} + +func Test_ParseMessageToMetricWithSimpleTags(t *testing.T) { + tests := []struct { + name string + input string + wantMetric statsDMetric + err error + }{ + { + name: "counter metric with sample rate and (dimensional) tag", + input: "test.metric:42|c|@0.1|#key:value", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{"value"}), + }, + { + name: "counter metric with sample rate and (simple) tag", + input: "test.metric:42|c|@0.1|#key", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{""}), + }, + { + name: "counter metric with sample rate and two (simple) tags", + input: "test.metric:42|c|@0.1|#key,key2", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key", "key2"}, + []string{"", ""}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseMessageToMetric(tt.input, false, true) if tt.err != nil { assert.Equal(t, tt.err, err) @@ -689,7 +736,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -758,7 +805,7 @@ func TestStatsDParser_AggregateByAddress(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) for i, addr := range tt.addresses { for _, line := range tt.input[i] { @@ -826,7 +873,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -876,7 +923,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -998,7 +1045,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})) + assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) for _, line := range tt.input { @@ -1015,7 +1062,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { func TestStatsDParser_Initialize(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) teststatsdDMetricdescription := statsDMetricDescription{ name: "test", metricType: "g", @@ -1034,7 +1081,7 @@ func TestStatsDParser_Initialize(t *testing.T) { func TestStatsDParser_GetMetricsWithMetricType(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) instrument := newInstruments(nil) instrument.gauges[testDescription("statsdTestMetric1", "g", []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] = buildGaugeMetric(testStatsDMetric("testGauge1", 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0)) @@ -1107,7 +1154,7 @@ func TestStatsDParser_Mappings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, tc.mapping)) + assert.NoError(t, p.Initialize(false, false, false, tc.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") assert.NoError(t, p.Aggregate("H:10|h", addr)) @@ -1141,7 +1188,7 @@ func TestStatsDParser_ScopeIsIncluded(t *testing.T) { } testAddress, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") - err := p.Initialize(true, false, + err := p.Initialize(true, false, false, []TimerHistogramMapping{ {StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "histogram"}, @@ -1411,7 +1458,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, tt.mapping)) + assert.NoError(t, p.Initialize(false, false, false, tt.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") for _, line := range tt.input { err = p.Aggregate(line, addr) diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 74d4354e4a62..3034fc561bbc 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -91,6 +91,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { ticker := time.NewTicker(r.config.AggregationInterval) err = r.parser.Initialize( r.config.EnableMetricType, + r.config.EnableSimpleTags, r.config.IsMonotonicCounter, r.config.TimerHistogramMapping, ) From 52b1333911a6d17bf8232a078e5c67b541a0c88c Mon Sep 17 00:00:00 2001 From: Jason Anderson Date: Wed, 8 Nov 2023 14:31:02 -0800 Subject: [PATCH 3/4] add changelog entry --- .chloggen/statsd-support-simple-tags.yaml | 28 +++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100755 .chloggen/statsd-support-simple-tags.yaml diff --git a/.chloggen/statsd-support-simple-tags.yaml b/.chloggen/statsd-support-simple-tags.yaml new file mode 100755 index 000000000000..c753eccf6493 --- /dev/null +++ b/.chloggen/statsd-support-simple-tags.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "statsdreceiver" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for 'simple' tags that do not have a defined value, to accommodate DogStatsD metrics that may utilize these. + + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29012] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: "This functionality is gated behind a new `enable_simple_tags` config boolean, as it is not part of the StatsD spec." + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: ['user'] From e24f785dd1f94d4215e666df2f1550afa9c225b5 Mon Sep 17 00:00:00 2001 From: Jason Anderson Date: Thu, 9 Nov 2023 08:57:00 -0800 Subject: [PATCH 4/4] update docs --- receiver/statsdreceiver/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index db55b5aa773c..50124c748450 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -33,6 +33,8 @@ The Following settings are optional: - `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label. +- `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. `#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging. + - `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic. - `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.