Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NPM-4131] Add netpath aggregator + e2e test #33416

Merged
merged 13 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,8 @@ require (
github.com/DataDog/datadog-agent/comp/otelcol/ddflareextension/impl v0.64.0-devel
github.com/DataDog/datadog-agent/pkg/config/structure v0.61.0
github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/util/defaultpaths v0.64.0-devel
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0
github.com/DataDog/dd-trace-go/v2 v2.0.0-beta.11
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkpath/payload/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ go 1.23.0
replace github.com/DataDog/datadog-agent/pkg/network/payload => ../../network/payload

require (
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/google/uuid v1.6.0
)
2 changes: 2 additions & 0 deletions test/fakeintake/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ COPY test/fakeintake/go.mod test/fakeintake/go.sum ./
# every datadog-agent module imported by the fakeintake has to be copied in the build image
COPY pkg/proto /pkg/proto
COPY comp/netflow/payload /comp/netflow/payload
COPY pkg/network/payload /pkg/network/payload
COPY pkg/networkpath/payload /pkg/networkpath/payload

RUN go mod download

Expand Down
Binary file added test/fakeintake/aggregator/fixtures/netpath_bytes
Binary file not shown.
68 changes: 68 additions & 0 deletions test/fakeintake/aggregator/netpathAggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package aggregator

import (
"bytes"
"encoding/json"
"fmt"
"time"

"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"github.com/DataDog/datadog-agent/test/fakeintake/api"
)

// Netpath represents a network path payload
type Netpath struct {
collectedTime time.Time
payload.NetworkPath
}

func (p *Netpath) name() string {
return fmt.Sprintf("%s:%d %s", p.Destination.Hostname, p.Destination.Port, p.Protocol)
}

// GetTags return the tags from a payload
func (p *Netpath) GetTags() []string {
return []string{}
}

// GetCollectedTime return the time when the payload has been collected by the fakeintake server
func (p *Netpath) GetCollectedTime() time.Time {
return p.collectedTime
}

// ParseNetpathPayload parses an api.Payload into a list of Netpath
func ParseNetpathPayload(payload api.Payload) (netpaths []*Netpath, err error) {
if len(payload.Data) == 0 || bytes.Equal(payload.Data, []byte("{}")) {
return []*Netpath{}, nil
}
enflated, err := enflate(payload.Data, payload.Encoding)
if err != nil {
return nil, err
}
netpaths = []*Netpath{}
err = json.Unmarshal(enflated, &netpaths)
if err != nil {
return nil, err
}
for _, n := range netpaths {
n.collectedTime = payload.Timestamp
}
return netpaths, err
}

// NetpathAggregator is an Aggregator for netpath payloads
type NetpathAggregator struct {
Aggregator[*Netpath]
}

// NewNetpathAggregator return a new NetpathAggregator
func NewNetpathAggregator() NetpathAggregator {
return NetpathAggregator{
Aggregator: newAggregator(ParseNetpathPayload),
}
}
76 changes: 76 additions & 0 deletions test/fakeintake/aggregator/netpathAggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package aggregator

import (
_ "embed"
"testing"

"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"github.com/DataDog/datadog-agent/test/fakeintake/api"
"github.com/stretchr/testify/assert"
)

//go:embed fixtures/netpath_bytes
var netpathData []byte

func TestNetpathAggregator(t *testing.T) {
t.Run("ParseNetpathPayload should return empty Netpath array on empty data", func(t *testing.T) {
netpaths, err := ParseNetpathPayload(api.Payload{Data: []byte(""), Encoding: encodingEmpty})
assert.NoError(t, err)
assert.Empty(t, netpaths)
})

t.Run("ParseNetpathPayload should return empty Netpath array on empty json object", func(t *testing.T) {
netpaths, err := ParseNetpathPayload(api.Payload{Data: []byte("{}"), Encoding: encodingJSON})
assert.NoError(t, err)
assert.Empty(t, netpaths)
})

t.Run("ParseNetpathPayload should return a valid Netpath on valid payload", func(t *testing.T) {
netpaths, err := ParseNetpathPayload(api.Payload{Data: netpathData, Encoding: encodingGzip})
assert.NoError(t, err)

assert.Len(t, netpaths, 1)
np := netpaths[0]

assert.Equal(t, int64(1737933404281), np.Timestamp)
assert.Equal(t, "7.64.0-devel+git.40.38beef2", np.AgentVersion)
assert.Equal(t, "default", np.Namespace)
assert.Equal(t, "da6f9055-b7df-41b0-bafd-0e5d3c6c370e", np.PathtraceID)
assert.Equal(t, payload.PathOrigin("network_path_integration"), np.Origin)
assert.Equal(t, payload.Protocol("TCP"), np.Protocol)
assert.Equal(t, "i-019fda1a9f830d95e", np.Source.Hostname)
assert.Equal(t, "subnet-091570395d476e9ce", np.Source.Via.Subnet.Alias)
assert.Equal(t, "vpc-029c0faf8f49dee8d", np.Source.NetworkID)
assert.Equal(t, "api.datadoghq.eu", np.Destination.Hostname)
assert.Equal(t, "34.107.236.155", np.Destination.IPAddress)
assert.Equal(t, uint16(443), np.Destination.Port)
assert.Equal(t, "155.236.107.34.bc.googleusercontent.com", np.Destination.ReverseDNSHostname)

assert.Len(t, np.Hops, 9)
assert.Equal(t, payload.NetworkPathHop{
TTL: 1,
IPAddress: "10.1.62.52",
Hostname: "ip-10-1-62-52.ec2.internal",
RTT: 0.39,
Reachable: true,
}, np.Hops[0])
assert.Equal(t, payload.NetworkPathHop{
TTL: 2,
IPAddress: "unknown_hop_2",
Hostname: "unknown_hop_2",
Reachable: false,
}, np.Hops[1])
assert.Equal(t, payload.NetworkPathHop{
TTL: 9,
IPAddress: "34.107.236.155",
Hostname: "155.236.107.34.bc.googleusercontent.com",
RTT: 2.864,
Reachable: true,
}, np.Hops[8])
})
}
24 changes: 24 additions & 0 deletions test/fakeintake/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
orchestratorManifestEndpoint = "/api/v2/orchmanif"
metadataEndpoint = "/api/v1/metadata"
ndmflowEndpoint = "/api/v2/ndmflow"
netpathEndpoint = "/api/v2/netpath"
apmTelemetryEndpoint = "/api/v2/apmtelemetry"
)

