Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move default hostname logic from the aggregator to the sender and dsd #2334

Merged
merged 14 commits into from
Sep 28, 2018
21 changes: 6 additions & 15 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ func NewBufferedAggregator(s *serializer.Serializer, hostname string, flushInter
checkMetricIn: make(chan senderMetricSample, 100), // TODO make buffer size configurable
serviceCheckIn: make(chan metrics.ServiceCheck, 100), // TODO make buffer size configurable
eventIn: make(chan metrics.Event, 100), // TODO make buffer size configurable
sampler: *NewTimeSampler(bucketSize, hostname),
sampler: *NewTimeSampler(bucketSize),
checkSamplers: make(map[check.ID]*CheckSampler),
distSampler: newDistSampler(bucketSize, hostname),
distSampler: newDistSampler(bucketSize),
flushInterval: flushInterval,
serializer: s,
hostname: hostname,
Expand Down Expand Up @@ -227,6 +227,7 @@ func (agg *BufferedAggregator) AddAgentStartupEvent(agentVersion string) {
event := metrics.Event{
Text: fmt.Sprintf("Version %s", agentVersion),
SourceTypeName: "System",
Host: agg.hostname,
EventType: "Agent Startup",
}
agg.eventIn <- event
Expand All @@ -238,7 +239,7 @@ func (agg *BufferedAggregator) registerSender(id check.ID) error {
if _, ok := agg.checkSamplers[id]; ok {
return fmt.Errorf("Sender with ID '%s' has already been registered, will use existing sampler", id)
}
agg.checkSamplers[id] = newCheckSampler(agg.hostname)
agg.checkSamplers[id] = newCheckSampler()
return nil
}

Expand Down Expand Up @@ -266,9 +267,6 @@ func (agg *BufferedAggregator) handleSenderSample(ss senderMetricSample) {

// addServiceCheck adds the service check to the slice of current service checks
func (agg *BufferedAggregator) addServiceCheck(sc metrics.ServiceCheck) {
if sc.Host == "" {
sc.Host = agg.hostname
}
if sc.Ts == 0 {
sc.Ts = time.Now().Unix()
}
Expand All @@ -279,9 +277,6 @@ func (agg *BufferedAggregator) addServiceCheck(sc metrics.ServiceCheck) {

// addEvent adds the event to the slice of current events
func (agg *BufferedAggregator) addEvent(e metrics.Event) {
if e.Host == "" {
e.Host = agg.hostname
}
if e.Ts == 0 {
e.Ts = time.Now().Unix()
}
Expand Down Expand Up @@ -375,6 +370,7 @@ func (agg *BufferedAggregator) flushServiceChecks() {
agg.addServiceCheck(metrics.ServiceCheck{
CheckName: "datadog.agent.up",
Status: metrics.ServiceCheckOK,
Host: agg.hostname,
})

serviceChecks := agg.GetServiceChecks()
Expand Down Expand Up @@ -504,12 +500,7 @@ func (agg *BufferedAggregator) run() {
case h := <-agg.hostnameUpdate:
aggregatorHostnameUpdate.Add(1)
agg.hostname = h
agg.mu.Lock()
for _, checkSampler := range agg.checkSamplers {
checkSampler.defaultHostname = h
}
agg.sampler.defaultHostname = h
agg.mu.Unlock()
changeAllSendersDefaultHostname(h)
agg.hostnameUpdateDone <- struct{}{}
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) {
})

require.Len(t, agg.serviceChecks, 2)
assert.Equal(t, "resolved-hostname", agg.serviceChecks[0].Host)
assert.Equal(t, "", agg.serviceChecks[0].Host)
assert.Equal(t, []string{"bar", "foo"}, agg.serviceChecks[0].Tags)
assert.NotZero(t, agg.serviceChecks[0].Ts) // should be set to the current time, let's just check that it's not 0
assert.Equal(t, "my-hostname", agg.serviceChecks[1].Host)
Expand Down Expand Up @@ -105,10 +105,10 @@ func TestAddEventDefaultValues(t *testing.T) {
})

require.Len(t, agg.events, 2)
// Default values are set on Host and Ts only
// Default values are set on Ts
event1 := agg.events[0]
assert.Equal(t, "An event occurred", event1.Title)
assert.Equal(t, "resolved-hostname", event1.Host)
assert.Equal(t, "", event1.Host)
assert.NotZero(t, event1.Ts) // should be set to the current time, let's just check that it's not 0
assert.Zero(t, event1.Priority)
assert.Zero(t, event1.Tags)
Expand All @@ -132,6 +132,13 @@ func TestSetHostname(t *testing.T) {
resetAggregator()
agg := InitAggregator(nil, "hostname")
assert.Equal(t, "hostname", agg.hostname)
sender, err := GetSender(checkID1)
require.NoError(t, err)
checkSender, ok := sender.(*checkSender)
require.True(t, ok)
assert.Equal(t, "hostname", checkSender.defaultHostname)

agg.SetHostname("different-hostname")
assert.Equal(t, "different-hostname", agg.hostname)
assert.Equal(t, "different-hostname", checkSender.defaultHostname)
}
10 changes: 2 additions & 8 deletions pkg/aggregator/check_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ type CheckSampler struct {
series []*metrics.Serie
contextResolver *ContextResolver
metrics metrics.ContextMetrics
defaultHostname string
}

// newCheckSampler returns a newly initialized CheckSampler
func newCheckSampler(hostname string) *CheckSampler {
func newCheckSampler() *CheckSampler {
return &CheckSampler{
series: make([]*metrics.Serie, 0),
contextResolver: newContextResolver(),
metrics: metrics.MakeContextMetrics(),
defaultHostname: hostname,
}
}

Expand Down Expand Up @@ -58,12 +56,8 @@ func (cs *CheckSampler) commit(timestamp float64) {
}
serie.Name = context.Name + serie.NameSuffix
serie.Tags = context.Tags
serie.Host = context.Host
serie.SourceTypeName = checksSourceTypeName // this source type is required for metrics coming from the checks
if context.Host != "" {
serie.Host = context.Host
} else {
serie.Host = cs.defaultHostname
}

cs.series = append(cs.series, serie)
}
Expand Down
38 changes: 3 additions & 35 deletions pkg/aggregator/check_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestCheckGaugeSampling(t *testing.T) {
checkSampler := newCheckSampler("")
checkSampler := newCheckSampler()

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestCheckGaugeSampling(t *testing.T) {
}

func TestCheckRateSampling(t *testing.T) {
checkSampler := newCheckSampler("")
checkSampler := newCheckSampler()

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestCheckRateSampling(t *testing.T) {
}

func TestHistogramIntervalSampling(t *testing.T) {
checkSampler := newCheckSampler("")
checkSampler := newCheckSampler()

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
Expand Down Expand Up @@ -190,35 +190,3 @@ func TestHistogramIntervalSampling(t *testing.T) {

assert.True(t, foundCount)
}

func TestCheckSamplerHostname(t *testing.T) {
checkSampler := newCheckSampler("my.test.hostname")

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
Value: 1,
Mtype: metrics.GaugeType,
Tags: []string{"foo", "bar"},
SampleRate: 1,
Timestamp: 12345.0,
}
mSample2 := metrics.MetricSample{
Name: "my.metric.name",
Value: 1,
Mtype: metrics.GaugeType,
Tags: []string{"foo", "bar"},
Host: "metric-hostname",
SampleRate: 1,
Timestamp: 12345,
}

checkSampler.addSample(&mSample1)
checkSampler.addSample(&mSample2)
checkSampler.commit(12346.0)
series := checkSampler.flush()

require.Len(t, series, 2)
actualHostnames := []string{series[0].Host, series[1].Host}
assert.Contains(t, actualHostnames, "my.test.hostname")
assert.Contains(t, actualHostnames, "metric-hostname")
}
16 changes: 5 additions & 11 deletions pkg/aggregator/dist_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,21 @@ import (
)

type distSampler struct {
interval int64
defaultHostname string
interval int64

m sketchMap
ctxResolver *ContextResolver
}

func newDistSampler(interval int64, defaultHostname string) distSampler {
func newDistSampler(interval int64) distSampler {
if interval == 0 {
interval = bucketSize
}

return distSampler{
interval: interval,
defaultHostname: defaultHostname,
m: make(sketchMap),
ctxResolver: newContextResolver(),
interval: interval,
m: make(sketchMap),
ctxResolver: newContextResolver(),
}
}

Expand Down Expand Up @@ -75,10 +73,6 @@ func (d *distSampler) newSeries(ck ckey.ContextKey, points []metrics.SketchPoint
ContextKey: ck,
}

if ss.Host == "" {
ss.Host = d.defaultHostname
}

return ss
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/aggregator/dist_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ import (
"sort"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/DataDog/datadog-agent/pkg/quantile"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDistSampler(t *testing.T) {
const (
defaultHost = "default_host"
defaultBucketSize = 10
)

var (
d = newDistSampler(0, defaultHost)
d = newDistSampler(0)

insert = func(t *testing.T, ts float64, ctx Context, values ...float64) {
t.Helper()
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestDistSampler(t *testing.T) {

func TestDistSamplerBucketSampling(t *testing.T) {

distSampler := newDistSampler(10, "")
distSampler := newDistSampler(10)

mSample1 := metrics.MetricSample{
Name: "test.metric.name",
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestDistSamplerBucketSampling(t *testing.T) {
}

func TestDistSamplerContextSampling(t *testing.T) {
distSampler := newDistSampler(10, "")
distSampler := newDistSampler(10)

mSample1 := metrics.MetricSample{
Name: "test.metric.name1",
Expand Down
5 changes: 5 additions & 0 deletions pkg/aggregator/mocksender/mocked_methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (m *MockSender) ServiceCheck(checkName string, status metrics.ServiceCheckS
m.Called(checkName, status, hostname, tags, message)
}

//DisableDefaultHostname enables the hostname mock call.
func (m *MockSender) DisableDefaultHostname(d bool) {
m.Called(d)
}

//Event enables the event mock call.
func (m *MockSender) Event(e metrics.Event) {
m.Called(e)
Expand Down
2 changes: 1 addition & 1 deletion pkg/aggregator/mocksender/mocksender.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *MockSender) SetupAcceptAll() {
).Return()
m.On("Event", mock.AnythingOfType("metrics.Event")).Return()
m.On("GetMetricStats", mock.AnythingOfType("map[string]int64")).Return()

m.On("DisableDefaultHostname", mock.AnythingOfType("bool")).Return()
m.On("Commit").Return()
}

Expand Down
Loading