Skip to content

Commit

Permalink
[AGNTLOG-56 ] Create Auditor Component (#33680)
Browse files Browse the repository at this point in the history
  • Loading branch information
soberpeach authored Feb 10, 2025
1 parent ab83df9 commit eb0eb80
Show file tree
Hide file tree
Showing 16 changed files with 942 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
/comp/forwarder/eventplatform @DataDog/agent-log-pipelines
/comp/forwarder/eventplatformreceiver @DataDog/agent-log-pipelines
/comp/forwarder/orchestrator @DataDog/agent-log-pipelines
/comp/logs/auditor @DataDog/agent-metrics-logs
/comp/metadata/haagent @DataDog/ndm-core
/comp/metadata/packagesigning @DataDog/agent-delivery
/comp/trace/etwtracer @DataDog/windows-agent
Expand Down
8 changes: 8 additions & 0 deletions comp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ Package adscheduler is glue code to connect autodiscovery to the logs agent. It

Package agent contains logs agent component.

### [comp/logs/auditor](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/logs/auditor)

*Datadog Team*: agent-metrics-logs

Package auditor records the log files the agent is tracking. It tracks
filename, time last updated, offset (how far into the file the agent has
read), and tailing mode for each log file.

### [comp/logs/integrations](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/logs/integrations)

Package integrations adds a go interface for integrations to register and
Expand Down
30 changes: 30 additions & 0 deletions comp/logs/auditor/def/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package auditor records the log files the agent is tracking. It tracks
// filename, time last updated, offset (how far into the file the agent has
// read), and tailing mode for each log file.
package auditor

import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// team: agent-metrics-logs

// Component is the component type.
type Component interface {
Registry

// Start starts the auditor
Start()

// Stop stops the auditor
Stop()

// Channel returns the channel to use to communicate with the auditor or nil
// if the auditor is currently stopped.
Channel() chan *message.Payload
}
12 changes: 12 additions & 0 deletions comp/logs/auditor/def/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package auditor

// Registry holds a list of offsets.
type Registry interface {
GetOffset(identifier string) string
GetTailingMode(identifier string) string
}
21 changes: 21 additions & 0 deletions comp/logs/auditor/fx/fx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package fx provides the fx module for the auditor component
package fx

import (
auditorimpl "github.com/DataDog/datadog-agent/comp/logs/auditor/impl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)

// Module defines the fx options for this component
func Module() fxutil.Module {
return fxutil.Component(
fxutil.ProvideComponentConstructor(
auditorimpl.NewProvides,
),
)
}
66 changes: 66 additions & 0 deletions comp/logs/auditor/impl-none/auditor_noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package noneimpl provides the noop auditor component
package noneimpl

import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// NullAuditor is an auditor that does nothing but empties the channel it
// receives messages from
type NullAuditor struct {
channel chan *message.Payload
stopChannel chan struct{}
}

// NewAuditor creates a new noop auditor comoponent
func NewAuditor() *NullAuditor {
nullAuditor := &NullAuditor{
channel: make(chan *message.Payload),
stopChannel: make(chan struct{}),
}

return nullAuditor
}

// GetOffset returns an empty string
func (a *NullAuditor) GetOffset(_ string) string {
return ""
}

// GetTailingMode returns an empty string
func (a *NullAuditor) GetTailingMode(_ string) string {
return ""
}

// Start starts the NullAuditor main loop
func (a *NullAuditor) Start() {
go a.run()
}

// Stop stops the NullAuditor main loop
func (a *NullAuditor) Stop() {
a.stopChannel <- struct{}{}
}

// Channel returns the channel messages should be sent on
func (a *NullAuditor) Channel() chan *message.Payload {
return a.channel
}

// run is the main run loop for the null auditor
func (a *NullAuditor) run() {
for {
select {
case <-a.channel:
// drain the channel, we're not doing anything with the channel
case <-a.stopChannel:
close(a.channel)
return
}
}
}
46 changes: 46 additions & 0 deletions comp/logs/auditor/impl/api_v0.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package auditorimpl

import (
"encoding/json"
"fmt"
"strconv"
"time"
)

// v0: In the first version of the auditor, we were only recording file offsets

type registryEntryV0 struct {
Path string
Timestamp time.Time
Offset int64
}

type jsonRegistryV0 struct {
Version int
Registry map[string]registryEntryV0
}

func unmarshalRegistryV0(b []byte) (map[string]*RegistryEntry, error) {
var r jsonRegistryV0
err := json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
registry := make(map[string]*RegistryEntry)
for identifier, entry := range r.Registry {
switch {
case entry.Offset > 0:
// from v0 to v1 and further, we also prefixed path with file:
newIdentifier := fmt.Sprintf("file:%s", identifier)
registry[newIdentifier] = &RegistryEntry{LastUpdated: entry.Timestamp, Offset: strconv.FormatInt(entry.Offset, 10)}
default:
// no valid offset for this entry
}
}
return registry, nil
}
36 changes: 36 additions & 0 deletions comp/logs/auditor/impl/api_v0_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package auditorimpl

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAuditorUnmarshalRegistryV0(t *testing.T) {
input := `{
"Registry": {
"path1.log": {
"Offset": 1,
"Path": "path1.log",
"Timestamp": "2006-01-12T01:01:01.000000001Z"
},
"path2.log": {
"Offset": 2,
"Path": "path2.log",
"Timestamp": "2006-01-12T01:01:02.000000001Z"
}
},
"Version": 0
}`
r, err := unmarshalRegistryV0([]byte(input))
assert.Nil(t, err)
assert.Equal(t, "1", r["file:path1.log"].Offset)
assert.Equal(t, 1, r["file:path1.log"].LastUpdated.Second())
assert.Equal(t, "2", r["file:path2.log"].Offset)
assert.Equal(t, 2, r["file:path2.log"].LastUpdated.Second(), 2)
}
45 changes: 45 additions & 0 deletions comp/logs/auditor/impl/api_v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package auditorimpl

import (
"encoding/json"
"strconv"
"time"
)

// v1: In the second version of the auditor, Timestamp became LastUpdated and we added Timestamp to record container offsets.

type registryEntryV1 struct {
Timestamp string
Offset int64
LastUpdated time.Time
}

type jsonRegistryV1 struct {
Version int
Registry map[string]registryEntryV1
}

func unmarshalRegistryV1(b []byte) (map[string]*RegistryEntry, error) {
var r jsonRegistryV1
err := json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
registry := make(map[string]*RegistryEntry)
for identifier, entry := range r.Registry {
switch {
case entry.Offset > 0:
registry[identifier] = &RegistryEntry{LastUpdated: entry.LastUpdated, Offset: strconv.FormatInt(entry.Offset, 10)}
case entry.Timestamp != "":
registry[identifier] = &RegistryEntry{LastUpdated: entry.LastUpdated, Offset: entry.Timestamp}
default:
// no valid offset for this entry
}
}
return registry, nil
}
37 changes: 37 additions & 0 deletions comp/logs/auditor/impl/api_v1_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package auditorimpl

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAuditorUnmarshalRegistryV1(t *testing.T) {
input := `{
"Registry": {
"path1.log": {
"Offset": 1,
"LastUpdated": "2006-01-12T01:01:01.000000001Z",
"Timestamp": ""
},
"path2.log": {
"Offset": 0,
"LastUpdated": "2006-01-12T01:01:02.000000001Z",
"Timestamp": "2006-01-12T01:01:03.000000001Z"
}
},
"Version": 1
}`
r, err := unmarshalRegistryV1([]byte(input))
assert.Nil(t, err)
assert.Equal(t, "1", r["path1.log"].Offset)
assert.Equal(t, 1, r["path1.log"].LastUpdated.Second())

assert.Equal(t, "2006-01-12T01:01:03.000000001Z", r["path2.log"].Offset)
assert.Equal(t, 2, r["path2.log"].LastUpdated.Second(), 2)
}
27 changes: 27 additions & 0 deletions comp/logs/auditor/impl/api_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package auditorimpl

import (
"encoding/json"
)

// v2: In the third version of the auditor, we dropped Timestamp and used a generic Offset instead to reinforce the separation of concerns
// between the auditor and log sources.

func unmarshalRegistryV2(b []byte) (map[string]*RegistryEntry, error) {
var r JSONRegistry
err := json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
registry := make(map[string]*RegistryEntry)
for identifier, entry := range r.Registry {
newEntry := entry
registry[identifier] = &newEntry
}
return registry, nil
}
36 changes: 36 additions & 0 deletions comp/logs/auditor/impl/api_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package auditorimpl

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAuditorUnmarshalRegistryV2(t *testing.T) {
input := `{
"Registry": {
"path1.log": {
"Offset": "1",
"LastUpdated": "2006-01-12T01:01:01.000000001Z"
},
"path2.log": {
"Offset": "2006-01-12T01:01:03.000000001Z",
"LastUpdated": "2006-01-12T01:01:02.000000001Z"
}
},
"Version": 2
}`
r, err := unmarshalRegistryV2([]byte(input))
assert.Nil(t, err)

assert.Equal(t, "1", r["path1.log"].Offset)
assert.Equal(t, 1, r["path1.log"].LastUpdated.Second())

assert.Equal(t, "2006-01-12T01:01:03.000000001Z", r["path2.log"].Offset)
assert.Equal(t, 2, r["path2.log"].LastUpdated.Second())
}
Loading

0 comments on commit eb0eb80

Please sign in to comment.