-
Notifications
You must be signed in to change notification settings - Fork 441
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CI Visibility] Add support for CI Visibility mode in ddtrace (#2739)
Co-authored-by: liashenko <[email protected]>
- Loading branch information
1 parent
1987b7b
commit 1aba968
Showing
24 changed files
with
5,186 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2024 Datadog, Inc. | ||
|
||
package tracer | ||
|
||
import ( | ||
"bytes" | ||
"sync/atomic" | ||
|
||
"github.com/tinylib/msgp/msgp" | ||
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" | ||
"gopkg.in/DataDog/dd-trace-go.v1/internal/version" | ||
) | ||
|
||
// ciVisibilityPayload represents a payload specifically designed for CI Visibility events. | ||
// It embeds the generic payload structure and adds methods to handle CI Visibility specific data. | ||
type ciVisibilityPayload struct { | ||
*payload | ||
} | ||
|
||
// push adds a new CI Visibility event to the payload buffer. | ||
// It grows the buffer to accommodate the new event, encodes the event in MessagePack format, and updates the event count. | ||
// | ||
// Parameters: | ||
// | ||
// event - The CI Visibility event to be added to the payload. | ||
// | ||
// Returns: | ||
// | ||
// An error if encoding the event fails. | ||
func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error { | ||
p.buf.Grow(event.Msgsize()) | ||
if err := msgp.Encode(&p.buf, event); err != nil { | ||
return err | ||
} | ||
atomic.AddUint32(&p.count, 1) | ||
p.updateHeader() | ||
return nil | ||
} | ||
|
||
// newCiVisibilityPayload creates a new instance of civisibilitypayload. | ||
// | ||
// Returns: | ||
// | ||
// A pointer to a newly initialized civisibilitypayload instance. | ||
func newCiVisibilityPayload() *ciVisibilityPayload { | ||
return &ciVisibilityPayload{newPayload()} | ||
} | ||
|
||
// getBuffer retrieves the complete body of the CI Visibility payload, including metadata. | ||
// It reads the current payload buffer, adds metadata, and encodes the entire payload in MessagePack format. | ||
// | ||
// Parameters: | ||
// | ||
// config - A pointer to the config structure containing environment settings. | ||
// | ||
// Returns: | ||
// | ||
// A pointer to a bytes.Buffer containing the encoded CI Visibility payload. | ||
// An error if reading from the buffer or encoding the payload fails. | ||
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) { | ||
|
||
/* | ||
The Payload format in the CI Visibility protocol is like this: | ||
{ | ||
"version": 1, | ||
"metadata": { | ||
"*": { | ||
"runtime-id": "...", | ||
"language": "...", | ||
"library_version": "...", | ||
"env": "..." | ||
} | ||
}, | ||
"events": [ | ||
// ... | ||
] | ||
} | ||
The event format can be found in the `civisibility_tslv.go` file in the ciVisibilityEvent documentation | ||
*/ | ||
|
||
// Create a buffer to read the current payload | ||
payloadBuf := new(bytes.Buffer) | ||
if _, err := payloadBuf.ReadFrom(p.payload); err != nil { | ||
return nil, err | ||
} | ||
|
||
// Create the metadata map | ||
allMetadata := map[string]string{ | ||
"language": "go", | ||
"runtime-id": globalconfig.RuntimeID(), | ||
"library_version": version.Tag, | ||
} | ||
if config.env != "" { | ||
allMetadata["env"] = config.env | ||
} | ||
|
||
// Create the visibility payload | ||
visibilityPayload := ciTestCyclePayload{ | ||
Version: 1, | ||
Metadata: map[string]map[string]string{ | ||
"*": allMetadata, | ||
}, | ||
Events: payloadBuf.Bytes(), | ||
} | ||
|
||
// Create a new buffer to encode the visibility payload in MessagePack format | ||
encodedBuf := new(bytes.Buffer) | ||
if err := msgp.Encode(encodedBuf, &visibilityPayload); err != nil { | ||
return nil, err | ||
} | ||
|
||
return encodedBuf, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2024 Datadog, Inc. | ||
|
||
package tracer | ||
|
||
import ( | ||
"bytes" | ||
"io" | ||
"strconv" | ||
"strings" | ||
"sync/atomic" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/tinylib/msgp/msgp" | ||
) | ||
|
||
func newCiVisibilityEventsList(n int) []*ciVisibilityEvent { | ||
list := make([]*ciVisibilityEvent, n) | ||
for i := 0; i < n; i++ { | ||
s := newBasicSpan("span.list." + strconv.Itoa(i%5+1)) | ||
s.Start = fixedTime | ||
list[i] = getCiVisibilityEvent(s) | ||
} | ||
|
||
return list | ||
} | ||
|
||
// TestCiVisibilityPayloadIntegrity tests that whatever we push into the payload | ||
// allows us to read the same content as would have been encoded by | ||
// the codec. | ||
func TestCiVisibilityPayloadIntegrity(t *testing.T) { | ||
want := new(bytes.Buffer) | ||
for _, n := range []int{10, 1 << 10, 1 << 17} { | ||
t.Run(strconv.Itoa(n), func(t *testing.T) { | ||
assert := assert.New(t) | ||
p := newCiVisibilityPayload() | ||
var allEvents ciVisibilityEvents | ||
|
||
for i := 0; i < n; i++ { | ||
list := newCiVisibilityEventsList(i%5 + 1) | ||
allEvents = append(allEvents, list...) | ||
for _, event := range list { | ||
p.push(event) | ||
} | ||
} | ||
|
||
want.Reset() | ||
err := msgp.Encode(want, allEvents) | ||
assert.NoError(err) | ||
assert.Equal(want.Len(), p.size()) | ||
assert.Equal(p.itemCount(), len(allEvents)) | ||
|
||
got, err := io.ReadAll(p) | ||
assert.NoError(err) | ||
assert.Equal(want.Bytes(), got) | ||
}) | ||
} | ||
} | ||
|
||
// TestCiVisibilityPayloadDecode ensures that whatever we push into the payload can | ||
// be decoded by the codec. | ||
func TestCiVisibilityPayloadDecode(t *testing.T) { | ||
assert := assert.New(t) | ||
for _, n := range []int{10, 1 << 10} { | ||
t.Run(strconv.Itoa(n), func(t *testing.T) { | ||
p := newCiVisibilityPayload() | ||
for i := 0; i < n; i++ { | ||
list := newCiVisibilityEventsList(i%5 + 1) | ||
for _, event := range list { | ||
p.push(event) | ||
} | ||
} | ||
var got ciVisibilityEvents | ||
err := msgp.Decode(p, &got) | ||
assert.NoError(err) | ||
}) | ||
} | ||
} | ||
|
||
func BenchmarkCiVisibilityPayloadThroughput(b *testing.B) { | ||
b.Run("10K", benchmarkCiVisibilityPayloadThroughput(1)) | ||
b.Run("100K", benchmarkCiVisibilityPayloadThroughput(10)) | ||
b.Run("1MB", benchmarkCiVisibilityPayloadThroughput(100)) | ||
} | ||
|
||
// benchmarkCiVisibilityPayloadThroughput benchmarks the throughput of the payload by subsequently | ||
// pushing a list of civisibility events containing count spans of approximately 10KB in size each, until the | ||
// payload is filled. | ||
func benchmarkCiVisibilityPayloadThroughput(count int) func(*testing.B) { | ||
return func(b *testing.B) { | ||
p := newCiVisibilityPayload() | ||
s := newBasicSpan("X") | ||
s.Meta["key"] = strings.Repeat("X", 10*1024) | ||
e := getCiVisibilityEvent(s) | ||
events := make(ciVisibilityEvents, count) | ||
for i := 0; i < count; i++ { | ||
events[i] = e | ||
} | ||
|
||
b.ReportAllocs() | ||
b.ResetTimer() | ||
reset := func() { | ||
p.header = make([]byte, 8) | ||
p.off = 8 | ||
atomic.StoreUint32(&p.count, 0) | ||
p.buf.Reset() | ||
} | ||
for i := 0; i < b.N; i++ { | ||
reset() | ||
for _, event := range events { | ||
for p.size() < payloadMaxLimit { | ||
p.push(event) | ||
} | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.