diff --git a/.circleci/config.yml b/.circleci/config.yml
index 26f01a13b..b4bbc7808 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -2,6 +2,7 @@ version: 2
 jobs:
   build:
     working_directory: /go/src/github.com/DataDog/datadog-trace-agent
+    resource_class: large
     docker:
       - image: circleci/golang:1.8
diff --git a/cmd/trace-agent/service_mapper_test.go b/cmd/trace-agent/service_mapper_test.go
index 222076ab8..8701c04b0 100644
--- a/cmd/trace-agent/service_mapper_test.go
+++ b/cmd/trace-agent/service_mapper_test.go
@@ -14,9 +14,6 @@ func TestServiceMapper(t *testing.T) {
 	mapper.Start()
 	defer mapper.Stop()
 
-	// Let's ensure we have a proper context
-	assert.Len(mapper.cache, 0)
-
 	input := model.ServicesMetadata{"service-a": {"app_type": "type-a"}}
 	in <- input
 	output := <-out
@@ -24,7 +21,6 @@ func TestServiceMapper(t *testing.T) {
 	// When the service is ingested for the first time, we simply propagate it
 	// to the output channel and add an entry to the cache map
 	assert.Equal(input, output)
-	assert.Len(mapper.cache, 1)
 
 	// This entry will result in a cache-hit and therefore will be filtered out
 	in <- model.ServicesMetadata{"service-a": {"app_type": "SOMETHING_DIFFERENT"}}
@@ -35,7 +31,6 @@ func TestServiceMapper(t *testing.T) {
 
 	output = <-out
 	assert.Equal(newService, output)
-	assert.Len(mapper.cache, 2)
 }
 
 func TestCachePolicy(t *testing.T) {
diff --git a/gorake.rb b/gorake.rb
index 67ea040ed..f7ed37bb6 100644
--- a/gorake.rb
+++ b/gorake.rb
@@ -65,7 +65,7 @@ def go_vet(path)
 end
 
 def go_test(path, opts = {})
-  cmd = 'go test'
+  cmd = 'go test -race'
   filter = ''
   if opts[:coverage_file]
     cmd += " -coverprofile=#{opts[:coverage_file]} -coverpkg=./..."
diff --git a/statsd/fixtures.go b/statsd/fixtures.go
index 5f6531402..5e4170a4c 100644
--- a/statsd/fixtures.go
+++ b/statsd/fixtures.go
@@ -44,12 +44,14 @@ type GaugeSummary struct {
 
 // TestStatsClient is a mocked StatsClient that records all calls and replies with configurable error return values.
 type TestStatsClient struct {
-	GaugeErr   error
-	GaugeCalls []StatsClientGaugeArgs
-	gaugeLock  sync.Mutex
-	CountErr   error
-	CountCalls []StatsClientCountArgs
-	countLock  sync.Mutex
+	gaugeLock  sync.Mutex
+	GaugeErr   error
+	GaugeCalls []StatsClientGaugeArgs
+
+	countLock  sync.RWMutex
+	CountErr   error
+	CountCalls []StatsClientCountArgs
+
 	HistogramErr   error
 	HistogramCalls []StatsClientHistogramArgs
 	histogramLock  sync.Mutex
@@ -83,6 +85,8 @@ func (c *TestStatsClient) Histogram(name string, value float64, tags []string, r
 func (c *TestStatsClient) GetCountSummaries() map[string]*CountSummary {
 	result := map[string]*CountSummary{}
 
+	c.countLock.RLock()
+	defer c.countLock.RUnlock()
 	for _, countCall := range c.CountCalls {
 		name := countCall.Name
 		summary, ok := result[name]
diff --git a/writer/fixtures.go b/writer/fixtures.go
index b5db925fb..16334a648 100644
--- a/writer/fixtures.go
+++ b/writer/fixtures.go
@@ -18,21 +18,50 @@ type PayloadConstructedHandlerArgs struct {
 // TestEndpoint represents a mocked endpoint that replies with a configurable error and records successful and failed
 // payloads.
 type TestEndpoint struct {
-	Err             error
-	SuccessPayloads []Payload
-	ErrorPayloads   []Payload
+	sync.RWMutex
+	err             error
+	successPayloads []Payload
+	errorPayloads   []Payload
 }
 
 // Write mocks the writing of a payload to a remote endpoint, recording it and replying with the configured error (or
 // success in its absence).
 func (e *TestEndpoint) Write(payload *Payload) error {
-	if e.Err != nil {
-		e.ErrorPayloads = append(e.ErrorPayloads, *payload)
+	e.Lock()
+	defer e.Unlock()
+	if e.err != nil {
+		e.errorPayloads = append(e.errorPayloads, *payload)
 	} else {
-		e.SuccessPayloads = append(e.SuccessPayloads, *payload)
+		e.successPayloads = append(e.successPayloads, *payload)
 	}
+	return e.err
+}
+
+func (e *TestEndpoint) Error() error {
+	e.RLock()
+	defer e.RUnlock()
+	return e.err
+}
+
+// ErrorPayloads returns all the error payloads registered with the test endpoint.
+func (e *TestEndpoint) ErrorPayloads() []Payload {
+	e.RLock()
+	defer e.RUnlock()
+	return e.errorPayloads
+}
+
+// SuccessPayloads returns all the success payloads registered with the test endpoint.
+func (e *TestEndpoint) SuccessPayloads() []Payload {
+	e.RLock()
+	defer e.RUnlock()
+	return e.successPayloads
+}
 
-	return e.Err
+// SetError sets the passed error on the endpoint.
+func (e *TestEndpoint) SetError(err error) {
+	e.Lock()
+	defer e.Unlock()
+	e.err = err
 }
 
 func (e *TestEndpoint) String() string {
@@ -92,7 +121,7 @@ func (c *TestPayloadSender) Run() {
 
 // Payloads allows access to all payloads recorded as being successfully sent by this sender.
 func (c *TestPayloadSender) Payloads() []Payload {
-	return c.testEndpoint.SuccessPayloads
+	return c.testEndpoint.SuccessPayloads()
 }
 
 // Endpoint allows access to the underlying TestEndpoint.
diff --git a/writer/payload_test.go b/writer/payload_test.go
index a4cdd011d..e59ff4cef 100644
--- a/writer/payload_test.go
+++ b/writer/payload_test.go
@@ -55,9 +55,9 @@ func TestQueuablePayloadSender_WorkingEndpoint(t *testing.T) {
 	errorPayloads := monitor.FailurePayloads()
 	assert.Equal([]Payload{*payload1, *payload2, *payload3, *payload4, *payload5}, successPayloads,
 		"Expect all sent payloads to have been successful")
-	assert.Equal(successPayloads, workingEndpoint.SuccessPayloads, "Expect sender and endpoint to match on successful payloads")
+	assert.Equal(successPayloads, workingEndpoint.SuccessPayloads(), "Expect sender and endpoint to match on successful payloads")
 	assert.Len(errorPayloads, 0, "No payloads should have errored out on send")
-	assert.Len(workingEndpoint.ErrorPayloads, 0, "No payloads should have errored out on send")
+	assert.Len(workingEndpoint.ErrorPayloads(), 0, "No payloads should have errored out on send")
 }
 
 func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
@@ -95,7 +95,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
 	assert.Equal(0, queuableSender.NumQueuedPayloads(), "Expect no queued payloads")
 
 	// With a failing endpoint with a retriable error
-	flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
+	flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})
 	// We send some payloads
 	payload3 := RandomPayload()
 	queuableSender.Send(payload3)
@@ -112,7 +112,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
 	assert.Equal(2, queuableSender.NumQueuedPayloads(), "Expect 2 queued payloads")
 
 	// With the previously failing endpoint working again
-	flakyEndpoint.Err = nil
+	flakyEndpoint.SetError(nil)
 
 	// We retry for the third time
 	testBackoffTimer.TriggerTick()
@@ -122,7 +122,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) {
 	assert.Equal(0, queuableSender.NumQueuedPayloads(), "Expect no queued payloads")
 
 	// Finally, with a failing endpoint with a non-retriable error
-	flakyEndpoint.Err = fmt.Errorf("non retriable bleh")
+	flakyEndpoint.SetError(fmt.Errorf("non retriable bleh"))
retriable bleh")) // We send some payloads payload5 := RandomPayload() queuableSender.Send(payload5) @@ -135,7 +135,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) { assert.Equal(0, queuableSender.NumQueuedPayloads(), "Expect no queued payloads") // With the previously failing endpoint working again - flakyEndpoint.Err = nil + flakyEndpoint.SetError(nil) // We retry just in case there's something in the queue testBackoffTimer.TriggerTick() @@ -151,7 +151,7 @@ func TestQueuablePayloadSender_FlakyEndpoint(t *testing.T) { retryPayloads := monitor.RetryPayloads() assert.Equal([]Payload{*payload1, *payload2, *payload3, *payload4}, successPayloads, "Expect all sent payloads to have been successful") - assert.Equal(successPayloads, flakyEndpoint.SuccessPayloads, "Expect sender and endpoint to match on successful payloads") + assert.Equal(successPayloads, flakyEndpoint.SuccessPayloads(), "Expect sender and endpoint to match on successful payloads") // Expect 3 retry events for payload 3 (one because of first send, two others because of the two retries) assert.Equal([]Payload{*payload3, *payload3, *payload3}, retryPayloads, "Expect payload 3 to have been retries 3 times") // We expect payloads 5 and 6 to appear in error payloads as they failed for non-retriable errors. @@ -163,7 +163,7 @@ func TestQueuablePayloadSender_MaxQueuedPayloads(t *testing.T) { // Given an endpoint that continuously throws out retriable errors flakyEndpoint := &TestEndpoint{} - flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint} + flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}) // And a test backoff timer that can be triggered on-demand testBackoffTimer := fixtures.NewTestBackoffTimer() @@ -198,7 +198,7 @@ func TestQueuablePayloadSender_MaxQueuedPayloads(t *testing.T) { syncBarrier <- nil // Then, when the endpoint finally works - flakyEndpoint.Err = nil + flakyEndpoint.SetError(nil) // And we trigger a retry testBackoffTimer.TriggerTick() @@ -215,7 +215,7 @@ func TestQueuablePayloadSender_MaxQueuedPayloads(t *testing.T) { // Then endpoint should have received only payload3. Other should have been discarded because max queued payloads // is 1 - assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads, "Endpoint should have received only payload 3") + assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3") // Monitor should agree on previous fact assert.Equal([]Payload{*payload3}, monitor.SuccessPayloads(), @@ -233,7 +233,7 @@ func TestQueuablePayloadSender_MaxQueuedBytes(t *testing.T) { // Given an endpoint that continuously throws out retriable errors flakyEndpoint := &TestEndpoint{} - flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint} + flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}) // And a test backoff timer that can be triggered on-demand testBackoffTimer := fixtures.NewTestBackoffTimer() @@ -268,7 +268,7 @@ func TestQueuablePayloadSender_MaxQueuedBytes(t *testing.T) { syncBarrier <- nil // Then, when the endpoint finally works - flakyEndpoint.Err = nil + flakyEndpoint.SetError(nil) // And we trigger a retry testBackoffTimer.TriggerTick() @@ -285,7 +285,7 @@ func TestQueuablePayloadSender_MaxQueuedBytes(t *testing.T) { // Then endpoint should have received payload2 and payload3. 
 	// 3 would have put us over the max size of sender
-	assert.Equal([]Payload{*payload2, *payload3}, flakyEndpoint.SuccessPayloads,
+	assert.Equal([]Payload{*payload2, *payload3}, flakyEndpoint.SuccessPayloads(),
 		"Endpoint should have received only payload 2 and 3 (in that order)")
 
 	// Monitor should agree on previous fact
@@ -302,7 +302,7 @@ func TestQueuablePayloadSender_DropBigPayloadsOnRetry(t *testing.T) {
 
 	// Given an endpoint that continuously throws out retriable errors
 	flakyEndpoint := &TestEndpoint{}
-	flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
+	flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})
 
 	// And a test backoff timer that can be triggered on-demand
 	testBackoffTimer := fixtures.NewTestBackoffTimer()
@@ -329,7 +329,7 @@ func TestQueuablePayloadSender_DropBigPayloadsOnRetry(t *testing.T) {
 	syncBarrier <- nil
 
 	// Then, when the endpoint finally works
-	flakyEndpoint.Err = nil
+	flakyEndpoint.SetError(nil)
 
 	// And we trigger a retry
 	testBackoffTimer.TriggerTick()
@@ -345,7 +345,7 @@ func TestQueuablePayloadSender_DropBigPayloadsOnRetry(t *testing.T) {
 	monitor.Stop()
 
 	// Then endpoint should have received no payloads because payload1 was too big to store in queue.
-	assert.Len(flakyEndpoint.SuccessPayloads, 0, "Endpoint should have received no payloads")
+	assert.Len(flakyEndpoint.SuccessPayloads(), 0, "Endpoint should have received no payloads")
 
 	// And monitor should have received failed event for payload1 with correct reason
 	assert.Equal([]Payload{*payload1}, monitor.FailurePayloads(),
@@ -392,7 +392,7 @@ func TestQueuablePayloadSender_SendBigPayloadsIfNoRetry(t *testing.T) {
 	monitor.Stop()
 
 	// Then endpoint should have received payload1 because although it was big, it didn't get queued.
-	assert.Equal([]Payload{*payload1}, workingEndpoint.SuccessPayloads, "Endpoint should have received payload1")
+	assert.Equal([]Payload{*payload1}, workingEndpoint.SuccessPayloads(), "Endpoint should have received payload1")
 
 	// And monitor should have received success event for payload1
 	assert.Equal([]Payload{*payload1}, monitor.SuccessPayloads(),
@@ -404,7 +404,7 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
 
 	// Given an endpoint that continuously throws out retriable errors
 	flakyEndpoint := &TestEndpoint{}
-	flakyEndpoint.Err = &RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint}
+	flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})
 
 	// And a test backoff timer that can be triggered on-demand
 	testBackoffTimer := fixtures.NewTestBackoffTimer()
@@ -443,7 +443,7 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
 	syncBarrier <- nil
 
 	// Then, when the endpoint finally works
-	flakyEndpoint.Err = nil
+	flakyEndpoint.SetError(nil)
 
 	// And we trigger a retry
 	testBackoffTimer.TriggerTick()
@@ -460,7 +460,7 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
 
 	// Then endpoint should have received only payload3. Because payload1 and payload2 were too old after the failed
 	// retry (first TriggerTick).
-	assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads, "Endpoint should have received only payload 3")
+	assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")
 
 	// And monitor should have received failed events for payload1 and payload2 with correct reason
 	assert.Equal([]Payload{*payload1, *payload2}, monitor.FailurePayloads(),
diff --git a/writer/service_writer_test.go b/writer/service_writer_test.go
index f394d50be..bc1a29c7d 100644
--- a/writer/service_writer_test.go
+++ b/writer/service_writer_test.go
@@ -62,10 +62,11 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
 	}
 
 	mergedMetadata := mergeMetadataInOrder(metadata1, metadata2)
+	successPayloads := testEndpoint.SuccessPayloads()
 
-	assert.Len(testEndpoint.SuccessPayloads, 2, "There should be 2 payloads")
-	assertMetadata(assert, expectedHeaders, metadata1, testEndpoint.SuccessPayloads[0])
-	assertMetadata(assert, expectedHeaders, mergedMetadata, testEndpoint.SuccessPayloads[1])
+	assert.Len(successPayloads, 2, "There should be 2 payloads")
+	assertMetadata(assert, expectedHeaders, metadata1, successPayloads[0])
+	assertMetadata(assert, expectedHeaders, mergedMetadata, successPayloads[1])
 }
 
 func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
@@ -105,7 +106,7 @@ func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
 	time.Sleep(2 * serviceWriter.conf.FlushPeriod)
 
 	// And then sending a third payload with other 3 traces with an errored out endpoint with no retry
-	testEndpoint.Err = fmt.Errorf("non retriable error")
+	testEndpoint.SetError(fmt.Errorf("non retriable error"))
 	expectedNumErrors++
 	metadata3 := fixtures.RandomServices(10, 10)
 	serviceChannel <- metadata3
@@ -116,10 +117,10 @@ func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
 	time.Sleep(2 * serviceWriter.conf.FlushPeriod)
 
 	// And then sending a third payload with other 3 traces with an errored out endpoint with retry
-	testEndpoint.Err = &RetriableError{
+	testEndpoint.SetError(&RetriableError{
 		err:      fmt.Errorf("retriable error"),
 		endpoint: testEndpoint,
-	}
+	})
 	expectedMinNumRetries++
 	metadata4 := fixtures.RandomServices(10, 10)
 	serviceChannel <- metadata4
diff --git a/writer/stats_writer_test.go b/writer/stats_writer_test.go
index f98484157..cd7ef4e9c 100644
--- a/writer/stats_writer_test.go
+++ b/writer/stats_writer_test.go
@@ -46,10 +46,11 @@ func TestStatsWriter_StatHandling(t *testing.T) {
 	close(statsChannel)
 	statsWriter.Stop()
 
+	payloads := testEndpoint.SuccessPayloads()
+
 	// Then the endpoint should have received 2 payloads, containing all stat buckets
-	assert.Len(testEndpoint.SuccessPayloads, 2, "There should be 2 payloads")
+	assert.Len(payloads, 2, "There should be 2 payloads")
 
-	payloads := testEndpoint.SuccessPayloads
 	payload1 := payloads[0]
 	payload2 := payloads[1]
 
@@ -104,7 +105,7 @@ func TestStatsWriter_UpdateInfoHandling(t *testing.T) {
 	time.Sleep(2 * statsWriter.conf.UpdateInfoPeriod)
 
 	// And then sending a third payload with other 3 buckets with an errored out endpoint
-	testEndpoint.Err = fmt.Errorf("non retriable error")
+	testEndpoint.SetError(fmt.Errorf("non retriable error"))
 	expectedNumErrors++
 	payload3Buckets := []model.StatsBucket{
 		fixtures.RandomStatsBucket(5),
@@ -119,10 +120,10 @@ func TestStatsWriter_UpdateInfoHandling(t *testing.T) {
 	time.Sleep(2 * statsWriter.conf.UpdateInfoPeriod)
 
 	// And then sending a third payload with other 3 traces with an errored out endpoint with retry
-	testEndpoint.Err = &RetriableError{
+	testEndpoint.SetError(&RetriableError{
 		err:      fmt.Errorf("non retriable error"),
 		endpoint: testEndpoint,
-	}
+	})
 	expectedMinNumRetries++
 	payload4Buckets := []model.StatsBucket{
 		fixtures.RandomStatsBucket(5),
diff --git a/writer/trace_writer_test.go b/writer/trace_writer_test.go
index 0c00afccc..ac7b1b298 100644
--- a/writer/trace_writer_test.go
+++ b/writer/trace_writer_test.go
@@ -66,8 +66,8 @@ func TestTraceWriter_TraceHandling(t *testing.T) {
 		"Content-Encoding": "gzip",
 	}
 
-	assert.Len(testEndpoint.SuccessPayloads, 2, "There should be 2 payloads")
-	assertPayloads(assert, traceWriter, expectedHeaders, testTraces, testEndpoint.SuccessPayloads)
+	assert.Len(testEndpoint.SuccessPayloads(), 2, "There should be 2 payloads")
+	assertPayloads(assert, traceWriter, expectedHeaders, testTraces, testEndpoint.SuccessPayloads())
 }
 
 func TestTraceWriter_BigTraceHandling(t *testing.T) {
@@ -114,8 +114,8 @@ func TestTraceWriter_BigTraceHandling(t *testing.T) {
 	}
 
 	expectedNumPayloads := int(math.Ceil(float64(numSpans) / float64(traceWriter.conf.MaxSpansPerPayload)))
-	assert.Len(testEndpoint.SuccessPayloads, expectedNumPayloads, "There should be more than 1 payload")
-	assertPayloads(assert, traceWriter, expectedHeaders, testTraces, testEndpoint.SuccessPayloads)
+	assert.Len(testEndpoint.SuccessPayloads(), expectedNumPayloads, "There should be more than 1 payload")
+	assertPayloads(assert, traceWriter, expectedHeaders, testTraces, testEndpoint.SuccessPayloads())
 }
 
 func TestTraceWriter_UpdateInfoHandling(t *testing.T) {
@@ -172,7 +172,7 @@ func TestTraceWriter_UpdateInfoHandling(t *testing.T) {
 	time.Sleep(2 * traceWriter.conf.FlushPeriod)
 
 	// And then sending a third payload with other 3 traces with an errored out endpoint
-	testEndpoint.Err = fmt.Errorf("non retriable error")
+	testEndpoint.SetError(fmt.Errorf("non retriable error"))
 	expectedNumErrors++
 	payload3Traces := []model.Trace{
 		fixtures.RandomTrace(3, 1),
@@ -191,10 +191,10 @@ func TestTraceWriter_UpdateInfoHandling(t *testing.T) {
 	time.Sleep(2 * traceWriter.conf.FlushPeriod)
 
 	// And then sending a fourth payload with other 3 traces with an errored out endpoint but retriable
-	testEndpoint.Err = &RetriableError{
+	testEndpoint.SetError(&RetriableError{
 		err:      fmt.Errorf("non retriable error"),
 		endpoint: testEndpoint,
-	}
+	})
 	expectedMinNumRetries++
 	payload4Traces := []model.Trace{
 		fixtures.RandomTrace(3, 1),
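Reviewer note: the change applied across `writer/fixtures.go` and `statsd/fixtures.go` above follows one pattern — mutable fixture state becomes private, a mutex (here an embedded `sync.RWMutex`) guards it, and tests read and write only through locked accessors, so the suite stays clean under the newly enabled `go test -race`. The sketch below restates that pattern in a minimal, self-contained form; the type and method names are illustrative only and are not part of the patch.

```go
package fixtures

import "sync"

// mockEndpoint is an illustrative stand-in for the TestEndpoint pattern above:
// the zero value is usable, all mutable state is private, and every access
// goes through a method that holds the embedded RWMutex.
type mockEndpoint struct {
	sync.RWMutex
	err      error
	payloads []string
}

// Write records a payload and replies with the configured error, under the write lock.
func (m *mockEndpoint) Write(p string) error {
	m.Lock()
	defer m.Unlock()
	m.payloads = append(m.payloads, p)
	return m.err
}

// SetError swaps the configured error; tests may call this concurrently with Write.
func (m *mockEndpoint) SetError(err error) {
	m.Lock()
	defer m.Unlock()
	m.err = err
}

// Payloads returns a copy of the recorded payloads taken under the read lock.
func (m *mockEndpoint) Payloads() []string {
	m.RLock()
	defer m.RUnlock()
	return append([]string(nil), m.payloads...)
}
```

Returning a copy from `Payloads` is slightly stricter than the accessors in the patch, which return the recorded slice directly while holding the read lock.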