diff --git a/internal/newtelemetry/client_test.go b/internal/newtelemetry/client_test.go index f03a5c7d4f..7335d07803 100644 --- a/internal/newtelemetry/client_test.go +++ b/internal/newtelemetry/client_test.go @@ -6,7 +6,10 @@ package newtelemetry import ( + "encoding/json" "errors" + "net/http" + "runtime" "testing" "time" @@ -17,6 +20,8 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport" "gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types" + "gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" ) func TestNewClient(t *testing.T) { @@ -384,3 +389,142 @@ func TestClientFlush(t *testing.T) { }) } } + +type testRoundTripper struct { + roundTrip func(*http.Request) (*http.Response, error) +} + +func (t *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return t.roundTrip(req) +} + +func TestClientEnd2End(t *testing.T) { + tracerConfig := internal.TracerConfig{ + Service: "test-service", + Env: "test-env", + Version: "1.0.0", + } + clientConfig := ClientConfig{ + AgentURL: "http://localhost:8126", + HTTPClient: &http.Client{ + Timeout: 5 * time.Second, + }, + Debug: true, + } + + parseRequest := func(t *testing.T, request *http.Request) transport.Body { + assert.Equal(t, "v2", request.Header.Get("DD-Telemetry-API-Version")) + assert.Equal(t, "application/json", request.Header.Get("Content-Type")) + assert.Equal(t, "go", request.Header.Get("DD-Client-Library-Language")) + assert.Equal(t, "test-env", request.Header.Get("DD-Agent-Env")) + assert.Equal(t, version.Tag, request.Header.Get("DD-Client-Library-Version")) + assert.Equal(t, globalconfig.InstrumentationInstallID(), request.Header.Get("DD-Agent-Install-Id")) + assert.Equal(t, globalconfig.InstrumentationInstallType(), request.Header.Get("DD-Agent-Install-Type")) + assert.Equal(t, globalconfig.InstrumentationInstallTime(), request.Header.Get("DD-Agent-Install-Time")) + assert.Equal(t, "true", request.Header.Get("DD-Telemetry-Debug-Enabled")) + + assert.NotEmpty(t, request.Header.Get("DD-Agent-Hostname")) + + var body transport.Body + require.NoError(t, json.NewDecoder(request.Body).Decode(&body)) + + assert.Equal(t, "test-service", body.Application.ServiceName) + assert.Equal(t, "test-env", body.Application.Env) + assert.Equal(t, "1.0.0", body.Application.ServiceVersion) + assert.Equal(t, "go", body.Application.LanguageName) + assert.Equal(t, runtime.Version(), body.Application.LanguageVersion) + + assert.NotEmpty(t, body.Host.Hostname) + assert.Equal(t, osinfo.OSName(), body.Host.OS) + assert.Equal(t, osinfo.OSVersion(), body.Host.OSVersion) + assert.Equal(t, osinfo.Architecture(), body.Host.Architecture) + assert.Equal(t, osinfo.KernelName(), body.Host.KernelName) + assert.Equal(t, osinfo.KernelRelease(), body.Host.KernelRelease) + assert.Equal(t, osinfo.KernelVersion(), body.Host.KernelVersion) + + assert.Equal(t, true, body.Debug) + assert.Equal(t, "v2", body.APIVersion) + assert.NotZero(t, body.TracerTime) + assert.EqualValues(t, 1, body.SeqID) + assert.Equal(t, globalconfig.RuntimeID(), body.RuntimeID) + + return body + } + + for _, test := range []struct { + name string + when func(*client) + expect func(*testing.T, *http.Request) (*http.Response, error) + }{ + { + name: "app-start", + when: func(c *client) { + c.appStart() + }, + expect: func(t *testing.T, request *http.Request) (*http.Response, error) { + assert.EqualValues(t, transport.RequestTypeAppStarted, request.Header.Get("DD-Telemetry-Request-Type")) + body := parseRequest(t, request) + assert.Equal(t, transport.RequestTypeAppStarted, body.RequestType) + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }, + }, + { + name: "app-stop", + when: func(c *client) { + c.appStop() + }, + expect: func(t *testing.T, request *http.Request) (*http.Response, error) { + assert.EqualValues(t, transport.RequestTypeAppClosing, request.Header.Get("DD-Telemetry-Request-Type")) + body := parseRequest(t, request) + assert.Equal(t, transport.RequestTypeAppClosing, body.RequestType) + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }, + }, + { + name: "app-start+app-stop", + when: func(c *client) { + c.appStart() + c.appStop() + }, + expect: func(t *testing.T, request *http.Request) (*http.Response, error) { + assert.EqualValues(t, transport.RequestTypeMessageBatch, request.Header.Get("DD-Telemetry-Request-Type")) + body := parseRequest(t, request) + assert.Equal(t, transport.RequestTypeMessageBatch, body.RequestType) + + payload := body.Payload.(transport.MessageBatch) + assert.Len(t, payload.Payload, 2) + assert.Equal(t, transport.RequestTypeAppStarted, payload.Payload[0].RequestType) + assert.Equal(t, transport.RequestTypeAppClosing, payload.Payload[1].RequestType) + + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + clientConfig.HTTPClient.Transport = &testRoundTripper{ + roundTrip: func(req *http.Request) (*http.Response, error) { + if test.expect != nil { + return test.expect(t, req) + } + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }, + } + + c, err := newClient(tracerConfig, clientConfig) + require.NoError(t, err) + defer c.Close() + + test.when(c) + c.Flush() + }) + } +} diff --git a/internal/newtelemetry/internal/transport/body.go b/internal/newtelemetry/internal/transport/body.go index 1015df61e5..b34f673a23 100644 --- a/internal/newtelemetry/internal/transport/body.go +++ b/internal/newtelemetry/internal/transport/body.go @@ -5,6 +5,11 @@ package transport +import ( + "encoding/json" + "fmt" +) + // Application is identifying information about the app itself type Application struct { ServiceName string `json:"service_name"` @@ -40,3 +45,104 @@ type Body struct { Application Application `json:"application"` Host Host `json:"host"` } + +func (b *Body) UnmarshalJSON(bytes []byte) error { + var anyMap map[string]json.RawMessage + var err error + if err = json.Unmarshal(bytes, &anyMap); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["api_version"], &b.APIVersion); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["request_type"], &b.RequestType); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["tracer_time"], &b.TracerTime); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["runtime_id"], &b.RuntimeID); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["seq_id"], &b.SeqID); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["debug"], &b.Debug); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["application"], &b.Application); err != nil { + return err + } + + if err = json.Unmarshal(anyMap["host"], &b.Host); err != nil { + return err + } + + if b.RequestType == RequestTypeMessageBatch { + var messageBatch struct { + Payload []struct { + RequestType RequestType `json:"request_type"` + Payload json.RawMessage `json:"payload"` + } `json:"payload"` + } + + if err = json.Unmarshal(anyMap["payload"], &messageBatch); err != nil { + return err + } + + batch := make([]Message, len(messageBatch.Payload)) + for i, message := range messageBatch.Payload { + payload, err := unmarshalPayload(message.Payload, message.RequestType) + if err != nil { + return err + } + batch[i] = Message{RequestType: message.RequestType, Payload: payload} + } + b.Payload = MessageBatch{Payload: batch} + return nil + } + + b.Payload, err = unmarshalPayload(anyMap["payload"], b.RequestType) + return err +} + +func unmarshalPayload(bytes json.RawMessage, requestType RequestType) (Payload, error) { + var payload Payload + switch requestType { + case RequestTypeAppClientConfigurationChange: + payload = new(AppClientConfigurationChange) + case RequestTypeAppProductChange: + payload = new(AppProductChange) + case RequestTypeAppIntegrationsChange: + payload = new(AppIntegrationChange) + case RequestTypeAppHeartbeat: + payload = new(AppHeartbeat) + case RequestTypeAppStarted: + payload = new(AppStarted) + case RequestTypeAppClosing: + payload = new(AppClosing) + case RequestTypeAppExtendedHeartBeat: + payload = new(AppExtendedHeartbeat) + case RequestTypeAppDependenciesLoaded: + payload = new(AppDependenciesLoaded) + case RequestTypeDistributions: + payload = new(Distributions) + case RequestTypeGenerateMetrics: + payload = new(GenerateMetrics) + case RequestTypeLogs: + payload = new(Logs) + } + + if err := json.Unmarshal(bytes, payload); err != nil { + return nil, fmt.Errorf("failed to unmarshal payload: %v", err) + } + + return payload, nil +}