Skip to content

Commit

Permalink
[NPM-4131] Add netpath aggregator + e2e test (#33416)
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu authored Jan 30, 2025
1 parent 78991f6 commit 7d4d74f
Show file tree
Hide file tree
Showing 19 changed files with 455 additions and 4 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,8 @@ require (
github.com/DataDog/datadog-agent/comp/otelcol/ddflareextension/impl v0.64.0-devel
github.com/DataDog/datadog-agent/pkg/config/structure v0.61.0
github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/util/defaultpaths v0.64.0-devel
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0
github.com/DataDog/dd-trace-go/v2 v2.0.0-beta.11
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkpath/payload/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ go 1.23.0
replace github.com/DataDog/datadog-agent/pkg/network/payload => ../../network/payload

require (
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/google/uuid v1.6.0
)
2 changes: 2 additions & 0 deletions test/fakeintake/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ COPY test/fakeintake/go.mod test/fakeintake/go.sum ./
# every datadog-agent module imported by the fakeintake has to be copied in the build image
COPY pkg/proto /pkg/proto
COPY comp/netflow/payload /comp/netflow/payload
COPY pkg/network/payload /pkg/network/payload
COPY pkg/networkpath/payload /pkg/networkpath/payload

RUN go mod download

Expand Down
Binary file added test/fakeintake/aggregator/fixtures/netpath_bytes
Binary file not shown.
68 changes: 68 additions & 0 deletions test/fakeintake/aggregator/netpathAggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package aggregator

import (
"bytes"
"encoding/json"
"fmt"
"time"

"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"github.com/DataDog/datadog-agent/test/fakeintake/api"
)

// Netpath represents a network path payload
type Netpath struct {
collectedTime time.Time
payload.NetworkPath
}

func (p *Netpath) name() string {
return fmt.Sprintf("%s:%d %s", p.Destination.Hostname, p.Destination.Port, p.Protocol)
}

// GetTags return the tags from a payload
func (p *Netpath) GetTags() []string {
return []string{}
}

// GetCollectedTime return the time when the payload has been collected by the fakeintake server
func (p *Netpath) GetCollectedTime() time.Time {
return p.collectedTime
}

// ParseNetpathPayload parses an api.Payload into a list of Netpath
func ParseNetpathPayload(payload api.Payload) (netpaths []*Netpath, err error) {
if len(payload.Data) == 0 || bytes.Equal(payload.Data, []byte("{}")) {
return []*Netpath{}, nil
}
enflated, err := enflate(payload.Data, payload.Encoding)
if err != nil {
return nil, err
}
netpaths = []*Netpath{}
err = json.Unmarshal(enflated, &netpaths)
if err != nil {
return nil, err
}
for _, n := range netpaths {
n.collectedTime = payload.Timestamp
}
return netpaths, err
}

// NetpathAggregator is an Aggregator for netpath payloads
type NetpathAggregator struct {
Aggregator[*Netpath]
}

// NewNetpathAggregator return a new NetpathAggregator
func NewNetpathAggregator() NetpathAggregator {
return NetpathAggregator{
Aggregator: newAggregator(ParseNetpathPayload),
}
}
76 changes: 76 additions & 0 deletions test/fakeintake/aggregator/netpathAggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package aggregator

import (
_ "embed"
"testing"

"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"github.com/DataDog/datadog-agent/test/fakeintake/api"
"github.com/stretchr/testify/assert"
)

//go:embed fixtures/netpath_bytes
var netpathData []byte

func TestNetpathAggregator(t *testing.T) {
t.Run("ParseNetpathPayload should return empty Netpath array on empty data", func(t *testing.T) {
netpaths, err := ParseNetpathPayload(api.Payload{Data: []byte(""), Encoding: encodingEmpty})
assert.NoError(t, err)
assert.Empty(t, netpaths)
})

t.Run("ParseNetpathPayload should return empty Netpath array on empty json object", func(t *testing.T) {
netpaths, err := ParseNetpathPayload(api.Payload{Data: []byte("{}"), Encoding: encodingJSON})
assert.NoError(t, err)
assert.Empty(t, netpaths)
})

t.Run("ParseNetpathPayload should return a valid Netpath on valid payload", func(t *testing.T) {
netpaths, err := ParseNetpathPayload(api.Payload{Data: netpathData, Encoding: encodingGzip})
assert.NoError(t, err)

assert.Len(t, netpaths, 1)
np := netpaths[0]

assert.Equal(t, int64(1737933404281), np.Timestamp)
assert.Equal(t, "7.64.0-devel+git.40.38beef2", np.AgentVersion)
assert.Equal(t, "default", np.Namespace)
assert.Equal(t, "da6f9055-b7df-41b0-bafd-0e5d3c6c370e", np.PathtraceID)
assert.Equal(t, payload.PathOrigin("network_path_integration"), np.Origin)
assert.Equal(t, payload.Protocol("TCP"), np.Protocol)
assert.Equal(t, "i-019fda1a9f830d95e", np.Source.Hostname)
assert.Equal(t, "subnet-091570395d476e9ce", np.Source.Via.Subnet.Alias)
assert.Equal(t, "vpc-029c0faf8f49dee8d", np.Source.NetworkID)
assert.Equal(t, "api.datadoghq.eu", np.Destination.Hostname)
assert.Equal(t, "34.107.236.155", np.Destination.IPAddress)
assert.Equal(t, uint16(443), np.Destination.Port)
assert.Equal(t, "155.236.107.34.bc.googleusercontent.com", np.Destination.ReverseDNSHostname)

assert.Len(t, np.Hops, 9)
assert.Equal(t, payload.NetworkPathHop{
TTL: 1,
IPAddress: "10.1.62.52",
Hostname: "ip-10-1-62-52.ec2.internal",
RTT: 0.39,
Reachable: true,
}, np.Hops[0])
assert.Equal(t, payload.NetworkPathHop{
TTL: 2,
IPAddress: "unknown_hop_2",
Hostname: "unknown_hop_2",
Reachable: false,
}, np.Hops[1])
assert.Equal(t, payload.NetworkPathHop{
TTL: 9,
IPAddress: "34.107.236.155",
Hostname: "155.236.107.34.bc.googleusercontent.com",
RTT: 2.864,
Reachable: true,
}, np.Hops[8])
})
}
24 changes: 24 additions & 0 deletions test/fakeintake/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
orchestratorManifestEndpoint = "/api/v2/orchmanif"
metadataEndpoint = "/api/v1/metadata"
ndmflowEndpoint = "/api/v2/ndmflow"
netpathEndpoint = "/api/v2/netpath"
apmTelemetryEndpoint = "/api/v2/apmtelemetry"
)

Expand Down Expand Up @@ -119,6 +120,7 @@ type Client struct {
orchestratorManifestAggregator aggregator.OrchestratorManifestAggregator
metadataAggregator aggregator.MetadataAggregator
ndmflowAggregator aggregator.NDMFlowAggregator
netpathAggregator aggregator.NetpathAggregator
serviceDiscoveryAggregator aggregator.ServiceDiscoveryAggregator
}

Expand All @@ -145,6 +147,7 @@ func NewClient(fakeIntakeURL string, opts ...Option) *Client {
orchestratorManifestAggregator: aggregator.NewOrchestratorManifestAggregator(),
metadataAggregator: aggregator.NewMetadataAggregator(),
ndmflowAggregator: aggregator.NewNDMFlowAggregator(),
netpathAggregator: aggregator.NewNetpathAggregator(),
serviceDiscoveryAggregator: aggregator.NewServiceDiscoveryAggregator(),
}
for _, opt := range opts {
Expand Down Expand Up @@ -280,6 +283,14 @@ func (c *Client) getNDMFlows() error {
return c.ndmflowAggregator.UnmarshallPayloads(payloads)
}

func (c *Client) getNetpathEvents() error {
payloads, err := c.getFakePayloads(netpathEndpoint)
if err != nil {
return err
}
return c.netpathAggregator.UnmarshallPayloads(payloads)
}

// FilterMetrics fetches fakeintake on `/api/v2/series` endpoint and returns
// metrics matching `name` and any [MatchOpt](#MatchOpt) options
func (c *Client) FilterMetrics(name string, options ...MatchOpt[*aggregator.MetricSeries]) ([]*aggregator.MetricSeries, error) {
Expand Down Expand Up @@ -879,6 +890,19 @@ func (c *Client) GetNDMFlows() ([]*aggregator.NDMFlow, error) {
return ndmflows, nil
}

// GetNetpathEvents returns the latest netpath events by destination
func (c *Client) GetNetpathEvents() ([]*aggregator.Netpath, error) {
err := c.getNetpathEvents()
if err != nil {
return nil, err
}
var netpaths []*aggregator.Netpath
for _, name := range c.netpathAggregator.GetNames() {
netpaths = append(netpaths, c.netpathAggregator.GetPayloadsByName(name)...)
}
return netpaths, nil
}

// filterPayload returns payloads matching any [MatchOpt](#MatchOpt) options
func filterPayload[T aggregator.PayloadItem](payloads []T, options ...MatchOpt[T]) ([]T, error) {
// apply filters one after the other
Expand Down
15 changes: 15 additions & 0 deletions test/fakeintake/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ var apiV1Metadata []byte
//go:embed fixtures/api_v2_ndmflow_response
var apiV2NDMFlow []byte

//go:embed fixtures/api_v2_netpath_response
var apiV2Netpath []byte

//go:embed fixtures/api_v2_telemetry_response
var apiV2Teleemtry []byte

Expand Down Expand Up @@ -569,6 +572,18 @@ func TestClient(t *testing.T) {
assert.Empty(t, ndmflows[0].AdditionalFields)
})

t.Run("getNetpathEvents", func(t *testing.T) {
ts := NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write(apiV2Netpath)
}))
defer ts.Close()

client := NewClient(ts.URL)
err := client.getNetpathEvents()
require.NoError(t, err)
assert.True(t, client.netpathAggregator.ContainsPayloadName("api.datadoghq.eu:443 TCP"))
})