Expand Down Expand Up @@ -119,6 +120,7 @@ type Client struct {
orchestratorManifestAggregator aggregator.OrchestratorManifestAggregator
metadataAggregator aggregator.MetadataAggregator
ndmflowAggregator aggregator.NDMFlowAggregator
netpathAggregator aggregator.NetpathAggregator
serviceDiscoveryAggregator aggregator.ServiceDiscoveryAggregator
}

Expand All @@ -145,6 +147,7 @@ func NewClient(fakeIntakeURL string, opts ...Option) *Client {
orchestratorManifestAggregator: aggregator.NewOrchestratorManifestAggregator(),
metadataAggregator: aggregator.NewMetadataAggregator(),
ndmflowAggregator: aggregator.NewNDMFlowAggregator(),
netpathAggregator: aggregator.NewNetpathAggregator(),
serviceDiscoveryAggregator: aggregator.NewServiceDiscoveryAggregator(),
}
for _, opt := range opts {
Expand Down Expand Up @@ -280,6 +283,14 @@ func (c *Client) getNDMFlows() error {
return c.ndmflowAggregator.UnmarshallPayloads(payloads)
}

func (c *Client) getNetpathEvents() error {
payloads, err := c.getFakePayloads(netpathEndpoint)
if err != nil {
return err
}
return c.netpathAggregator.UnmarshallPayloads(payloads)
}

// FilterMetrics fetches fakeintake on `/api/v2/series` endpoint and returns
// metrics matching `name` and any [MatchOpt](#MatchOpt) options
func (c *Client) FilterMetrics(name string, options ...MatchOpt[*aggregator.MetricSeries]) ([]*aggregator.MetricSeries, error) {
Expand Down Expand Up @@ -879,6 +890,19 @@ func (c *Client) GetNDMFlows() ([]*aggregator.NDMFlow, error) {
return ndmflows, nil
}

// GetNetpathEvents returns the latest netpath events by destination
func (c *Client) GetNetpathEvents() ([]*aggregator.Netpath, error) {
err := c.getNetpathEvents()
if err != nil {
return nil, err
}
var netpaths []*aggregator.Netpath
for _, name := range c.netpathAggregator.GetNames() {
netpaths = append(netpaths, c.netpathAggregator.GetPayloadsByName(name)...)
}
return netpaths, nil
}

// filterPayload returns payloads matching any [MatchOpt](#MatchOpt) options
func filterPayload[T aggregator.PayloadItem](payloads []T, options ...MatchOpt[T]) ([]T, error) {
// apply filters one after the other
Expand Down
15 changes: 15 additions & 0 deletions test/fakeintake/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ var apiV1Metadata []byte
//go:embed fixtures/api_v2_ndmflow_response
var apiV2NDMFlow []byte

//go:embed fixtures/api_v2_netpath_response
var apiV2Netpath []byte

//go:embed fixtures/api_v2_telemetry_response
var apiV2Teleemtry []byte

Expand Down Expand Up @@ -569,6 +572,18 @@ func TestClient(t *testing.T) {
assert.Empty(t, ndmflows[0].AdditionalFields)
})

t.Run("getNetpathEvents", func(t *testing.T) {
ts := NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write(apiV2Netpath)
}))
defer ts.Close()

client := NewClient(ts.URL)
err := client.getNetpathEvents()
require.NoError(t, err)
assert.True(t, client.netpathAggregator.ContainsPayloadName("api.datadoghq.eu:443 TCP"))
})

