Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CI Visibility] Add support for CI Visibility mode in ddtrace #2739

117 changes: 117 additions & 0 deletions ddtrace/tracer/civisibility_payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracer

import (
"bytes"
"sync/atomic"

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)

// ciVisibilityPayload represents a payload specifically designed for CI Visibility events.
// It embeds the generic payload structure and adds methods to handle CI Visibility specific data.
type ciVisibilityPayload struct {
*payload
}

// push adds a new CI Visibility event to the payload buffer.
// It grows the buffer to accommodate the new event, encodes the event in MessagePack format, and updates the event count.
//
// Parameters:
//
// event - The CI Visibility event to be added to the payload.
//
// Returns:
//
// An error if encoding the event fails.
func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
p.buf.Grow(event.Msgsize())
if err := msgp.Encode(&p.buf, event); err != nil {
return err
}
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
}

// newCiVisibilityPayload creates a new instance of civisibilitypayload.
//
// Returns:
//
// A pointer to a newly initialized civisibilitypayload instance.
func newCiVisibilityPayload() *ciVisibilityPayload {
return &ciVisibilityPayload{newPayload()}
}

// getBuffer retrieves the complete body of the CI Visibility payload, including metadata.
// It reads the current payload buffer, adds metadata, and encodes the entire payload in MessagePack format.
//
// Parameters:
//
// config - A pointer to the config structure containing environment settings.
//
// Returns:
//
// A pointer to a bytes.Buffer containing the encoded CI Visibility payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {

/*
The Payload format in the CI Visibility protocol is like this:
{
"version": 1,
"metadata": {
"*": {
"runtime-id": "...",
"language": "...",
"library_version": "...",
"env": "..."
}
},
"events": [
// ...
]
}

The event format can be found in the `civisibility_tslv.go` file in the ciVisibilityEvent documentation
*/

// Create a buffer to read the current payload
payloadBuf := new(bytes.Buffer)
if _, err := payloadBuf.ReadFrom(p.payload); err != nil {
return nil, err
}

// Create the metadata map
allMetadata := map[string]string{
"language": "go",
"runtime-id": globalconfig.RuntimeID(),
"library_version": version.Tag,
}
if config.env != "" {
allMetadata["env"] = config.env
}

// Create the visibility payload
visibilityPayload := ciTestCyclePayload{
Version: 1,
Metadata: map[string]map[string]string{
"*": allMetadata,
},
Events: payloadBuf.Bytes(),
}

// Create a new buffer to encode the visibility payload in MessagePack format
encodedBuf := new(bytes.Buffer)
if err := msgp.Encode(encodedBuf, &visibilityPayload); err != nil {
return nil, err
}

return encodedBuf, nil
}
120 changes: 120 additions & 0 deletions ddtrace/tracer/civisibility_payload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracer

import (
"bytes"
"io"
"strconv"
"strings"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/tinylib/msgp/msgp"
)

func newCiVisibilityEventsList(n int) []*ciVisibilityEvent {
list := make([]*ciVisibilityEvent, n)
for i := 0; i < n; i++ {
s := newBasicSpan("span.list." + strconv.Itoa(i%5+1))
s.Start = fixedTime
list[i] = getCiVisibilityEvent(s)
}

return list
}

// TestCiVisibilityPayloadIntegrity tests that whatever we push into the payload
// allows us to read the same content as would have been encoded by
// the codec.
func TestCiVisibilityPayloadIntegrity(t *testing.T) {
want := new(bytes.Buffer)
for _, n := range []int{10, 1 << 10, 1 << 17} {
dianashevchenko marked this conversation as resolved.
Show resolved Hide resolved
t.Run(strconv.Itoa(n), func(t *testing.T) {
assert := assert.New(t)
p := newCiVisibilityPayload()
var allEvents ciVisibilityEvents

for i := 0; i < n; i++ {
list := newCiVisibilityEventsList(i%5 + 1)
allEvents = append(allEvents, list...)
for _, event := range list {
p.push(event)
}
}

want.Reset()
err := msgp.Encode(want, allEvents)
assert.NoError(err)
assert.Equal(want.Len(), p.size())
assert.Equal(p.itemCount(), len(allEvents))

got, err := io.ReadAll(p)
assert.NoError(err)
assert.Equal(want.Bytes(), got)
})
}
}

// TestCiVisibilityPayloadDecode ensures that whatever we push into the payload can
// be decoded by the codec.
func TestCiVisibilityPayloadDecode(t *testing.T) {
assert := assert.New(t)
for _, n := range []int{10, 1 << 10} {
t.Run(strconv.Itoa(n), func(t *testing.T) {
p := newCiVisibilityPayload()
for i := 0; i < n; i++ {
list := newCiVisibilityEventsList(i%5 + 1)
for _, event := range list {
p.push(event)
}
}
var got ciVisibilityEvents
err := msgp.Decode(p, &got)
assert.NoError(err)
})
}
}

func BenchmarkCiVisibilityPayloadThroughput(b *testing.B) {
b.Run("10K", benchmarkCiVisibilityPayloadThroughput(1))
b.Run("100K", benchmarkCiVisibilityPayloadThroughput(10))
b.Run("1MB", benchmarkCiVisibilityPayloadThroughput(100))
}

// benchmarkCiVisibilityPayloadThroughput benchmarks the throughput of the payload by subsequently
// pushing a list of civisibility events containing count spans of approximately 10KB in size each, until the
// payload is filled.
func benchmarkCiVisibilityPayloadThroughput(count int) func(*testing.B) {
return func(b *testing.B) {
p := newCiVisibilityPayload()
s := newBasicSpan("X")
s.Meta["key"] = strings.Repeat("X", 10*1024)
e := getCiVisibilityEvent(s)
events := make(ciVisibilityEvents, count)
for i := 0; i < count; i++ {
events[i] = e
}

b.ReportAllocs()
b.ResetTimer()
reset := func() {
p.header = make([]byte, 8)
p.off = 8
atomic.StoreUint32(&p.count, 0)
p.buf.Reset()
}
for i := 0; i < b.N; i++ {
reset()
for _, event := range events {
for p.size() < payloadMaxLimit {
p.push(event)
}
}
}
}
}
Loading
Loading