t.Run("getServiceDiscoveries", func(t *testing.T) {
ts := NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write(apiV2Teleemtry)
Expand Down
10 changes: 10 additions & 0 deletions test/fakeintake/client/fixtures/api_v2_netpath_response
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"payloads": [
{
"timestamp": "2025-01-26T23:16:45.522649265Z",
"data": "H4sIAAAAAAAA/5yUzY6jOBSF3+VuB678y992XmAWs2uVkMEXgorYtDGpRSnv3nIq3Z0QJS31Agn5Wt85Pgfz7RPidKQ1muMCDS9lWUupmBIVz8CM5GJ7orBO3kEDJRYKWW7pRPM/4xRRMZRVRzQIyMCZI62L6QkasDSYbY6QwWLiIQbTUzvZNDDFUDOt8660Q654x/LODDZnpK3si16WjCADH6ZxSpKO4ocP723CtJOLNAYTk5sMluCj7/0MDfz/73+Qweq3kNQ/4eDXmOxAA1POeD1Yw009VJLZWif+aTJp37p1jmJ6M/NkVmiuKzmruS6ZrLVVZUF1T3A+Z7/MXE5yWvqcibpngxmqQdWWqLJwzsDSGif35fLei1kmtCYa68fDd6QNMpiW1lgbaE3iUiFnJQpZINc6HdGHCI1SMoNAqQdqrVvbGyTX+ms/K1Eq7HocvR9n2lYKvXeRXMTeH5Ovg19WaFLhcYaG77Q5Q46FQJ2qvM1vyTnLeV6IXAukXmBqITgzQwYhRmgYyjr5M/3BdDNBE8NG5+yqI3Y6m3t3/sO1B7+0O6n96AY5mHn9zZQvmPI5U75iqh1TKIbsK9p74v3gkoBArsXzCPRD1AzTc6mN3dMfZleBqqieCxQP3jkyVFgWe+s36xcwf+283IN5gULW6TvhfJ/LfnZ1LqoXzqt9NEqg0Dz9ZzjbVfkw+5k9K58L1H+6Yn9xl25KUY/Kb+e3HwAAAP//AQAA///bNfaSVQUAAA==",
"encoding": "gzip",
"content_type": "application/json"
}
]
}
4 changes: 4 additions & 0 deletions test/fakeintake/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ go 1.23.0
// every datadog-agent module replaced in the fakeintake go.mod needs to be copied in the Dockerfile
replace (
github.com/DataDog/datadog-agent/comp/netflow/payload => ../../comp/netflow/payload
github.com/DataDog/datadog-agent/pkg/network/payload => ../../pkg/network/payload
github.com/DataDog/datadog-agent/pkg/networkpath/payload => ../../pkg/networkpath/payload
github.com/DataDog/datadog-agent/pkg/proto => ../../pkg/proto
)

require (
github.com/DataDog/agent-payload/v5 v5.0.141
github.com/DataDog/datadog-agent/comp/netflow/payload v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.3
github.com/DataDog/zstd v1.5.6
github.com/benbjohnson/clock v1.3.5
Expand All @@ -28,6 +31,7 @@ require (
)

require (
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07 // indirect
github.com/DataDog/mmh3 v0.0.0-20210722141835-012dc69a9e49 // indirect
github.com/DataDog/zstd_0 v0.0.0-20210310093942-586c1286621f // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions test/new-e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ replace (
github.com/DataDog/datadog-agent/pkg/config/nodetreemodel => ../../pkg/config/nodetreemodel
github.com/DataDog/datadog-agent/pkg/config/setup => ../../pkg/config/setup
github.com/DataDog/datadog-agent/pkg/config/teeconfig => ../../pkg/config/teeconfig
github.com/DataDog/datadog-agent/pkg/network/payload => ../../pkg/network/payload
github.com/DataDog/datadog-agent/pkg/networkpath/payload => ../../pkg/networkpath/payload
github.com/DataDog/datadog-agent/pkg/proto => ../../pkg/proto
github.com/DataDog/datadog-agent/pkg/trace => ../../pkg/trace
github.com/DataDog/datadog-agent/pkg/util/executable => ../../pkg/util/executable
Expand Down Expand Up @@ -280,6 +282,7 @@ require (

require (
github.com/DataDog/datadog-agent/comp/core/tagger/types v0.64.0-devel
github.com/DataDog/datadog-agent/pkg/networkpath/payload v0.0.0-20250128160050-7ac9ccd58c07
github.com/DataDog/datadog-agent/pkg/trace v0.56.0-rc.3
github.com/DataDog/datadog-go/v5 v5.6.0
github.com/aws/aws-sdk-go v1.55.6
Expand All @@ -291,6 +294,7 @@ require (

require (
github.com/DataDog/datadog-agent/comp/core/tagger/utils v0.59.0 // indirect
github.com/DataDog/datadog-agent/pkg/network/payload v0.0.0-20250128160050-7ac9ccd58c07 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/charmbracelet/x/ansi v0.6.0 // indirect
github.com/charmbracelet/x/term v0.2.1 // indirect
Expand Down
6 changes: 5 additions & 1 deletion test/new-e2e/pkg/environments/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ func (e *Host) generateAndDownloadAgentFlare(outputDir string) (string, error) {
// discard error, flare command might return error if there is no intake, but it the archive is still generated
flareCommandOutput, err := e.Agent.Client.FlareWithError(agentclient.WithArgs([]string{"--email", "e2e-tests@datadog-agent", "--send", "--local"}))

lines := []string{flareCommandOutput}
if err != nil {
lines = append(lines, err.Error())
}
// on error, the flare output is in the error message
flareCommandOutput = strings.Join([]string{flareCommandOutput, err.Error()}, "\n")
flareCommandOutput = strings.Join(lines, "\n")

// find <path to flare>.zip in flare command output
// (?m) is a flag that allows ^ and $ to match the beginning and end of each line
Expand Down
Loading

0 comments on commit 7d4d74f

Please sign in to comment.