t.Run("getServiceDiscoveries", func(t *testing.T) {
ts := NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write(apiV2Teleemtry)
Expand Down
10 changes: 10 additions & 0 deletions test/fakeintake/client/fixtures/api_v2_netpath_response
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"payloads": [
{
"timestamp": "2025-01-26T23:16:45.522649265Z",
"data": "H4sIAAAAAAAA/5yUzY6jOBSF3+VuB678y992XmAWs2uVkMEXgorYtDGpRSnv3nIq3Z0QJS31Agn5Wt85Pgfz7RPidKQ1muMCDS9lWUupmBIVz8CM5GJ7orBO3kEDJRYKWW7pRPM/4xRRMZRVRzQIyMCZI62L6QkasDSYbY6QwWLiIQbTUzvZNDDFUDOt8660Q654x/LODDZnpK3si16WjCADH6ZxSpKO4ocP723CtJOLNAYTk5sMluCj7/0MDfz/73+Qweq3kNQ/4eDXmOxAA1POeD1Yw009VJLZWif+aTJp37p1jmJ6M/NkVmiuKzmruS6ZrLVVZUF1T3A+Z7/MXE5yWvqcibpngxmqQdWWqLJwzsDSGif35fLei1kmtCYa68fDd6QNMpiW1lgbaE3iUiFnJQpZINc6HdGHCI1SMoNAqQdqrVvbGyTX+ms/K1Eq7HocvR9n2lYKvXeRXMTeH5Ovg19WaFLhcYaG77Q5Q46FQJ2qvM1vyTnLeV6IXAukXmBqITgzQwYhRmgYyjr5M/3BdDNBE8NG5+yqI3Y6m3t3/sO1B7+0O6n96AY5mHn9zZQvmPI5U75iqh1TKIbsK9p74v3gkoBArsXzCPRD1AzTc6mN3dMfZleBqqieCxQP3jkyVFgWe+s36xcwf+283IN5gULW6TvhfJ/LfnZ1LqoXzqt9NEqg0Dz9ZzjbVfkw+5k9K58L1H+6Yn9xl25KUY/Kb+e3HwAAAP//AQAA///bNfaSVQUAAA==",
"encoding": "gzip",
"content_type": "application/json"
}
]
}
4 changes: 4 additions & 0 deletions test/fakeintake/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ go 1.23.0
// every datadog-agent module replaced in the fakeintake go.mod needs to be copied in the Dockerfile
replace (
github.com/DataDog/datadog-agent/comp/netflow/payload => ../../comp/netflow/payload
github.com/DataDog/datadog-agent/pkg/network/payload => ../../pkg/network/payload
github.com/DataDog/datadog-agent/pkg/networkpath/payload => ../../pkg/networkpath/payload
github.com/DataDog/datadog-agent/pkg/proto => ../../pkg/proto
)

require (
github.com/DataDog/agent-payload/v5 v5.0.141
github.com/DataDog/datadog-agent/comp/netflow/payload v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.3
github.com/DataDog/zstd v1.5.6
github.com/benbjohnson/clock v1.3.5
Expand All @@ -28,6 +31,7 @@ require (
)

require (
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07 // indirect
github.com/DataDog/mmh3 v0.0.0-20210722141835-012dc69a9e49 // indirect
github.com/DataDog/zstd_0 v0.0.0-20210310093942-586c1286621f // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions test/new-e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ replace (
github.com/DataDog/datadog-agent/pkg/config/nodetreemodel => ../../pkg/config/nodetreemodel
github.com/DataDog/datadog-agent/pkg/config/setup => ../../pkg/config/setup
github.com/DataDog/datadog-agent/pkg/config/teeconfig => ../../pkg/config/teeconfig
github.com/DataDog/datadog-agent/pkg/network/payload => ../../pkg/network/payload
github.com/DataDog/datadog-agent/pkg/networkpath/payload => ../../pkg/networkpath/payload
github.com/DataDog/datadog-agent/pkg/proto => ../../pkg/proto
github.com/DataDog/datadog-agent/pkg/trace => ../../pkg/trace
github.com/DataDog/datadog-agent/pkg/util/executable => ../../pkg/util/executable
Expand Down Expand Up @@ -280,6 +282,7 @@ require (

require (
github.com/DataDog/datadog-agent/comp/core/tagger/types v0.64.0-devel
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/trace v0.56.0-rc.3
github.com/DataDog/datadog-go/v5 v5.6.0
github.com/aws/aws-sdk-go v1.55.6
Expand All @@ -291,6 +294,7 @@ require (

require (
github.com/DataDog/datadog-agent/comp/core/tagger/utils v0.59.0 // indirect
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/charmbracelet/x/ansi v0.6.0 // indirect
github.com/charmbracelet/x/term v0.2.1 // indirect
Expand Down
6 changes: 5 additions & 1 deletion test/new-e2e/pkg/environments/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ func (e *Host) generateAndDownloadAgentFlare(outputDir string) (string, error) {
// discard error, flare command might return error if there is no intake, but it the archive is still generated
flareCommandOutput, err := e.Agent.Client.FlareWithError(agentclient.WithArgs([]string{"--email", "e2e-tests@datadog-agent", "--send", "--local"}))

lines := []string{flareCommandOutput}
if err != nil {
lines = append(lines, err.Error())
}
// on error, the flare output is in the error message
flareCommandOutput = strings.Join([]string{flareCommandOutput, err.Error()}, "\n")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the correct fix, but new-e2e was panicking here which was interfering with pulumi destroy. Let me know if this should be changed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah that should help. We were aware of the issue but did not check to fix it yet.
Thanks for the fix

flareCommandOutput = strings.Join(lines, "\n")

// find <path to flare>.zip in flare command output
// (?m) is a flag that allows ^ and $ to match the beginning and end of each line
Expand Down
Loading
Loading