This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

{cmd,statsd,writer}: fix race conditions in tests #372

Merged · 2 commits · Feb 21, 2018
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -2,6 +2,7 @@ version: 2
jobs:
build:
working_directory: /go/src/github.com/DataDog/datadog-trace-agent
resource_class: large

docker:
- image: circleci/golang:1.8
5 changes: 0 additions & 5 deletions cmd/trace-agent/service_mapper_test.go
@@ -14,17 +14,13 @@ func TestServiceMapper(t *testing.T) {
mapper.Start()
defer mapper.Stop()

// Let's ensure we have a proper context
assert.Len(mapper.cache, 0)

input := model.ServicesMetadata{"service-a": {"app_type": "type-a"}}
in <- input
output := <-out

// When the service is ingested for the first time, we simply propagate it
// to the output channel and add an entry to the cache map
assert.Equal(input, output)
assert.Len(mapper.cache, 1)

// This entry will result in a cache-hit and therefore will be filtered out
in <- model.ServicesMetadata{"service-a": {"app_type": "SOMETHING_DIFFERENT"}}
@@ -35,7 +31,6 @@ func TestServiceMapper(t *testing.T) {
output = <-out

assert.Equal(newService, output)
assert.Len(mapper.cache, 2)
}

func TestCachePolicy(t *testing.T) {
2 changes: 1 addition & 1 deletion gorake.rb
@@ -65,7 +65,7 @@ def go_vet(path)
end

def go_test(path, opts = {})
cmd = 'go test'
cmd = 'go test -race'
filter = ''
if opts[:coverage_file]
cmd += " -coverprofile=#{opts[:coverage_file]} -coverpkg=./..."
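The build script change above turns on the Go race detector for every test run. As an illustration only (this sketch is not part of the PR; the package and names are made up), this is the kind of unsynchronized read/write pair in a test fixture that `go test -race` reports:

```go
package sketch

import (
	"sync"
	"testing"
)

// recorder mimics a test fixture that records calls without any locking.
type recorder struct {
	calls []string
}

func TestUnsynchronizedRecorder(t *testing.T) {
	r := &recorder{}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		r.calls = append(r.calls, "gauge") // write from the worker goroutine
	}()
	_ = len(r.calls) // concurrent read from the test goroutine
	wg.Wait()        // -race reports the read/write pair above
}
```

Guarding both sides with a mutex, as the fixture changes below do, removes the report.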
16 changes: 10 additions & 6 deletions statsd/fixtures.go
@@ -44,12 +44,14 @@ type GaugeSummary struct {

// TestStatsClient is a mocked StatsClient that records all calls and replies with configurable error return values.
type TestStatsClient struct {
GaugeErr error
GaugeCalls []StatsClientGaugeArgs
gaugeLock sync.Mutex
CountErr error
CountCalls []StatsClientCountArgs
countLock sync.Mutex
gaugeLock sync.Mutex
GaugeErr error
GaugeCalls []StatsClientGaugeArgs

countLock sync.RWMutex
CountErr error
CountCalls []StatsClientCountArgs

HistogramErr error
HistogramCalls []StatsClientHistogramArgs
histogramLock sync.Mutex
@@ -83,6 +85,8 @@ func (c *TestStatsClient) Histogram(name string, value float64, tags []string, r
func (c *TestStatsClient) GetCountSummaries() map[string]*CountSummary {
result := map[string]*CountSummary{}

c.countLock.RLock()
defer c.countLock.RUnlock()
for _, countCall := range c.CountCalls {
name := countCall.Name
summary, ok := result[name]
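Only the read side of the count recorder is visible in this hunk: `GetCountSummaries` now takes the read lock before iterating `CountCalls`. For context, a minimal sketch of the matching write side, assuming it sits in the same `statsd` package; the real `Count` method and its exact signature are not part of this diff:

```go
// Count sketches the write path paired with the RLock'd reader above: writers
// take the exclusive lock, readers such as GetCountSummaries share the read lock.
func (c *TestStatsClient) Count(name string, value int64, tags []string, rate float64) error {
	c.countLock.Lock()
	defer c.countLock.Unlock()
	// Name is the only StatsClientCountArgs field shown in this diff; the rest are omitted.
	c.CountCalls = append(c.CountCalls, StatsClientCountArgs{Name: name})
	return c.CountErr
}
```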
45 changes: 37 additions & 8 deletions writer/fixtures.go
@@ -18,21 +18,50 @@ type PayloadConstructedHandlerArgs struct {
// TestEndpoint represents a mocked endpoint that replies with a configurable error and records successful and failed
// payloads.
type TestEndpoint struct {
Err error
SuccessPayloads []Payload
ErrorPayloads []Payload
sync.RWMutex
err error
successPayloads []Payload
errorPayloads []Payload
}

// Write mocks the writing of a payload to a remote endpoint, recording it and replying with the configured error (or
// success in its absence).
func (e *TestEndpoint) Write(payload *Payload) error {
if e.Err != nil {
e.ErrorPayloads = append(e.ErrorPayloads, *payload)
e.Lock()
defer e.Unlock()
if e.err != nil {
e.errorPayloads = append(e.errorPayloads, *payload)
} else {
e.SuccessPayloads = append(e.SuccessPayloads, *payload)
e.successPayloads = append(e.successPayloads, *payload)
}
return e.err
}

func (e *TestEndpoint) Error() error {
e.RLock()
defer e.RUnlock()
return e.err
}

// ErrorPayloads returns all the error payloads registered with the test endpoint.
func (e *TestEndpoint) ErrorPayloads() []Payload {
e.RLock()
defer e.RUnlock()
return e.errorPayloads
}

// SuccessPayloads returns all the success payloads registered with the test endpoint.
func (e *TestEndpoint) SuccessPayloads() []Payload {
e.RLock()
defer e.RUnlock()
return e.successPayloads
}

return e.Err
// SetError sets the passed error on the endpoint.
func (e *TestEndpoint) SetError(err error) {
e.Lock()
defer e.Unlock()
e.err = err
}

func (e *TestEndpoint) String() string {
@@ -92,7 +121,7 @@ func (c *TestPayloadSender) Run() {

// Payloads allows access to all payloads recorded as being successfully sent by this sender.
func (c *TestPayloadSender) Payloads() []Payload {
return c.testEndpoint.SuccessPayloads
return c.testEndpoint.SuccessPayloads()
}

// Endpoint allows access to the underlying TestEndpoint.
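With the payloads and the error now unexported behind an embedded `sync.RWMutex`, tests reach `TestEndpoint` state only through the locked accessors, as the updated tests further down show. A usage sketch, assuming it lives alongside these fixtures in the `writer` package (the test name is made up; `SetError`, `Write`, the payload accessors, and `RandomPayload` all come from this diff):

```go
func TestEndpointAccessors_Sketch(t *testing.T) {
	endpoint := &TestEndpoint{}
	endpoint.SetError(fmt.Errorf("boom")) // configured under the write lock

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = endpoint.Write(RandomPayload()) // recorded as an error payload under the write lock
	}()

	// Polling from the test goroutine is now race-free: both reads take the read lock.
	_ = endpoint.Error()
	_ = endpoint.ErrorPayloads()
	wg.Wait()
}
```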
40 changes: 20 additions & 20 deletions writer/payload_test.go
@@ -55,9 +55,9 @@ func TestQueuablePayloadSender_WorkingEndpoint(t *testing.T) {
errorPayloads := monitor.FailurePayloads()
assert.Equal([]Payload{*payload1, *payload2, *payload3, *payload4, *payload5}, successPayloads,
"Expect all sent payloads to have been successful")
assert.Equal(successPayloads, workingEndpoint.SuccessPayloads, "Expect sender and endpoint to match on successful payloads")
assert.Equal(successPayloads, workingEndpoint.SuccessPayloads(), "Expect sender and endpoint to match on successful payloads")
assert.Len(errorPayloads, 0, "No payloads should have errored out on send")
assert.Len(workingEndpoint.ErrorPayloads, 0, "No payloads should have errored out on send")
assert.Len(workingEndpoint.ErrorPayloads(), 0, "No payloads should have errored out on send")
}

func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
assert.Equal(0, queuableSender.NumQueuedPayloads(), "Expect no queued payloads")

// With a failing endpoint with a retriable error
flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})
// We send some payloads
payload3 := RandomPayload()
queuableSender.Send(payload3)
@@ -112,7 +112,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
assert.Equal(2, queuableSender.NumQueuedPayloads(), "Expect 2 queued payloads")

// With the previously failing endpoint working again
flakyEndpoint.Err = nil
flakyEndpoint.SetError(nil)
// We retry for the third time
testBackoffTimer.TriggerTick()

Expand All @@ -122,7 +122,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
assert.Equal(0, queuableSender.NumQueuedPayloads(), "Expect no queued payloads")

// Finally, with a failing endpoint with a non-retriable error
flakyEndpoint.Err = fmt.Errorf("non retriable bleh")
flakyEndpoint.SetError(fmt.Errorf("non retriable bleh"))
// We send some payloads
payload5 := RandomPayload()
queuableSender.Send(payload5)
@@ -135,7 +135,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
assert.Equal(0, queuableSender.NumQueuedPayloads(), "Expect no queued payloads")

// With the previously failing endpoint working again
flakyEndpoint.Err = nil
flakyEndpoint.SetError(nil)
// We retry just in case there's something in the queue
testBackoffTimer.TriggerTick()

Expand All @@ -151,7 +151,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
retryPayloads := monitor.RetryPayloads()
assert.Equal([]Payload{*payload1, *payload2, *payload3, *payload4}, successPayloads,
"Expect all sent payloads to have been successful")
assert.Equal(successPayloads, flakyEndpoint.SuccessPayloads, "Expect sender and endpoint to match on successful payloads")
assert.Equal(successPayloads, flakyEndpoint.SuccessPayloads(), "Expect sender and endpoint to match on successful payloads")
// Expect 3 retry events for payload 3 (one because of first send, two others because of the two retries)
assert.Equal([]Payload{*payload3, *payload3, *payload3}, retryPayloads, "Expect payload 3 to have been retries 3 times")
// We expect payloads 5 and 6 to appear in error payloads as they failed for non-retriable errors.
Expand All @@ -163,7 +163,7 @@ func TestQueuablePayloadSender_MaxQueuedPayloads(t *testing.T) {

// Given an endpoint that continuously throws out retriable errors
flakyEndpoint := &TestEndpoint{}
flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

// And a test backoff timer that can be triggered on-demand
testBackoffTimer := fixtures.NewTestBackoffTimer()
@@ -198,7 +198,7 @@ func TestQueuablePayloadSender_MaxQueuedPayloads(t *testing.T) {
syncBarrier <- nil

// Then, when the endpoint finally works
flakyEndpoint.Err = nil
flakyEndpoint.SetError(nil)

// And we trigger a retry
testBackoffTimer.TriggerTick()
@@ -215,7 +215,7 @@ func TestQueuablePayloadSender_MaxQueuedPayloads(t *testing.T) {

// Then endpoint should have received only payload3. Other should have been discarded because max queued payloads
// is 1
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads, "Endpoint should have received only payload 3")
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")

// Monitor should agree on previous fact
assert.Equal([]Payload{*payload3}, monitor.SuccessPayloads(),
@@ -233,7 +233,7 @@ func TestQueuablePayloadSender_MaxQueuedBytes(t *testing.T) {

// Given an endpoint that continuously throws out retriable errors
flakyEndpoint := &TestEndpoint{}
flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

// And a test backoff timer that can be triggered on-demand
testBackoffTimer := fixtures.NewTestBackoffTimer()
@@ -268,7 +268,7 @@ func TestQueuablePayloadSender_MaxQueuedBytes(t *testing.T) {
syncBarrier <- nil

// Then, when the endpoint finally works
flakyEndpoint.Err = nil
flakyEndpoint.SetError(nil)

// And we trigger a retry
testBackoffTimer.TriggerTick()
@@ -285,7 +285,7 @@ func TestQueuablePayloadSender_MaxQueuedBytes(t *testing.T) {

// Then endpoint should have received payload2 and payload3. Payload1 should have been discarded because keeping all
// 3 would have put us over the max size of sender
assert.Equal([]Payload{*payload2, *payload3}, flakyEndpoint.SuccessPayloads,
assert.Equal([]Payload{*payload2, *payload3}, flakyEndpoint.SuccessPayloads(),
"Endpoint should have received only payload 2 and 3 (in that order)")

// Monitor should agree on previous fact
@@ -302,7 +302,7 @@ func TestQueuablePayloadSender_DropBigPayloadsOnRetry(t *testing.T) {

// Given an endpoint that continuously throws out retriable errors
flakyEndpoint := &TestEndpoint{}
flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

// And a test backoff timer that can be triggered on-demand
testBackoffTimer := fixtures.NewTestBackoffTimer()
@@ -329,7 +329,7 @@ func TestQueuablePayloadSender_DropBigPayloadsOnRetry(t *testing.T) {
syncBarrier <- nil

// Then, when the endpoint finally works
flakyEndpoint.Err = nil
flakyEndpoint.SetError(nil)

// And we trigger a retry
testBackoffTimer.TriggerTick()
@@ -345,7 +345,7 @@ func TestQueuablePayloadSender_DropBigPayloadsOnRetry(t *testing.T) {
monitor.Stop()

// Then endpoint should have received no payloads because payload1 was too big to store in queue.
assert.Len(flakyEndpoint.SuccessPayloads, 0, "Endpoint should have received no payloads")
assert.Len(flakyEndpoint.SuccessPayloads(), 0, "Endpoint should have received no payloads")

// And monitor should have received failed event for payload1 with correct reason
assert.Equal([]Payload{*payload1}, monitor.FailurePayloads(),
@@ -392,7 +392,7 @@ func TestQueuablePayloadSender_SendBigPayloadsIfNoRetry(t *testing.T) {
monitor.Stop()

// Then endpoint should have received payload1 because although it was big, it didn't get queued.
assert.Equal([]Payload{*payload1}, workingEndpoint.SuccessPayloads, "Endpoint should have received payload1")
assert.Equal([]Payload{*payload1}, workingEndpoint.SuccessPayloads(), "Endpoint should have received payload1")

// And monitor should have received success event for payload1
assert.Equal([]Payload{*payload1}, monitor.SuccessPayloads(),
@@ -404,7 +404,7 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {

// Given an endpoint that continuously throws out retriable errors
flakyEndpoint := &TestEndpoint{}
flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

// And a test backoff timer that can be triggered on-demand
testBackoffTimer := fixtures.NewTestBackoffTimer()
@@ -443,7 +443,7 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
syncBarrier <- nil

// Then, when the endpoint finally works
flakyEndpoint.Err = nil
flakyEndpoint.SetError(nil)

// And we trigger a retry
testBackoffTimer.TriggerTick()
@@ -460,7 +460,7 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {

// Then endpoint should have received only payload3. Because payload1 and payload2 were too old after the failed
// retry (first TriggerTick).
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads, "Endpoint should have received only payload 3")
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")

// And monitor should have received failed events for payload1 and payload2 with correct reason
assert.Equal([]Payload{*payload1, *payload2}, monitor.FailurePayloads(),
13 changes: 7 additions & 6 deletions writer/service_writer_test.go
@@ -62,10 +62,11 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
}

mergedMetadata := mergeMetadataInOrder(metadata1, metadata2)
successPayloads := testEndpoint.SuccessPayloads()

assert.Len(testEndpoint.SuccessPayloads, 2, "There should be 2 payloads")
assertMetadata(assert, expectedHeaders, metadata1, testEndpoint.SuccessPayloads[0])
assertMetadata(assert, expectedHeaders, mergedMetadata, testEndpoint.SuccessPayloads[1])
assert.Len(successPayloads, 2, "There should be 2 payloads")
assertMetadata(assert, expectedHeaders, metadata1, successPayloads[0])
assertMetadata(assert, expectedHeaders, mergedMetadata, successPayloads[1])
}

func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
@@ -105,7 +106,7 @@ func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
time.Sleep(2 * serviceWriter.conf.FlushPeriod)

// And then sending a third payload with other 3 traces with an errored out endpoint with no retry
testEndpoint.Err = fmt.Errorf("non retriable error")
testEndpoint.SetError(fmt.Errorf("non retriable error"))
expectedNumErrors++
metadata3 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata3
Expand All @@ -116,10 +117,10 @@ func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
time.Sleep(2 * serviceWriter.conf.FlushPeriod)

// And then sending a third payload with other 3 traces with an errored out endpoint with retry
testEndpoint.Err = &RetriableError{
testEndpoint.SetError(&RetriableError{
err: fmt.Errorf("retriable error"),
endpoint: testEndpoint,
}
})
expectedMinNumRetries++
metadata4 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata4
11 changes: 6 additions & 5 deletions writer/stats_writer_test.go
@@ -46,10 +46,11 @@ func TestStatsWriter_StatHandling(t *testing.T) {
close(statsChannel)
statsWriter.Stop()

payloads := testEndpoint.SuccessPayloads()

// Then the endpoint should have received 2 payloads, containing all stat buckets
assert.Len(testEndpoint.SuccessPayloads, 2, "There should be 2 payloads")
assert.Len(payloads, 2, "There should be 2 payloads")

payloads := testEndpoint.SuccessPayloads
payload1 := payloads[0]
payload2 := payloads[1]

@@ -104,7 +105,7 @@ func TestStatsWriter_UpdateInfoHandling(t *testing.T) {
time.Sleep(2 * statsWriter.conf.UpdateInfoPeriod)

// And then sending a third payload with other 3 buckets with an errored out endpoint
testEndpoint.Err = fmt.Errorf("non retriable error")
testEndpoint.SetError(fmt.Errorf("non retriable error"))
expectedNumErrors++
payload3Buckets := []model.StatsBucket{
fixtures.RandomStatsBucket(5),
Expand All @@ -119,10 +120,10 @@ func TestStatsWriter_UpdateInfoHandling(t *testing.T) {
time.Sleep(2 * statsWriter.conf.UpdateInfoPeriod)

// And then sending a third payload with other 3 traces with an errored out endpoint with retry
testEndpoint.Err = &RetriableError{
testEndpoint.SetError(&RetriableError{
err: fmt.Errorf("non retriable error"),
endpoint: testEndpoint,
}
})
expectedMinNumRetries++
payload4Buckets := []model.StatsBucket{
fixtures.RandomStatsBucket(5),