From e4af487e55cac0a70f66908f92ebdea6edb170cb Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 18 Sep 2018 16:58:34 +0200 Subject: [PATCH 01/14] Allow checks to send metrics with no hostname - move the default hostname responsibility from the aggregator / sampler to the sender - add a DisableDefaultHostname method to the Sender interface - make dogstatsd handle hostname for events and service checks --- pkg/aggregator/aggregator.go | 16 +-- pkg/aggregator/aggregator_test.go | 6 +- pkg/aggregator/check_sampler.go | 9 +- pkg/aggregator/check_sampler_test.go | 38 +----- pkg/aggregator/mocksender/mocked_methods.go | 5 + pkg/aggregator/mocksender/mocksender.go | 2 +- pkg/aggregator/sender.go | 62 +++++++-- pkg/aggregator/sender_test.go | 80 +++++++++++- pkg/dogstatsd/parser.go | 6 +- pkg/dogstatsd/parser_test.go | 136 ++++++++++++-------- pkg/dogstatsd/server.go | 4 +- 11 files changed, 235 insertions(+), 129 deletions(-) diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index e8d1af413cc005..1839817e023e2d 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -238,7 +238,7 @@ func (agg *BufferedAggregator) registerSender(id check.ID) error { if _, ok := agg.checkSamplers[id]; ok { return fmt.Errorf("Sender with ID '%s' has already been registered, will use existing sampler", id) } - agg.checkSamplers[id] = newCheckSampler(agg.hostname) + agg.checkSamplers[id] = newCheckSampler() return nil } @@ -266,9 +266,6 @@ func (agg *BufferedAggregator) handleSenderSample(ss senderMetricSample) { // addServiceCheck adds the service check to the slice of current service checks func (agg *BufferedAggregator) addServiceCheck(sc metrics.ServiceCheck) { - if sc.Host == "" { - sc.Host = agg.hostname - } if sc.Ts == 0 { sc.Ts = time.Now().Unix() } @@ -279,9 +276,6 @@ func (agg *BufferedAggregator) addServiceCheck(sc metrics.ServiceCheck) { // addEvent adds the event to the slice of current events func (agg *BufferedAggregator) addEvent(e metrics.Event) { - if e.Host == "" { - e.Host = agg.hostname - } if e.Ts == 0 { e.Ts = time.Now().Unix() } @@ -375,6 +369,7 @@ func (agg *BufferedAggregator) flushServiceChecks() { agg.addServiceCheck(metrics.ServiceCheck{ CheckName: "datadog.agent.up", Status: metrics.ServiceCheckOK, + Host: agg.hostname, }) serviceChecks := agg.GetServiceChecks() @@ -504,12 +499,7 @@ func (agg *BufferedAggregator) run() { case h := <-agg.hostnameUpdate: aggregatorHostnameUpdate.Add(1) agg.hostname = h - agg.mu.Lock() - for _, checkSampler := range agg.checkSamplers { - checkSampler.defaultHostname = h - } - agg.sampler.defaultHostname = h - agg.mu.Unlock() + changeAllSendersDefaultHostname(h) agg.hostnameUpdateDone <- struct{}{} } } diff --git a/pkg/aggregator/aggregator_test.go b/pkg/aggregator/aggregator_test.go index b71a18f9c14dad..5dd8812d26430a 100644 --- a/pkg/aggregator/aggregator_test.go +++ b/pkg/aggregator/aggregator_test.go @@ -74,7 +74,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) { }) require.Len(t, agg.serviceChecks, 2) - assert.Equal(t, "resolved-hostname", agg.serviceChecks[0].Host) + assert.Equal(t, "", agg.serviceChecks[0].Host) assert.Equal(t, []string{"bar", "foo"}, agg.serviceChecks[0].Tags) assert.NotZero(t, agg.serviceChecks[0].Ts) // should be set to the current time, let's just check that it's not 0 assert.Equal(t, "my-hostname", agg.serviceChecks[1].Host) @@ -105,10 +105,10 @@ func TestAddEventDefaultValues(t *testing.T) { }) require.Len(t, agg.events, 2) - // Default values are set on Host and Ts only + // Default values are set on Ts event1 := agg.events[0] assert.Equal(t, "An event occurred", event1.Title) - assert.Equal(t, "resolved-hostname", event1.Host) + assert.Equal(t, "", event1.Host) assert.NotZero(t, event1.Ts) // should be set to the current time, let's just check that it's not 0 assert.Zero(t, event1.Priority) assert.Zero(t, event1.Tags) diff --git a/pkg/aggregator/check_sampler.go b/pkg/aggregator/check_sampler.go index 6d6da2bd0306f6..39ce305a26b98c 100644 --- a/pkg/aggregator/check_sampler.go +++ b/pkg/aggregator/check_sampler.go @@ -18,16 +18,14 @@ type CheckSampler struct { series []*metrics.Serie contextResolver *ContextResolver metrics metrics.ContextMetrics - defaultHostname string } // newCheckSampler returns a newly initialized CheckSampler -func newCheckSampler(hostname string) *CheckSampler { +func newCheckSampler() *CheckSampler { return &CheckSampler{ series: make([]*metrics.Serie, 0), contextResolver: newContextResolver(), metrics: metrics.MakeContextMetrics(), - defaultHostname: hostname, } } @@ -59,11 +57,6 @@ func (cs *CheckSampler) commit(timestamp float64) { serie.Name = context.Name + serie.NameSuffix serie.Tags = context.Tags serie.SourceTypeName = checksSourceTypeName // this source type is required for metrics coming from the checks - if context.Host != "" { - serie.Host = context.Host - } else { - serie.Host = cs.defaultHostname - } cs.series = append(cs.series, serie) } diff --git a/pkg/aggregator/check_sampler_test.go b/pkg/aggregator/check_sampler_test.go index 11250abbb7b995..677d90b03e0471 100644 --- a/pkg/aggregator/check_sampler_test.go +++ b/pkg/aggregator/check_sampler_test.go @@ -18,7 +18,7 @@ import ( ) func TestCheckGaugeSampling(t *testing.T) { - checkSampler := newCheckSampler("") + checkSampler := newCheckSampler() mSample1 := metrics.MetricSample{ Name: "my.metric.name", @@ -85,7 +85,7 @@ func TestCheckGaugeSampling(t *testing.T) { } func TestCheckRateSampling(t *testing.T) { - checkSampler := newCheckSampler("") + checkSampler := newCheckSampler() mSample1 := metrics.MetricSample{ Name: "my.metric.name", @@ -134,7 +134,7 @@ func TestCheckRateSampling(t *testing.T) { } func TestHistogramIntervalSampling(t *testing.T) { - checkSampler := newCheckSampler("") + checkSampler := newCheckSampler() mSample1 := metrics.MetricSample{ Name: "my.metric.name", @@ -190,35 +190,3 @@ func TestHistogramIntervalSampling(t *testing.T) { assert.True(t, foundCount) } - -func TestCheckSamplerHostname(t *testing.T) { - checkSampler := newCheckSampler("my.test.hostname") - - mSample1 := metrics.MetricSample{ - Name: "my.metric.name", - Value: 1, - Mtype: metrics.GaugeType, - Tags: []string{"foo", "bar"}, - SampleRate: 1, - Timestamp: 12345.0, - } - mSample2 := metrics.MetricSample{ - Name: "my.metric.name", - Value: 1, - Mtype: metrics.GaugeType, - Tags: []string{"foo", "bar"}, - Host: "metric-hostname", - SampleRate: 1, - Timestamp: 12345, - } - - checkSampler.addSample(&mSample1) - checkSampler.addSample(&mSample2) - checkSampler.commit(12346.0) - series := checkSampler.flush() - - require.Len(t, series, 2) - actualHostnames := []string{series[0].Host, series[1].Host} - assert.Contains(t, actualHostnames, "my.test.hostname") - assert.Contains(t, actualHostnames, "metric-hostname") -} diff --git a/pkg/aggregator/mocksender/mocked_methods.go b/pkg/aggregator/mocksender/mocked_methods.go index ac671a879682db..93824bc5f5e7f3 100644 --- a/pkg/aggregator/mocksender/mocked_methods.go +++ b/pkg/aggregator/mocksender/mocked_methods.go @@ -49,6 +49,11 @@ func (m *MockSender) ServiceCheck(checkName string, status metrics.ServiceCheckS m.Called(checkName, status, hostname, tags, message) } +//DisableDefaultHostname enables the hostname mock call. +func (m *MockSender) DisableDefaultHostname(d bool) { + m.Called(d) +} + //Event enables the event mock call. func (m *MockSender) Event(e metrics.Event) { m.Called(e) diff --git a/pkg/aggregator/mocksender/mocksender.go b/pkg/aggregator/mocksender/mocksender.go index a5eb374194a3af..e9d69536e0fc64 100644 --- a/pkg/aggregator/mocksender/mocksender.go +++ b/pkg/aggregator/mocksender/mocksender.go @@ -50,7 +50,7 @@ func (m *MockSender) SetupAcceptAll() { ).Return() m.On("Event", mock.AnythingOfType("metrics.Event")).Return() m.On("GetMetricStats", mock.AnythingOfType("map[string]int64")).Return() - + m.On("DisableDefaultHostname", mock.AnythingOfType("bool")).Return() m.On("Commit").Return() } diff --git a/pkg/aggregator/sender.go b/pkg/aggregator/sender.go index 51faf828077d18..c913a726499a03 100644 --- a/pkg/aggregator/sender.go +++ b/pkg/aggregator/sender.go @@ -34,6 +34,7 @@ type Sender interface { ServiceCheck(checkName string, status metrics.ServiceCheckStatus, hostname string, tags []string, message string) Event(e metrics.Event) GetMetricStats() map[string]int64 + DisableDefaultHostname(disable bool) } type metricStats struct { @@ -52,12 +53,14 @@ type RawSender interface { // checkSender implements Sender type checkSender struct { - id check.ID - metricStats metricStats - priormetricStats metricStats - smsOut chan<- senderMetricSample - serviceCheckOut chan<- metrics.ServiceCheck - eventOut chan<- metrics.Event + id check.ID + defaultHostname string + disableDefaultHostname bool + metricStats metricStats + priormetricStats metricStats + smsOut chan<- senderMetricSample + serviceCheckOut chan<- metrics.ServiceCheck + eventOut chan<- metrics.Event } type senderMetricSample struct { @@ -77,9 +80,10 @@ func init() { } } -func newCheckSender(id check.ID, smsOut chan<- senderMetricSample, serviceCheckOut chan<- metrics.ServiceCheck, eventOut chan<- metrics.Event) *checkSender { +func newCheckSender(id check.ID, defaultHostname string, smsOut chan<- senderMetricSample, serviceCheckOut chan<- metrics.ServiceCheck, eventOut chan<- metrics.Event) *checkSender { return &checkSender{ id: id, + defaultHostname: defaultHostname, smsOut: smsOut, serviceCheckOut: serviceCheckOut, eventOut: eventOut, @@ -127,12 +131,27 @@ func GetDefaultSender() (Sender, error) { senderInit.Do(func() { var defaultCheckID check.ID // the default value is the zero value aggregatorInstance.registerSender(defaultCheckID) - senderInstance = newCheckSender(defaultCheckID, aggregatorInstance.checkMetricIn, aggregatorInstance.serviceCheckIn, aggregatorInstance.eventIn) + senderInstance = newCheckSender(defaultCheckID, aggregatorInstance.hostname, aggregatorInstance.checkMetricIn, aggregatorInstance.serviceCheckIn, aggregatorInstance.eventIn) }) return senderInstance, nil } +// changeAllSendersDefaultHostname is to be called by the aggregator +// when its hostname changes. All existing senders will have their +// default hostname updated. +func changeAllSendersDefaultHostname(hostname string) { + if senderPool != nil { + senderPool.changeAllSendersDefaultHostname(hostname) + } +} + +// DisableDefaultHostname allows check to override the default hostname that will be injected +// when no hostname is specified at submission (for metrics, events and service checks). +func (s *checkSender) DisableDefaultHostname(disable bool) { + s.disableDefaultHostname = disable +} + // Commit commits the metric samples that were added during a check run // Should be called at the end of every check run func (s *checkSender) Commit() { @@ -184,6 +203,10 @@ func (s *checkSender) sendMetricSample(metric string, value float64, hostname st Timestamp: timeNowNano(), } + if !s.disableDefaultHostname && metricSample.Host == "" { + metricSample.Host = s.defaultHostname + } + s.smsOut <- senderMetricSample{s.id, metricSample, false} s.metricStats.Lock.Lock() @@ -248,6 +271,10 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck Message: message, } + if !s.disableDefaultHostname && serviceCheck.Host == "" { + serviceCheck.Host = s.defaultHostname + } + s.serviceCheckOut <- serviceCheck s.metricStats.Lock.Lock() @@ -259,6 +286,10 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck func (s *checkSender) Event(e metrics.Event) { log.Trace("Event submitted: ", e.Title, " for hostname: ", e.Host, " tags: ", e.Tags) + if !s.disableDefaultHostname && e.Host == "" { + e.Host = s.defaultHostname + } + s.eventOut <- e s.metricStats.Lock.Lock() @@ -266,6 +297,19 @@ func (s *checkSender) Event(e metrics.Event) { s.metricStats.Lock.Unlock() } +// changeAllSendersDefaultHostname u +func (sp *checkSenderPool) changeAllSendersDefaultHostname(hostname string) { + sp.m.Lock() + defer sp.m.Unlock() + for _, sender := range sp.senders { + cs, ok := sender.(*checkSender) + if !ok { + continue + } + cs.defaultHostname = hostname + } +} + func (sp *checkSenderPool) getSender(id check.ID) (Sender, error) { sp.m.Lock() defer sp.m.Unlock() @@ -281,7 +325,7 @@ func (sp *checkSenderPool) mkSender(id check.ID) (Sender, error) { defer sp.m.Unlock() err := aggregatorInstance.registerSender(id) - sender := newCheckSender(id, aggregatorInstance.checkMetricIn, aggregatorInstance.serviceCheckIn, aggregatorInstance.eventIn) + sender := newCheckSender(id, aggregatorInstance.hostname, aggregatorInstance.checkMetricIn, aggregatorInstance.serviceCheckIn, aggregatorInstance.eventIn) sp.senders[id] = sender return sender, err } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index dadc8bed9b033b..c190d9ae99633a 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -7,6 +7,7 @@ package aggregator import ( // stdlib + "fmt" "sync" "testing" @@ -107,7 +108,7 @@ func TestGetAndSetSender(t *testing.T) { senderMetricSampleChan := make(chan senderMetricSample, 10) serviceCheckChan := make(chan metrics.ServiceCheck, 10) eventChan := make(chan metrics.Event, 10) - testCheckSender := newCheckSender(checkID1, senderMetricSampleChan, serviceCheckChan, eventChan) + testCheckSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan) err := SetSender(testCheckSender, checkID1) assert.Nil(t, err) @@ -122,7 +123,7 @@ func TestCheckSenderInterface(t *testing.T) { senderMetricSampleChan := make(chan senderMetricSample, 10) serviceCheckChan := make(chan metrics.ServiceCheck, 10) eventChan := make(chan metrics.Event, 10) - checkSender := newCheckSender(checkID1, senderMetricSampleChan, serviceCheckChan, eventChan) + checkSender := newCheckSender(checkID1, "default-hostname", senderMetricSampleChan, serviceCheckChan, eventChan) checkSender.Gauge("my.metric", 1.0, "my-hostname", []string{"foo", "bar"}) checkSender.Rate("my.rate_metric", 2.0, "my-hostname", []string{"foo", "bar"}) checkSender.Count("my.count_metric", 123.0, "my-hostname", []string{"foo", "bar"}) @@ -147,6 +148,7 @@ func TestCheckSenderInterface(t *testing.T) { gaugeSenderSample := <-senderMetricSampleChan assert.EqualValues(t, checkID1, gaugeSenderSample.id) assert.Equal(t, metrics.GaugeType, gaugeSenderSample.metricSample.Mtype) + assert.Equal(t, "my-hostname", gaugeSenderSample.metricSample.Host) assert.Equal(t, false, gaugeSenderSample.commit) rateSenderSample := <-senderMetricSampleChan @@ -188,3 +190,77 @@ func TestCheckSenderInterface(t *testing.T) { event := <-eventChan assert.Equal(t, submittedEvent, event) } + +func TestCheckSenderHostname(t *testing.T) { + defaultHostname := "default-host" + + for nb, tc := range []struct { + disableDefaultHostname bool + submittedHostname string + expectedHostname string + }{ + { + disableDefaultHostname: false, + submittedHostname: "", + expectedHostname: defaultHostname, + }, + { + disableDefaultHostname: false, + submittedHostname: "custom", + expectedHostname: "custom", + }, + { + disableDefaultHostname: true, + submittedHostname: "", + expectedHostname: "", + }, + { + disableDefaultHostname: true, + submittedHostname: "custom", + expectedHostname: "custom", + }, + } { + t.Run(fmt.Sprintf("case %d: %q -> %q", nb, tc.submittedHostname, tc.expectedHostname), func(t *testing.T) { + senderMetricSampleChan := make(chan senderMetricSample, 10) + serviceCheckChan := make(chan metrics.ServiceCheck, 10) + eventChan := make(chan metrics.Event, 10) + checkSender := newCheckSender(checkID1, defaultHostname, senderMetricSampleChan, serviceCheckChan, eventChan) + checkSender.DisableDefaultHostname(tc.disableDefaultHostname) + + checkSender.Gauge("my.metric", 1.0, tc.submittedHostname, []string{"foo", "bar"}) + checkSender.Commit() + checkSender.ServiceCheck("my_service.can_connect", metrics.ServiceCheckOK, tc.submittedHostname, []string{"foo", "bar"}, "message") + submittedEvent := metrics.Event{ + Title: "Something happened", + Text: "Description of the event", + Ts: 12, + Priority: metrics.EventPriorityLow, + Host: tc.submittedHostname, + Tags: []string{"foo", "bar"}, + AlertType: metrics.EventAlertTypeInfo, + AggregationKey: "event_agg_key", + SourceTypeName: "docker", + } + checkSender.Event(submittedEvent) + + gaugeSenderSample := <-senderMetricSampleChan + assert.EqualValues(t, checkID1, gaugeSenderSample.id) + assert.Equal(t, metrics.GaugeType, gaugeSenderSample.metricSample.Mtype) + assert.Equal(t, tc.expectedHostname, gaugeSenderSample.metricSample.Host) + assert.Equal(t, false, gaugeSenderSample.commit) + + serviceCheck := <-serviceCheckChan + assert.Equal(t, "my_service.can_connect", serviceCheck.CheckName) + assert.Equal(t, metrics.ServiceCheckOK, serviceCheck.Status) + assert.Equal(t, tc.expectedHostname, serviceCheck.Host) + assert.Equal(t, []string{"foo", "bar"}, serviceCheck.Tags) + assert.Equal(t, "message", serviceCheck.Message) + + event := <-eventChan + assert.Equal(t, "Something happened", event.Title) + assert.Equal(t, int64(12), event.Ts) + assert.Equal(t, tc.expectedHostname, event.Host) + assert.Equal(t, []string{"foo", "bar"}, event.Tags) + }) + } +} diff --git a/pkg/dogstatsd/parser.go b/pkg/dogstatsd/parser.go index 2dae99f00d7163..7330d85f899aa8 100644 --- a/pkg/dogstatsd/parser.go +++ b/pkg/dogstatsd/parser.go @@ -84,7 +84,7 @@ func parseTags(rawTags []byte, extractHost bool, defaultHostname string) ([]stri return tagsList, host } -func parseServiceCheckMessage(message []byte) (*metrics.ServiceCheck, error) { +func parseServiceCheckMessage(message []byte, defaultHostname string) (*metrics.ServiceCheck, error) { // _sc|name|status|[metadata|...] separatorCount := bytes.Count(message, fieldSeparator) @@ -100,6 +100,7 @@ func parseServiceCheckMessage(message []byte) (*metrics.ServiceCheck, error) { service := metrics.ServiceCheck{ CheckName: string(rawName), + Host: defaultHostname, } if status, err := strconv.Atoi(string(rawStatus)); err != nil { @@ -139,7 +140,7 @@ func parseServiceCheckMessage(message []byte) (*metrics.ServiceCheck, error) { return &service, nil } -func parseEventMessage(message []byte) (*metrics.Event, error) { +func parseEventMessage(message []byte, defaultHostname string) (*metrics.Event, error) { // _e{title.length,text.length}:title|text // [ // |d:date_happened @@ -186,6 +187,7 @@ func parseEventMessage(message []byte) (*metrics.Event, error) { event := metrics.Event{ Priority: metrics.EventPriorityNormal, AlertType: metrics.EventAlertTypeInfo, + Host: defaultHostname, Title: string(rawTitle), Text: string(bytes.Replace(rawText, []byte("\\n"), []byte("\n"), -1)), } diff --git a/pkg/dogstatsd/parser_test.go b/pkg/dogstatsd/parser_test.go index f412196502b414..5ef793042244ad 100644 --- a/pkg/dogstatsd/parser_test.go +++ b/pkg/dogstatsd/parser_test.go @@ -322,11 +322,11 @@ func TestPacketStringEndings(t *testing.T) { } func TestServiceCheckMinimal(t *testing.T) { - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0")) + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0"), "default-hostname") assert.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) - assert.Equal(t, "", sc.Host) + assert.Equal(t, "default-hostname", sc.Host) assert.Equal(t, int64(0), sc.Ts) assert.Equal(t, metrics.ServiceCheckOK, sc.Status) assert.Equal(t, "", sc.Message) @@ -335,35 +335,35 @@ func TestServiceCheckMinimal(t *testing.T) { func TestServiceCheckError(t *testing.T) { // not enough information - _, err := parseServiceCheckMessage([]byte("_sc|agent.up")) + _, err := parseServiceCheckMessage([]byte("_sc|agent.up"), "default-hostname") assert.Error(t, err) - _, err = parseServiceCheckMessage([]byte("_sc|agent.up|")) + _, err = parseServiceCheckMessage([]byte("_sc|agent.up|"), "default-hostname") assert.Error(t, err) // not invalid status - _, err = parseServiceCheckMessage([]byte("_sc|agent.up|OK")) + _, err = parseServiceCheckMessage([]byte("_sc|agent.up|OK"), "default-hostname") assert.Error(t, err) // not unknown status - _, err = parseServiceCheckMessage([]byte("_sc|agent.up|21")) + _, err = parseServiceCheckMessage([]byte("_sc|agent.up|21"), "default-hostname") assert.Error(t, err) // invalid timestamp - _, err = parseServiceCheckMessage([]byte("_sc|agent.up|0|d:some_time")) + _, err = parseServiceCheckMessage([]byte("_sc|agent.up|0|d:some_time"), "default-hostname") assert.NoError(t, err) // unknown metadata - _, err = parseServiceCheckMessage([]byte("_sc|agent.up|0|u:unknown")) + _, err = parseServiceCheckMessage([]byte("_sc|agent.up|0|u:unknown"), "default-hostname") assert.NoError(t, err) } func TestServiceCheckMetadataTimestamp(t *testing.T) { - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|d:21")) + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|d:21"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) - assert.Equal(t, "", sc.Host) + assert.Equal(t, "default-hostname", sc.Host) assert.Equal(t, int64(21), sc.Ts) assert.Equal(t, metrics.ServiceCheckOK, sc.Status) assert.Equal(t, "", sc.Message) @@ -371,7 +371,7 @@ func TestServiceCheckMetadataTimestamp(t *testing.T) { } func TestServiceCheckMetadataHostname(t *testing.T) { - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|h:localhost")) + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|h:localhost"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) @@ -382,8 +382,8 @@ func TestServiceCheckMetadataHostname(t *testing.T) { assert.Equal(t, []string(nil), sc.Tags) } -func TestServiceCheckMetadataTags(t *testing.T) { - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|#tag1,tag2:test,tag3")) +func TestServiceCheckMetadataEmptyHostname(t *testing.T) { + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|h:"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) @@ -391,15 +391,27 @@ func TestServiceCheckMetadataTags(t *testing.T) { assert.Equal(t, int64(0), sc.Ts) assert.Equal(t, metrics.ServiceCheckOK, sc.Status) assert.Equal(t, "", sc.Message) + assert.Equal(t, []string(nil), sc.Tags) +} + +func TestServiceCheckMetadataTags(t *testing.T) { + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|#tag1,tag2:test,tag3"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "agent.up", sc.CheckName) + assert.Equal(t, "default-hostname", sc.Host) + assert.Equal(t, int64(0), sc.Ts) + assert.Equal(t, metrics.ServiceCheckOK, sc.Status) + assert.Equal(t, "", sc.Message) assert.Equal(t, []string{"tag1", "tag2:test", "tag3"}, sc.Tags) } func TestServiceCheckMetadataMessage(t *testing.T) { - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|m:this is fine")) + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|m:this is fine"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) - assert.Equal(t, "", sc.Host) + assert.Equal(t, "default-hostname", sc.Host) assert.Equal(t, int64(0), sc.Ts) assert.Equal(t, metrics.ServiceCheckOK, sc.Status) assert.Equal(t, "this is fine", sc.Message) @@ -408,7 +420,7 @@ func TestServiceCheckMetadataMessage(t *testing.T) { func TestServiceCheckMetadataMultiple(t *testing.T) { // all type - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|d:21|h:localhost|#tag1:test,tag2|m:this is fine")) + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|d:21|h:localhost|#tag1:test,tag2|m:this is fine"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) assert.Equal(t, "localhost", sc.Host) @@ -418,7 +430,7 @@ func TestServiceCheckMetadataMultiple(t *testing.T) { assert.Equal(t, []string{"tag1:test", "tag2"}, sc.Tags) // multiple time the same tag - sc, err = parseServiceCheckMessage([]byte("_sc|agent.up|0|d:21|h:localhost|h:localhost2|d:22")) + sc, err = parseServiceCheckMessage([]byte("_sc|agent.up|0|d:21|h:localhost|h:localhost2|d:22"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) assert.Equal(t, "localhost2", sc.Host) @@ -429,14 +441,14 @@ func TestServiceCheckMetadataMultiple(t *testing.T) { } func TestEventMinimal(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test text", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -445,14 +457,14 @@ func TestEventMinimal(t *testing.T) { } func TestEventMultilinesText(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,24}:test title|test\\line1\\nline2\\nline3")) + e, err := parseEventMessage([]byte("_e{10,24}:test title|test\\line1\\nline2\\nline3"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test\\line1\nline2\nline3", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -461,14 +473,14 @@ func TestEventMultilinesText(t *testing.T) { } func TestEventPipeInTitle(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,24}:test|title|test\\line1\\nline2\\nline3")) + e, err := parseEventMessage([]byte("_e{10,24}:test|title|test\\line1\\nline2\\nline3"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test|title", e.Title) assert.Equal(t, "test\\line1\nline2\nline3", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -478,78 +490,78 @@ func TestEventPipeInTitle(t *testing.T) { func TestEventError(t *testing.T) { // missing length header - _, err := parseEventMessage([]byte("_e:title|text")) + _, err := parseEventMessage([]byte("_e:title|text"), "default-hostname") assert.Error(t, err) // greater length than packet - _, err = parseEventMessage([]byte("_e{10,10}:title|text")) + _, err = parseEventMessage([]byte("_e{10,10}:title|text"), "default-hostname") assert.Error(t, err) // zero length - _, err = parseEventMessage([]byte("_e{0,0}:a|a")) + _, err = parseEventMessage([]byte("_e{0,0}:a|a"), "default-hostname") assert.Error(t, err) // missing title or text length - _, err = parseEventMessage([]byte("_e{5555:title|text")) + _, err = parseEventMessage([]byte("_e{5555:title|text"), "default-hostname") assert.Error(t, err) // missing wrong len format - _, err = parseEventMessage([]byte("_e{a,1}:title|text")) + _, err = parseEventMessage([]byte("_e{a,1}:title|text"), "default-hostname") assert.Error(t, err) - _, err = parseEventMessage([]byte("_e{1,a}:title|text")) + _, err = parseEventMessage([]byte("_e{1,a}:title|text"), "default-hostname") assert.Error(t, err) // missing title or text length - _, err = parseEventMessage([]byte("_e{5,}:title|text")) + _, err = parseEventMessage([]byte("_e{5,}:title|text"), "default-hostname") assert.Error(t, err) - _, err = parseEventMessage([]byte("_e{,4}:title|text")) + _, err = parseEventMessage([]byte("_e{,4}:title|text"), "default-hostname") assert.Error(t, err) - _, err = parseEventMessage([]byte("_e{}:title|text")) + _, err = parseEventMessage([]byte("_e{}:title|text"), "default-hostname") assert.Error(t, err) - _, err = parseEventMessage([]byte("_e{,}:title|text")) + _, err = parseEventMessage([]byte("_e{,}:title|text"), "default-hostname") assert.Error(t, err) // not enough information - _, err = parseEventMessage([]byte("_e|text")) + _, err = parseEventMessage([]byte("_e|text"), "default-hostname") assert.Error(t, err) - _, err = parseEventMessage([]byte("_e:|text")) + _, err = parseEventMessage([]byte("_e:|text"), "default-hostname") assert.Error(t, err) // invalid timestamp - _, err = parseEventMessage([]byte("_e{5,4}:title|text|d:abc")) + _, err = parseEventMessage([]byte("_e{5,4}:title|text|d:abc"), "default-hostname") assert.NoError(t, err) // invalid priority - _, err = parseEventMessage([]byte("_e{5,4}:title|text|p:urgent")) + _, err = parseEventMessage([]byte("_e{5,4}:title|text|p:urgent"), "default-hostname") assert.NoError(t, err) // invalid priority - _, err = parseEventMessage([]byte("_e{5,4}:title|text|p:urgent")) + _, err = parseEventMessage([]byte("_e{5,4}:title|text|p:urgent"), "default-hostname") assert.NoError(t, err) // invalid alert type - _, err = parseEventMessage([]byte("_e{5,4}:title|text|t:test")) + _, err = parseEventMessage([]byte("_e{5,4}:title|text|t:test"), "default-hostname") assert.NoError(t, err) // unknown metadata - _, err = parseEventMessage([]byte("_e{5,4}:title|text|x:1234")) + _, err = parseEventMessage([]byte("_e{5,4}:title|text|x:1234"), "default-hostname") assert.NoError(t, err) } func TestEventMetadataTimestamp(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|d:21")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|d:21"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test text", e.Text) assert.Equal(t, int64(21), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -558,14 +570,14 @@ func TestEventMetadataTimestamp(t *testing.T) { } func TestEventMetadataPriority(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|p:low")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|p:low"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test text", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityLow, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -574,7 +586,7 @@ func TestEventMetadataPriority(t *testing.T) { } func TestEventMetadataHostname(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|h:localhost")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|h:localhost"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) @@ -589,8 +601,8 @@ func TestEventMetadataHostname(t *testing.T) { assert.Equal(t, "", e.EventType) } -func TestEventMetadataAlertType(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning")) +func TestEventMetadataEmptyHostname(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|h:"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) @@ -599,6 +611,22 @@ func TestEventMetadataAlertType(t *testing.T) { assert.Equal(t, metrics.EventPriorityNormal, e.Priority) assert.Equal(t, "", e.Host) assert.Equal(t, []string(nil), e.Tags) + assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) + assert.Equal(t, "", e.AggregationKey) + assert.Equal(t, "", e.SourceTypeName) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataAlertType(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.Title) + assert.Equal(t, "test text", e.Text) + assert.Equal(t, int64(0), e.Ts) + assert.Equal(t, metrics.EventPriorityNormal, e.Priority) + assert.Equal(t, "default-hostname", e.Host) + assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeWarning, e.AlertType) assert.Equal(t, "", e.AggregationKey) assert.Equal(t, "", e.SourceTypeName) @@ -606,14 +634,14 @@ func TestEventMetadataAlertType(t *testing.T) { } func TestEventMetadataAggregatioKey(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|k:some aggregation key")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|k:some aggregation key"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test text", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "some aggregation key", e.AggregationKey) @@ -622,14 +650,14 @@ func TestEventMetadataAggregatioKey(t *testing.T) { } func TestEventMetadataSourceType(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|s:this is the source")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|s:this is the source"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test text", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string(nil), e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -638,14 +666,14 @@ func TestEventMetadataSourceType(t *testing.T) { } func TestEventMetadataTags(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#tag1,tag2:test")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#tag1,tag2:test"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) assert.Equal(t, "test text", e.Text) assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) - assert.Equal(t, "", e.Host) + assert.Equal(t, "default-hostname", e.Host) assert.Equal(t, []string{"tag1", "tag2:test"}, e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) @@ -654,7 +682,7 @@ func TestEventMetadataTags(t *testing.T) { } func TestEventMetadataMultiple(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test")) + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) diff --git a/pkg/dogstatsd/server.go b/pkg/dogstatsd/server.go index 3b8a38da3a40e7..f5591a32633a74 100644 --- a/pkg/dogstatsd/server.go +++ b/pkg/dogstatsd/server.go @@ -214,7 +214,7 @@ func (s *Server) worker(metricOut chan<- *metrics.MetricSample, eventOut chan<- } if bytes.HasPrefix(message, []byte("_sc")) { - serviceCheck, err := parseServiceCheckMessage(message) + serviceCheck, err := parseServiceCheckMessage(message, s.defaultHostname) if err != nil { log.Errorf("Dogstatsd: error parsing service check: %s", err) dogstatsdServiceCheckParseErrors.Add(1) @@ -226,7 +226,7 @@ func (s *Server) worker(metricOut chan<- *metrics.MetricSample, eventOut chan<- dogstatsdServiceCheckPackets.Add(1) serviceCheckOut <- *serviceCheck } else if bytes.HasPrefix(message, []byte("_e")) { - event, err := parseEventMessage(message) + event, err := parseEventMessage(message, s.defaultHostname) if err != nil { log.Errorf("Dogstatsd: error parsing event: %s", err) dogstatsdEventParseErrors.Add(1) From 536c6f07cfbb53e253b4ef6ca8cd3e6de81aabcf Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 18 Sep 2018 17:14:47 +0200 Subject: [PATCH 02/14] remove from distsampler and timesampler too --- pkg/aggregator/aggregator.go | 4 +- pkg/aggregator/dist_sampler.go | 16 ++-- pkg/aggregator/dist_sampler_test.go | 12 +-- pkg/aggregator/time_sampler.go | 7 +- pkg/aggregator/time_sampler_test.go | 115 +--------------------------- 5 files changed, 19 insertions(+), 135 deletions(-) diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index 1839817e023e2d..242fd921e06a4e 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -173,9 +173,9 @@ func NewBufferedAggregator(s *serializer.Serializer, hostname string, flushInter checkMetricIn: make(chan senderMetricSample, 100), // TODO make buffer size configurable serviceCheckIn: make(chan metrics.ServiceCheck, 100), // TODO make buffer size configurable eventIn: make(chan metrics.Event, 100), // TODO make buffer size configurable - sampler: *NewTimeSampler(bucketSize, hostname), + sampler: *NewTimeSampler(bucketSize), checkSamplers: make(map[check.ID]*CheckSampler), - distSampler: newDistSampler(bucketSize, hostname), + distSampler: newDistSampler(bucketSize), flushInterval: flushInterval, serializer: s, hostname: hostname, diff --git a/pkg/aggregator/dist_sampler.go b/pkg/aggregator/dist_sampler.go index 8225bef518d8a7..a8a6eb760b7ba6 100644 --- a/pkg/aggregator/dist_sampler.go +++ b/pkg/aggregator/dist_sampler.go @@ -16,23 +16,21 @@ import ( ) type distSampler struct { - interval int64 - defaultHostname string + interval int64 m sketchMap ctxResolver *ContextResolver } -func newDistSampler(interval int64, defaultHostname string) distSampler { +func newDistSampler(interval int64) distSampler { if interval == 0 { interval = bucketSize } return distSampler{ - interval: interval, - defaultHostname: defaultHostname, - m: make(sketchMap), - ctxResolver: newContextResolver(), + interval: interval, + m: make(sketchMap), + ctxResolver: newContextResolver(), } } @@ -75,10 +73,6 @@ func (d *distSampler) newSeries(ck ckey.ContextKey, points []metrics.SketchPoint ContextKey: ck, } - if ss.Host == "" { - ss.Host = d.defaultHostname - } - return ss } diff --git a/pkg/aggregator/dist_sampler_test.go b/pkg/aggregator/dist_sampler_test.go index 1dfcbc5f18cf73..fd2afed589883b 100644 --- a/pkg/aggregator/dist_sampler_test.go +++ b/pkg/aggregator/dist_sampler_test.go @@ -9,21 +9,21 @@ import ( "sort" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" "github.com/DataDog/datadog-agent/pkg/metrics" "github.com/DataDog/datadog-agent/pkg/quantile" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestDistSampler(t *testing.T) { const ( - defaultHost = "default_host" defaultBucketSize = 10 ) var ( - d = newDistSampler(0, defaultHost) + d = newDistSampler(0) insert = func(t *testing.T, ts float64, ctx Context, values ...float64) { t.Helper() @@ -85,7 +85,7 @@ func TestDistSampler(t *testing.T) { func TestDistSamplerBucketSampling(t *testing.T) { - distSampler := newDistSampler(10, "") + distSampler := newDistSampler(10) mSample1 := metrics.MetricSample{ Name: "test.metric.name", @@ -128,7 +128,7 @@ func TestDistSamplerBucketSampling(t *testing.T) { } func TestDistSamplerContextSampling(t *testing.T) { - distSampler := newDistSampler(10, "") + distSampler := newDistSampler(10) mSample1 := metrics.MetricSample{ Name: "test.metric.name1", diff --git a/pkg/aggregator/time_sampler.go b/pkg/aggregator/time_sampler.go index 1e6a3a2287b97c..26afe5b0b75950 100644 --- a/pkg/aggregator/time_sampler.go +++ b/pkg/aggregator/time_sampler.go @@ -6,11 +6,10 @@ package aggregator import ( - "github.com/DataDog/datadog-agent/pkg/util/log" - "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/util/log" ) const defaultExpiry = 300.0 // number of seconds after which contexts are expired @@ -28,18 +27,16 @@ type TimeSampler struct { interval int64 contextResolver *ContextResolver metricsByTimestamp map[int64]metrics.ContextMetrics - defaultHostname string counterLastSampledByContext map[ckey.ContextKey]float64 lastCutOffTime int64 } // NewTimeSampler returns a newly initialized TimeSampler -func NewTimeSampler(interval int64, defaultHostname string) *TimeSampler { +func NewTimeSampler(interval int64) *TimeSampler { return &TimeSampler{ interval: interval, contextResolver: newContextResolver(), metricsByTimestamp: map[int64]metrics.ContextMetrics{}, - defaultHostname: defaultHostname, counterLastSampledByContext: map[ckey.ContextKey]float64{}, } } diff --git a/pkg/aggregator/time_sampler_test.go b/pkg/aggregator/time_sampler_test.go index b212f3e7bbd145..eed83fe40b9cef 100644 --- a/pkg/aggregator/time_sampler_test.go +++ b/pkg/aggregator/time_sampler_test.go @@ -6,15 +6,12 @@ package aggregator import ( - // stdlib "sort" "testing" - // 3p "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - // project "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" "github.com/DataDog/datadog-agent/pkg/metrics" ) @@ -37,7 +34,7 @@ func (os OrderedSeries) Swap(i, j int) { // TimeSampler func TestCalculateBucketStart(t *testing.T) { - sampler := NewTimeSampler(10, "") + sampler := NewTimeSampler(10) assert.Equal(t, int64(123450), sampler.calculateBucketStart(123456.5)) assert.Equal(t, int64(123460), sampler.calculateBucketStart(123460.5)) @@ -45,7 +42,7 @@ func TestCalculateBucketStart(t *testing.T) { } func TestBucketSampling(t *testing.T) { - sampler := NewTimeSampler(10, "") + sampler := NewTimeSampler(10) mSample := metrics.MetricSample{ Name: "my.metric.name", @@ -76,7 +73,7 @@ func TestBucketSampling(t *testing.T) { } func TestContextSampling(t *testing.T) { - sampler := NewTimeSampler(10, "default-hostname") + sampler := NewTimeSampler(10) mSample1 := metrics.MetricSample{ Name: "my.metric.name1", @@ -142,7 +139,7 @@ func TestContextSampling(t *testing.T) { } func TestCounterExpirySeconds(t *testing.T) { - sampler := NewTimeSampler(10, "default-hostname") + sampler := NewTimeSampler(10) sampleCounter1 := &metrics.MetricSample{ Name: "my.counter1", @@ -267,107 +264,3 @@ func TestCounterExpirySeconds(t *testing.T) { assert.Equal(t, 0, len(sampler.counterLastSampledByContext)) assert.Equal(t, 0, len(sampler.contextResolver.contextsByKey)) } - -//func TestOne(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestFormatter(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestCounterNormalization(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestHistogramNormalization(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestCounter(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestSampledCounter(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestGauge(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestSets(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestStringSets(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestRate(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestRateErrors(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestGaugeSampleRate(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestHistogram(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestSampledHistogram(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestBatchSubmission(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestMonokeyBatchingNoTags(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestMonokeyBatchingWithTags(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestMonokeyBatchingWithTagsWithSampling(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestBadPacketsThrowErrors(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestMetricsExpiry(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestDiagnosticStats(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestHistogramCounter(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestEventTags(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestServiceCheckBasic(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestServiceCheckTags(t *testing.T) { -// assert.Equal(t, 1, 1) -//} -// -//func TestRecentPointThreshold(t *testing.T) { -// assert.Equal(t, 1, 1) -//} From e5bba7e3c79a7ae78388d25b7f60f7b1421012d9 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 18 Sep 2018 17:34:40 +0200 Subject: [PATCH 03/14] improve testing --- pkg/aggregator/aggregator_test.go | 7 +++++++ pkg/aggregator/sender_test.go | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/pkg/aggregator/aggregator_test.go b/pkg/aggregator/aggregator_test.go index 5dd8812d26430a..33e239fe593828 100644 --- a/pkg/aggregator/aggregator_test.go +++ b/pkg/aggregator/aggregator_test.go @@ -132,6 +132,13 @@ func TestSetHostname(t *testing.T) { resetAggregator() agg := InitAggregator(nil, "hostname") assert.Equal(t, "hostname", agg.hostname) + sender, err := GetSender(checkID1) + require.NoError(t, err) + checkSender, ok := sender.(*checkSender) + require.True(t, ok) + assert.Equal(t, "hostname", checkSender.defaultHostname) + agg.SetHostname("different-hostname") assert.Equal(t, "different-hostname", agg.hostname) + assert.Equal(t, "different-hostname", checkSender.defaultHostname) } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index c190d9ae99633a..0e55da2567a0f3 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -264,3 +264,25 @@ func TestCheckSenderHostname(t *testing.T) { }) } } + +func TestChangeAllSendersDefaultHostname(t *testing.T) { + senderMetricSampleChan := make(chan senderMetricSample, 10) + serviceCheckChan := make(chan metrics.ServiceCheck, 10) + eventChan := make(chan metrics.Event, 10) + checkSender := newCheckSender(checkID1, "hostname1", senderMetricSampleChan, serviceCheckChan, eventChan) + SetSender(checkSender, checkID1) + + checkSender.Gauge("my.metric", 1.0, "", nil) + gaugeSenderSample := <-senderMetricSampleChan + assert.Equal(t, "hostname1", gaugeSenderSample.metricSample.Host) + + changeAllSendersDefaultHostname("hostname2") + checkSender.Gauge("my.metric", 1.0, "", nil) + gaugeSenderSample = <-senderMetricSampleChan + assert.Equal(t, "hostname2", gaugeSenderSample.metricSample.Host) + + changeAllSendersDefaultHostname("hostname1") + checkSender.Gauge("my.metric", 1.0, "", nil) + gaugeSenderSample = <-senderMetricSampleChan + assert.Equal(t, "hostname1", gaugeSenderSample.metricSample.Host) +} From 3419582fd7fb6c0f7ae5fe88afa34c888f369e6b Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 19 Sep 2018 12:03:09 +0200 Subject: [PATCH 04/14] fix hostname passing in checksampler --- pkg/aggregator/check_sampler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/aggregator/check_sampler.go b/pkg/aggregator/check_sampler.go index 39ce305a26b98c..0933e6cc16815f 100644 --- a/pkg/aggregator/check_sampler.go +++ b/pkg/aggregator/check_sampler.go @@ -56,6 +56,7 @@ func (cs *CheckSampler) commit(timestamp float64) { } serie.Name = context.Name + serie.NameSuffix serie.Tags = context.Tags + serie.Host = context.Host serie.SourceTypeName = checksSourceTypeName // this source type is required for metrics coming from the checks cs.series = append(cs.series, serie) From 9251008d0ab33036de6d1310bf40da3c1ccb531c Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 19 Sep 2018 14:42:59 +0200 Subject: [PATCH 05/14] improve testing --- pkg/aggregator/sender.go | 6 +++--- pkg/aggregator/sender_test.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/aggregator/sender.go b/pkg/aggregator/sender.go index c913a726499a03..cd5682d3c1893c 100644 --- a/pkg/aggregator/sender.go +++ b/pkg/aggregator/sender.go @@ -203,7 +203,7 @@ func (s *checkSender) sendMetricSample(metric string, value float64, hostname st Timestamp: timeNowNano(), } - if !s.disableDefaultHostname && metricSample.Host == "" { + if hostname == "" && !s.disableDefaultHostname { metricSample.Host = s.defaultHostname } @@ -271,7 +271,7 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck Message: message, } - if !s.disableDefaultHostname && serviceCheck.Host == "" { + if hostname == "" && !s.disableDefaultHostname { serviceCheck.Host = s.defaultHostname } @@ -286,7 +286,7 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck func (s *checkSender) Event(e metrics.Event) { log.Trace("Event submitted: ", e.Title, " for hostname: ", e.Host, " tags: ", e.Tags) - if !s.disableDefaultHostname && e.Host == "" { + if e.Host == "" && !s.disableDefaultHostname { e.Host = s.defaultHostname } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 0e55da2567a0f3..8ffd4ce3dc344c 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -14,6 +14,7 @@ import ( // 3p "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/DataDog/datadog-agent/pkg/collector/check" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -119,6 +120,21 @@ func TestGetAndSetSender(t *testing.T) { } +func TestGetSenderDefaultHostname(t *testing.T) { + resetAggregator() + InitAggregator(nil, "testhostname") + + sender, err := GetSender(checkID1) + require.NoError(t, err) + + checksender, ok := sender.(*checkSender) + require.True(t, ok) + + assert.Equal(t, "testhostname", checksender.defaultHostname) + assert.Equal(t, false, checksender.disableDefaultHostname) + +} + func TestCheckSenderInterface(t *testing.T) { senderMetricSampleChan := make(chan senderMetricSample, 10) serviceCheckChan := make(chan metrics.ServiceCheck, 10) From 30377f3db19b3145cb92d47827b2d03e300e2c91 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 19 Sep 2018 14:50:37 +0200 Subject: [PATCH 06/14] use agent hostname in the startup event --- pkg/aggregator/aggregator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index 242fd921e06a4e..2b14fa24f6535f 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -227,6 +227,7 @@ func (agg *BufferedAggregator) AddAgentStartupEvent(agentVersion string) { event := metrics.Event{ Text: fmt.Sprintf("Version %s", agentVersion), SourceTypeName: "System", + Host: agg.hostname, EventType: "Agent Startup", } agg.eventIn <- event From 50b4434b8f03c8e985b5255cda19dea151e2c259 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 19 Sep 2018 16:02:26 +0200 Subject: [PATCH 07/14] handle empty host: tag for dsd events and service checks --- pkg/dogstatsd/parser.go | 31 ++++++++++++++++++++++------ pkg/dogstatsd/parser_test.go | 40 ++++++++++++++++++++++++++++++------ 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/pkg/dogstatsd/parser.go b/pkg/dogstatsd/parser.go index 7330d85f899aa8..db7abb9cb0ae6e 100644 --- a/pkg/dogstatsd/parser.go +++ b/pkg/dogstatsd/parser.go @@ -100,7 +100,6 @@ func parseServiceCheckMessage(message []byte, defaultHostname string) (*metrics. service := metrics.ServiceCheck{ CheckName: string(rawName), - Host: defaultHostname, } if status, err := strconv.Atoi(string(rawStatus)); err != nil { @@ -111,6 +110,11 @@ func parseServiceCheckMessage(message []byte, defaultHostname string) (*metrics. service.Status = serviceStatus } + // Handle hostname, with a priority to the h: field, then the host: + // tag and finally the defaultHostname value + var hostFromField string + hostFromTags := defaultHostname + // Metadata for { var rawMetadataField []byte @@ -127,9 +131,9 @@ func parseServiceCheckMessage(message []byte, defaultHostname string) (*metrics. } service.Ts = ts } else if bytes.HasPrefix(rawMetadataField, []byte("h:")) { - service.Host = string(rawMetadataField[2:]) + hostFromField = string(rawMetadataField[2:]) } else if bytes.HasPrefix(rawMetadataField, []byte("#")) { - service.Tags, _ = parseTags(rawMetadataField[1:], false, "") + service.Tags, hostFromTags = parseTags(rawMetadataField[1:], true, defaultHostname) } else if bytes.HasPrefix(rawMetadataField, []byte("m:")) { service.Message = string(rawMetadataField[2:]) } else { @@ -137,6 +141,11 @@ func parseServiceCheckMessage(message []byte, defaultHostname string) (*metrics. } } + if hostFromField != "" { + service.Host = hostFromField + } else { + service.Host = hostFromTags + } return &service, nil } @@ -187,11 +196,15 @@ func parseEventMessage(message []byte, defaultHostname string) (*metrics.Event, event := metrics.Event{ Priority: metrics.EventPriorityNormal, AlertType: metrics.EventAlertTypeInfo, - Host: defaultHostname, Title: string(rawTitle), Text: string(bytes.Replace(rawText, []byte("\\n"), []byte("\n"), -1)), } + // Handle hostname, with a priority to the h: field, then the host: + // tag and finally the defaultHostname value + var hostFromField string + hostFromTags := defaultHostname + // Metadata if len(message) > 1 { rawMetadataFields := bytes.Split(message[1:], []byte("|")) @@ -212,7 +225,7 @@ func parseEventMessage(message []byte, defaultHostname string) (*metrics.Event, } event.Priority = priority } else if bytes.HasPrefix(rawMetadataFields[i], []byte("h:")) { - event.Host = string(rawMetadataFields[i][2:]) + hostFromField = string(rawMetadataFields[i][2:]) } else if bytes.HasPrefix(rawMetadataFields[i], []byte("t:")) { t, err := metrics.GetAlertTypeFromString(string(rawMetadataFields[i][2:])) if err != nil { @@ -225,13 +238,19 @@ func parseEventMessage(message []byte, defaultHostname string) (*metrics.Event, } else if bytes.HasPrefix(rawMetadataFields[i], []byte("s:")) { event.SourceTypeName = string(rawMetadataFields[i][2:]) } else if bytes.HasPrefix(rawMetadataFields[i], []byte("#")) { - event.Tags, _ = parseTags(rawMetadataFields[i][1:], false, "") + event.Tags, hostFromTags = parseTags(rawMetadataFields[i][1:], true, defaultHostname) + } else { log.Warnf("unknown metadata type: '%s'", rawMetadataFields[i]) } } } + if hostFromField != "" { + event.Host = hostFromField + } else { + event.Host = hostFromTags + } return &event, nil } diff --git a/pkg/dogstatsd/parser_test.go b/pkg/dogstatsd/parser_test.go index 5ef793042244ad..7ae32e57fea2ad 100644 --- a/pkg/dogstatsd/parser_test.go +++ b/pkg/dogstatsd/parser_test.go @@ -382,8 +382,20 @@ func TestServiceCheckMetadataHostname(t *testing.T) { assert.Equal(t, []string(nil), sc.Tags) } -func TestServiceCheckMetadataEmptyHostname(t *testing.T) { - sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|h:"), "default-hostname") +func TestServiceCheckMetadataHostnameInTag(t *testing.T) { + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|#host:localhost"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "agent.up", sc.CheckName) + assert.Equal(t, "localhost", sc.Host) + assert.Equal(t, int64(0), sc.Ts) + assert.Equal(t, metrics.ServiceCheckOK, sc.Status) + assert.Equal(t, "", sc.Message) + assert.Equal(t, []string{}, sc.Tags) +} + +func TestServiceCheckMetadataEmptyHostTag(t *testing.T) { + sc, err := parseServiceCheckMessage([]byte("_sc|agent.up|0|#host:,other:tag"), "default-hostname") require.Nil(t, err) assert.Equal(t, "agent.up", sc.CheckName) @@ -391,7 +403,7 @@ func TestServiceCheckMetadataEmptyHostname(t *testing.T) { assert.Equal(t, int64(0), sc.Ts) assert.Equal(t, metrics.ServiceCheckOK, sc.Status) assert.Equal(t, "", sc.Message) - assert.Equal(t, []string(nil), sc.Tags) + assert.Equal(t, []string{"other:tag"}, sc.Tags) } func TestServiceCheckMetadataTags(t *testing.T) { @@ -601,8 +613,24 @@ func TestEventMetadataHostname(t *testing.T) { assert.Equal(t, "", e.EventType) } -func TestEventMetadataEmptyHostname(t *testing.T) { - e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|h:"), "default-hostname") +func TestEventMetadataHostnameInTag(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#host:localhost"), "default-hostname") + + require.Nil(t, err) + assert.Equal(t, "test title", e.Title) + assert.Equal(t, "test text", e.Text) + assert.Equal(t, int64(0), e.Ts) + assert.Equal(t, metrics.EventPriorityNormal, e.Priority) + assert.Equal(t, "localhost", e.Host) + assert.Equal(t, []string{}, e.Tags) + assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) + assert.Equal(t, "", e.AggregationKey) + assert.Equal(t, "", e.SourceTypeName) + assert.Equal(t, "", e.EventType) +} + +func TestEventMetadataEmptyHostTag(t *testing.T) { + e, err := parseEventMessage([]byte("_e{10,9}:test title|test text|#host:,other:tag"), "default-hostname") require.Nil(t, err) assert.Equal(t, "test title", e.Title) @@ -610,7 +638,7 @@ func TestEventMetadataEmptyHostname(t *testing.T) { assert.Equal(t, int64(0), e.Ts) assert.Equal(t, metrics.EventPriorityNormal, e.Priority) assert.Equal(t, "", e.Host) - assert.Equal(t, []string(nil), e.Tags) + assert.Equal(t, []string{"other:tag"}, e.Tags) assert.Equal(t, metrics.EventAlertTypeInfo, e.AlertType) assert.Equal(t, "", e.AggregationKey) assert.Equal(t, "", e.SourceTypeName) From 7b7afe95ea452ca1dc2ea6061573337c157a426d Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 19 Sep 2018 16:04:23 +0200 Subject: [PATCH 08/14] reno --- .../notes/dogstatsd-empty-hostname-fefa16f74994210f.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/dogstatsd-empty-hostname-fefa16f74994210f.yaml diff --git a/releasenotes/notes/dogstatsd-empty-hostname-fefa16f74994210f.yaml b/releasenotes/notes/dogstatsd-empty-hostname-fefa16f74994210f.yaml new file mode 100644 index 00000000000000..b706dc4e0e32b5 --- /dev/null +++ b/releasenotes/notes/dogstatsd-empty-hostname-fefa16f74994210f.yaml @@ -0,0 +1,4 @@ +--- +enhancements: + - Dogstatsd supports removing the hostname on events and services checks + as it did with metrics, by adding an empty ``host:`` tag From 32b29bb94f6c8b367d62c671366a7ddbaa8e8de8 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 26 Sep 2018 15:30:59 +0200 Subject: [PATCH 09/14] clear cci cache --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 477dab8036fb3c..ff2308a9e40dc2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,8 +27,8 @@ templates: # The first match will be used. Doing that so new branches # use master's cache but don't pollute it back. - v2-godeps-{{ .Branch }}-{{ .Revision }} - - v2-godeps-{{ .Branch }}- - - v2-godeps-master- + #- v2-godeps-{{ .Branch }}- + #- v2-godeps-master- - run: &enter_venv name: add virtualenv to bashrc command: echo "source /go/src/github.com/DataDog/datadog-agent/venv/bin/activate" >> $BASH_ENV From 2287e6c4a548c89422c6b3fefc1d60aa34302fbc Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 26 Sep 2018 15:43:09 +0200 Subject: [PATCH 10/14] Revert "clear cci cache" This reverts commit a99fc330ad7ae60c18ab21071428008fe3747b2b. --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ff2308a9e40dc2..477dab8036fb3c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,8 +27,8 @@ templates: # The first match will be used. Doing that so new branches # use master's cache but don't pollute it back. - v2-godeps-{{ .Branch }}-{{ .Revision }} - #- v2-godeps-{{ .Branch }}- - #- v2-godeps-master- + - v2-godeps-{{ .Branch }}- + - v2-godeps-master- - run: &enter_venv name: add virtualenv to bashrc command: echo "source /go/src/github.com/DataDog/datadog-agent/venv/bin/activate" >> $BASH_ENV From 3eafdaaddc358c91647ad60a89d5b174d55cf90d Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Thu, 27 Sep 2018 10:14:48 +0200 Subject: [PATCH 11/14] add end-to-end testing for check and dsd hostnames --- test/e2e/argo-workflows/agent.yaml | 216 ++++++++++++++++-- test/e2e/containers/dsd_sender/Dockerfile | 7 + test/e2e/containers/dsd_sender/Makefile | 9 + test/e2e/containers/dsd_sender/sender.py | 22 ++ .../scripts/run-instance/22-argo-submit.sh | 10 +- 5 files changed, 245 insertions(+), 19 deletions(-) create mode 100644 test/e2e/containers/dsd_sender/Dockerfile create mode 100644 test/e2e/containers/dsd_sender/Makefile create mode 100644 test/e2e/containers/dsd_sender/sender.py diff --git a/test/e2e/argo-workflows/agent.yaml b/test/e2e/argo-workflows/agent.yaml index 7c55d709b25f5d..0ed9663da39e97 100644 --- a/test/e2e/argo-workflows/agent.yaml +++ b/test/e2e/argo-workflows/agent.yaml @@ -96,6 +96,39 @@ spec: memory: "64Mi" cpu: "400m" + - name: dsd-hostname + value: | + apiVersion: extensions/v1beta1 + kind: Deployment + metadata: + name: dsd-hostname + namespace: default + spec: + replicas: 1 + template: + metadata: + labels: + app: dsd-hostname + spec: + containers: + - name: sender + image: datadog/docker-library:e2e-dsd-sender_latest + resources: + requests: + memory: "32Mi" + cpu: "100m" + limits: + memory: "32Mi" + cpu: "100m" + volumeMounts: + - name: dogstatsd + mountPath: /var/run/dogstatsd + readOnly: true + volumes: + - hostPath: + path: /var/run/dogstatsd + name: dogstatsd + - name: agent-service-account value: | kind: ServiceAccount @@ -183,6 +216,9 @@ spec: polling: true leader_election: true kubernetes_metadata_tag_update_freq: 20 + dogstatsd_socket: /var/run/dogstatsd/dsd.socket + # Hardcode a hostname to test dogstatsd hostname injection + hostname: e2e-test kubelet.yaml: | init_config: @@ -207,7 +243,6 @@ spec: instances: - {} - - name: agent-daemonset value: | apiVersion: extensions/v1beta1 @@ -280,7 +315,9 @@ spec: - name: dockersocket mountPath: /var/run/docker.sock readOnly: true - + - name: dogstatsd + mountPath: /var/run/dogstatsd + readOnly: false volumes: - name: datadog-config configMap: @@ -294,6 +331,9 @@ spec: - hostPath: path: /var/run/docker.sock name: dockersocket + - hostPath: + path: /var/run/dogstatsd + name: dogstatsd - name: fake-datadog-service value: | @@ -464,6 +504,7 @@ spec: inputs: parameters: - name: cpu-stress + - name: dsd-hostname - name: redis-service - name: redis-deployment - name: agent-configmap @@ -500,6 +541,19 @@ spec: withItems: - "{{inputs.parameters.redis-deployment}}" + - name: kube-state-metrics-setup + template: manifest + arguments: + parameters: + - name: action + value: "apply" + - name: manifest + value: "{{item}}" + withItems: + - "{{inputs.parameters.kube-state-metrics-rbac}}" + - "{{inputs.parameters.kube-state-metrics-deployment}}" + - "{{inputs.parameters.kube-state-metrics-service}}" + - name: cpu-stress-setup template: manifest arguments: @@ -511,10 +565,7 @@ spec: withItems: - "{{inputs.parameters.cpu-stress}}" - - - name: fake-dd-reset - template: fake-dd-reset - - - - name: agent-setup + - name: dsd-hostname template: manifest arguments: parameters: @@ -523,13 +574,12 @@ spec: - name: manifest value: "{{item}}" withItems: - - "{{inputs.parameters.agent-configmap}}" - - "{{inputs.parameters.agent-service-account}}" - - "{{inputs.parameters.agent-cluster-role}}" - - "{{inputs.parameters.agent-cluster-role-binding}}" - - "{{inputs.parameters.agent-daemonset}}" + - "{{inputs.parameters.dsd-hostname}}" + + - - name: fake-dd-reset + template: fake-dd-reset - - - name: kube-state-metrics-setup + - - name: agent-setup template: manifest arguments: parameters: @@ -538,9 +588,11 @@ spec: - name: manifest value: "{{item}}" withItems: - - "{{inputs.parameters.kube-state-metrics-rbac}}" - - "{{inputs.parameters.kube-state-metrics-deployment}}" - - "{{inputs.parameters.kube-state-metrics-service}}" + - "{{inputs.parameters.agent-configmap}}" + - "{{inputs.parameters.agent-service-account}}" + - "{{inputs.parameters.agent-cluster-role}}" + - "{{inputs.parameters.agent-cluster-role-binding}}" + - "{{inputs.parameters.agent-daemonset}}" - - name: health template: datadog-agent-health @@ -569,6 +621,12 @@ spec: - name: find-metrics-cpu-system template: find-metrics-cpu-system + - name: find-dogstatsd-hostname + template: find-dogstatsd-hostname + + - name: find-checks-hostname + template: find-checks-hostname + - - name: redis-service-setup template: manifest arguments: @@ -615,6 +673,7 @@ spec: - name: redis-service - name: redis-deployment - name: cpu-stress + - name: dsd-hostname - name: agent-configmap - name: agent-service-account - name: agent-cluster-role @@ -638,6 +697,7 @@ spec: - "{{inputs.parameters.redis-service}}" - "{{inputs.parameters.redis-deployment}}" - "{{inputs.parameters.cpu-stress}}" + - "{{inputs.parameters.dsd-hostname}}" - "{{inputs.parameters.agent-configmap}}" - "{{inputs.parameters.agent-service-account}}" - "{{inputs.parameters.agent-cluster-role}}" @@ -847,6 +907,132 @@ spec: sleep(2000); } + - name: find-dogstatsd-hostname + activeDeadlineSeconds: 200 + script: + image: mongo:3.6.3 + command: [mongo, "fake-datadog.default.svc.cluster.local/datadog"] + source: | + while (1) { + sleep(2000); + + // Gauges + var nb = db.series.find({ + metric: "dsd.hostname.e2e", + host: "e2e-test", + tags: {$all: ["case:nominal"]} + }).count(); + if (nb == 0) { + print("no dsd.hostname.e2e metric with nominal hostname"); + continue; + } + var nb = db.series.find({ + metric: "dsd.hostname.e2e", + host: "forced", + tags: {$all: ["case:forced"]} + }).count(); + if (nb == 0) { + print("no dsd.hostname.e2e metric with forced hostname"); + continue; + } + var nb = db.series.find({ + metric: "dsd.hostname.e2e", + host: "", + tags: {$all: ["case:empty"]} + }).count(); + if (nb == 0) { + print("no dsd.hostname.e2e metric with empty hostname"); + continue; + } + + // Service checks + var nb = db.check_run.find({ + check: "dsd.hostname.e2e", + host_name: "e2e-test", + tags: {$all: ["case:nominal"]} + }).count(); + if (nb == 0) { + print("no dsd.hostname.e2e servicecheck with nominal hostname"); + continue; + } + var nb = db.check_run.find({ + check: "dsd.hostname.e2e", + host_name: "forced", + tags: {$all: ["case:forced"]} + }).count(); + if (nb == 0) { + print("no dsd.hostname.e2e servicecheck with forced hostname"); + continue; + } + var nb = db.check_run.find({ + check: "dsd.hostname.e2e", + host_name: "", + tags: {$all: ["case:empty"]} + }).count(); + if (nb == 0) { + print("no dsd.hostname.e2e servicecheck with empty hostname"); + continue; + } + + // Events + var nb = db.intake.find({ + "events.api": { $elemMatch: { + msg_title: "dsd.hostname.e2e", + host: "e2e-test", + tags: {$all: ["case:nominal"]} + }}}).count(); + if (nb == 0) { + print("no dsd.hostname.e2e event with nominal hostname"); + continue; + } + var nb = db.intake.find({ + "events.api": { $elemMatch: { + msg_title: "dsd.hostname.e2e", + host: "forced", + tags: {$all: ["case:forced"]} + }}}).count(); + if (nb == 0) { + print("no dsd.hostname.e2e event with forced hostname"); + continue; + } + var nb = db.intake.find({ + "events.api": { $elemMatch: { + msg_title: "dsd.hostname.e2e", + host: "", + tags: {$all: ["case:empty"]} + }}}).count(); + if (nb == 0) { + print("no dsd.hostname.e2e event with empty hostname"); + continue; + } + + print("All good"); + break; + } + + - name: find-checks-hostname + activeDeadlineSeconds: 200 + script: + image: mongo:3.6.3 + command: [mongo, "fake-datadog.default.svc.cluster.local/datadog"] + source: | + while (1) { + sleep(2000); + + // Gauges + var nb = db.series.find({ + metric: "system.cpu.user", + host: "e2e-test" + }).count(); + if (nb == 0) { + print("no system.cpu.user metric with nominal hostname"); + continue; + } + + print("All good"); + break; + } + - name: find-metrics-cpu-system activeDeadlineSeconds: 200 script: diff --git a/test/e2e/containers/dsd_sender/Dockerfile b/test/e2e/containers/dsd_sender/Dockerfile new file mode 100644 index 00000000000000..1b6a5ae33c311d --- /dev/null +++ b/test/e2e/containers/dsd_sender/Dockerfile @@ -0,0 +1,7 @@ +FROM datadog/docker-library:python_2_7-alpine3_6 + +RUN pip install datadog + +COPY sender.py /sender.py + +CMD [ "python", "/sender.py" ] diff --git a/test/e2e/containers/dsd_sender/Makefile b/test/e2e/containers/dsd_sender/Makefile new file mode 100644 index 00000000000000..bfdc5e51e02727 --- /dev/null +++ b/test/e2e/containers/dsd_sender/Makefile @@ -0,0 +1,9 @@ +TAG?=latest + +default: build push + +build: + docker build --force-rm -t datadog/docker-library:e2e-dsd-sender_$(TAG) . + +push: + docker push datadog/docker-library:e2e-dsd-sender_$(TAG) diff --git a/test/e2e/containers/dsd_sender/sender.py b/test/e2e/containers/dsd_sender/sender.py new file mode 100644 index 00000000000000..4e3736ea393de1 --- /dev/null +++ b/test/e2e/containers/dsd_sender/sender.py @@ -0,0 +1,22 @@ +import datadog +import time + +client = datadog.dogstatsd.base.DogStatsd(socket_path="/var/run/dogstatsd/dsd.socket") + +while True: + # Nominal case, dsd will inject its hostname + client.gauge('dsd.hostname.e2e', 1, tags=["case:nominal"]) + client.service_check('dsd.hostname.e2e', 0, tags=["case:nominal"]) + client.event('dsd.hostname.e2e', 'text', tags=["case:nominal"]) + + # Force the hostname value + client.gauge('dsd.hostname.e2e', 1, tags=["case:forced", "host:forced"]) + client.service_check('dsd.hostname.e2e', 0, tags=["case:forced"], hostname="forced") + client.event('dsd.hostname.e2e', 'text', tags=["case:forced"], hostname="forced") + + # Force an empty hostname + client.gauge('dsd.hostname.e2e', 1, tags=["case:empty", "host:"]) + client.service_check('dsd.hostname.e2e', 0, tags=["case:empty", "host:"]) + client.event('dsd.hostname.e2e', 'text', tags=["case:empty", "host:"]) + + time.sleep(10) diff --git a/test/e2e/scripts/run-instance/22-argo-submit.sh b/test/e2e/scripts/run-instance/22-argo-submit.sh index e4b0eb2b090f3e..9127734dfff0e9 100755 --- a/test/e2e/scripts/run-instance/22-argo-submit.sh +++ b/test/e2e/scripts/run-instance/22-argo-submit.sh @@ -78,9 +78,6 @@ spec: - name: datadog-config mountPath: /etc/datadog-agent/conf.d/network.d/conf.yaml.default subPath: network.yaml - - name: datadog-config - mountPath: /etc/datadog-agent/conf.d/docker.d/conf.yaml.default - subPath: docker.yaml - name: proc mountPath: /host/proc readOnly: true @@ -90,7 +87,9 @@ spec: - name: dockersocket mountPath: /var/run/docker.sock readOnly: true - + - name: dogstatsd + mountPath: /var/run/dogstatsd + readOnly: false volumes: - name: datadog-config configMap: @@ -104,6 +103,9 @@ spec: - hostPath: path: /var/run/docker.sock name: dockersocket + - hostPath: + path: /var/run/dogstatsd + name: dogstatsd --- EOF ) From 9bc084dd967c973aa2e6583db4071934067a0ede Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Thu, 27 Sep 2018 15:14:52 +0200 Subject: [PATCH 12/14] rename checkSender internal member --- pkg/aggregator/sender.go | 24 ++++++++++++------------ pkg/aggregator/sender_test.go | 34 +++++++++++++++++----------------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/aggregator/sender.go b/pkg/aggregator/sender.go index cd5682d3c1893c..9f4153241a065c 100644 --- a/pkg/aggregator/sender.go +++ b/pkg/aggregator/sender.go @@ -53,14 +53,14 @@ type RawSender interface { // checkSender implements Sender type checkSender struct { - id check.ID - defaultHostname string - disableDefaultHostname bool - metricStats metricStats - priormetricStats metricStats - smsOut chan<- senderMetricSample - serviceCheckOut chan<- metrics.ServiceCheck - eventOut chan<- metrics.Event + id check.ID + defaultHostname string + defaultHostnameDisabled bool + metricStats metricStats + priormetricStats metricStats + smsOut chan<- senderMetricSample + serviceCheckOut chan<- metrics.ServiceCheck + eventOut chan<- metrics.Event } type senderMetricSample struct { @@ -149,7 +149,7 @@ func changeAllSendersDefaultHostname(hostname string) { // DisableDefaultHostname allows check to override the default hostname that will be injected // when no hostname is specified at submission (for metrics, events and service checks). func (s *checkSender) DisableDefaultHostname(disable bool) { - s.disableDefaultHostname = disable + s.defaultHostnameDisabled = disable } // Commit commits the metric samples that were added during a check run @@ -203,7 +203,7 @@ func (s *checkSender) sendMetricSample(metric string, value float64, hostname st Timestamp: timeNowNano(), } - if hostname == "" && !s.disableDefaultHostname { + if hostname == "" && !s.defaultHostnameDisabled { metricSample.Host = s.defaultHostname } @@ -271,7 +271,7 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck Message: message, } - if hostname == "" && !s.disableDefaultHostname { + if hostname == "" && !s.defaultHostnameDisabled { serviceCheck.Host = s.defaultHostname } @@ -286,7 +286,7 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck func (s *checkSender) Event(e metrics.Event) { log.Trace("Event submitted: ", e.Title, " for hostname: ", e.Host, " tags: ", e.Tags) - if e.Host == "" && !s.disableDefaultHostname { + if e.Host == "" && !s.defaultHostnameDisabled { e.Host = s.defaultHostname } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 8ffd4ce3dc344c..7bd2ce02248ef2 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -131,7 +131,7 @@ func TestGetSenderDefaultHostname(t *testing.T) { require.True(t, ok) assert.Equal(t, "testhostname", checksender.defaultHostname) - assert.Equal(t, false, checksender.disableDefaultHostname) + assert.Equal(t, false, checksender.defaultHostnameDisabled) } @@ -211,29 +211,29 @@ func TestCheckSenderHostname(t *testing.T) { defaultHostname := "default-host" for nb, tc := range []struct { - disableDefaultHostname bool - submittedHostname string - expectedHostname string + defaultHostnameDisabled bool + submittedHostname string + expectedHostname string }{ { - disableDefaultHostname: false, - submittedHostname: "", - expectedHostname: defaultHostname, + defaultHostnameDisabled: false, + submittedHostname: "", + expectedHostname: defaultHostname, }, { - disableDefaultHostname: false, - submittedHostname: "custom", - expectedHostname: "custom", + defaultHostnameDisabled: false, + submittedHostname: "custom", + expectedHostname: "custom", }, { - disableDefaultHostname: true, - submittedHostname: "", - expectedHostname: "", + defaultHostnameDisabled: true, + submittedHostname: "", + expectedHostname: "", }, { - disableDefaultHostname: true, - submittedHostname: "custom", - expectedHostname: "custom", + defaultHostnameDisabled: true, + submittedHostname: "custom", + expectedHostname: "custom", }, } { t.Run(fmt.Sprintf("case %d: %q -> %q", nb, tc.submittedHostname, tc.expectedHostname), func(t *testing.T) { @@ -241,7 +241,7 @@ func TestCheckSenderHostname(t *testing.T) { serviceCheckChan := make(chan metrics.ServiceCheck, 10) eventChan := make(chan metrics.Event, 10) checkSender := newCheckSender(checkID1, defaultHostname, senderMetricSampleChan, serviceCheckChan, eventChan) - checkSender.DisableDefaultHostname(tc.disableDefaultHostname) + checkSender.DisableDefaultHostname(tc.defaultHostnameDisabled) checkSender.Gauge("my.metric", 1.0, tc.submittedHostname, []string{"foo", "bar"}) checkSender.Commit() From e795c7f24e595cbcecafe2c87385ad77acc991cf Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Thu, 27 Sep 2018 18:30:57 +0200 Subject: [PATCH 13/14] fix docker e2e --- test/e2e/scripts/run-instance/22-argo-submit.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/e2e/scripts/run-instance/22-argo-submit.sh b/test/e2e/scripts/run-instance/22-argo-submit.sh index 9127734dfff0e9..ed8122d1681ff6 100755 --- a/test/e2e/scripts/run-instance/22-argo-submit.sh +++ b/test/e2e/scripts/run-instance/22-argo-submit.sh @@ -78,6 +78,9 @@ spec: - name: datadog-config mountPath: /etc/datadog-agent/conf.d/network.d/conf.yaml.default subPath: network.yaml + - name: datadog-config + mountPath: /etc/datadog-agent/conf.d/docker.d/conf.yaml.default + subPath: docker.yaml - name: proc mountPath: /host/proc readOnly: true @@ -90,6 +93,7 @@ spec: - name: dogstatsd mountPath: /var/run/dogstatsd readOnly: false + volumes: - name: datadog-config configMap: From 5d5bda4b74e4103c9ba3ad0f5facb5996e038170 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Fri, 28 Sep 2018 11:59:15 +0200 Subject: [PATCH 14/14] empty lines --- pkg/aggregator/sender_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 7bd2ce02248ef2..6e8ef5e18362d6 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -117,7 +117,6 @@ func TestGetAndSetSender(t *testing.T) { sender, err := GetSender(checkID1) assert.Nil(t, err) assert.Equal(t, testCheckSender, sender) - } func TestGetSenderDefaultHostname(t *testing.T) { @@ -132,7 +131,6 @@ func TestGetSenderDefaultHostname(t *testing.T) { assert.Equal(t, "testhostname", checksender.defaultHostname) assert.Equal(t, false, checksender.defaultHostnameDisabled) - } func TestCheckSenderInterface(t *testing.T) {