Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AGNTLOG-56 ] Create Auditor Component #33680

Merged
merged 6 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@
/comp/forwarder/eventplatform @DataDog/agent-log-pipelines
/comp/forwarder/eventplatformreceiver @DataDog/agent-log-pipelines
/comp/forwarder/orchestrator @DataDog/agent-log-pipelines
/comp/logs/auditor @DataDog/agent-metrics-logs
/comp/metadata/haagent @DataDog/ndm-core
/comp/metadata/packagesigning @DataDog/agent-delivery
/comp/trace/etwtracer @DataDog/windows-agent
Expand Down
8 changes: 8 additions & 0 deletions comp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ Package adscheduler is glue code to connect autodiscovery to the logs agent. It

Package agent contains logs agent component.

### [comp/logs/auditor](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/logs/auditor)

*Datadog Team*: agent-metrics-logs

Package auditor records the log files the agent is tracking. It tracks
filename, time last updated, offset (how far into the file the agent has
read), and tailing mode for each log file.

### [comp/logs/integrations](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/logs/integrations)

Package integrations adds a go interface for integrations to register and
Expand Down
30 changes: 30 additions & 0 deletions comp/logs/auditor/def/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package auditor records the log files the agent is tracking. It tracks
// filename, time last updated, offset (how far into the file the agent has
// read), and tailing mode for each log file.
package auditor

import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// team: agent-metrics-logs

// Component is the component type.
type Component interface {
Registry

// Start starts the auditor
Start()

// Stop stops the auditor
Stop()

// Channel returns the channel to use to communicate with the auditor or nil
// if the auditor is currently stopped.
Channel() chan *message.Payload
}
12 changes: 12 additions & 0 deletions comp/logs/auditor/def/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package auditor

// Registry holds a list of offsets.
type Registry interface {
GetOffset(identifier string) string
GetTailingMode(identifier string) string
}
21 changes: 21 additions & 0 deletions comp/logs/auditor/fx/fx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package fx provides the fx module for the auditor component
package fx

import (
auditorimpl "github.com/DataDog/datadog-agent/comp/logs/auditor/impl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)

// Module defines the fx options for this component
func Module() fxutil.Module {
return fxutil.Component(
fxutil.ProvideComponentConstructor(
auditorimpl.NewAuditor,
),
)
}
66 changes: 66 additions & 0 deletions comp/logs/auditor/impl-none/auditor_noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package noneimpl provides the noop auditor component
package noneimpl

import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// NullAuditor is an auditor that does nothing but empties the channel it
// receives messages from
type NullAuditor struct {
channel chan *message.Payload
stopChannel chan struct{}
}

// NewAuditor creates a new noop auditor comoponent
func NewAuditor() *NullAuditor {
nullAuditor := &NullAuditor{
channel: make(chan *message.Payload),
stopChannel: make(chan struct{}),
}

return nullAuditor
}

// GetOffset returns an empty string
func (a *NullAuditor) GetOffset(_ string) string {
return ""
}

// GetTailingMode returns an empty string
func (a *NullAuditor) GetTailingMode(_ string) string {
return ""
}

// Start starts the NullAuditor main loop
func (a *NullAuditor) Start() {
go a.run()
}

// Stop stops the NullAuditor main loop
func (a *NullAuditor) Stop() {
a.stopChannel <- struct{}{}
}

// Channel returns the channel messages should be sent on
func (a *NullAuditor) Channel() chan *message.Payload {
return a.channel
}

// run is the main run loop for the null auditor
func (a *NullAuditor) run() {
for {
select {
case <-a.channel:
// drain the channel, we're not doing anything with the channel
case <-a.stopChannel:
// TODO(remy): close the message channel
soberpeach marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}
47 changes: 47 additions & 0 deletions comp/logs/auditor/impl/api_v0.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//nolint:revive // TODO(AML) Fix revive linter
soberpeach marked this conversation as resolved.
Show resolved Hide resolved
package auditorimpl

import (
"encoding/json"
"fmt"
"strconv"
"time"
)

// v0: In the first version of the auditor, we were only recording file offsets

type registryEntryV0 struct {
Path string
Timestamp time.Time
Offset int64
}

type jsonRegistryV0 struct {
Version int
Registry map[string]registryEntryV0
}

func unmarshalRegistryV0(b []byte) (map[string]*RegistryEntry, error) {
var r jsonRegistryV0
err := json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
registry := make(map[string]*RegistryEntry)
for identifier, entry := range r.Registry {
switch {
case entry.Offset > 0:
// from v0 to v1 and further, we also prefixed path with file:
newIdentifier := fmt.Sprintf("file:%s", identifier)
registry[newIdentifier] = &RegistryEntry{LastUpdated: entry.Timestamp, Offset: strconv.FormatInt(entry.Offset, 10)}
default:
// no valid offset for this entry
}
}
return registry, nil
}
45 changes: 45 additions & 0 deletions comp/logs/auditor/impl/api_v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package auditorimpl

import (
"encoding/json"
"strconv"
"time"
)

// v1: In the second version of the auditor, Timestamp became LastUpdated and we added Timestamp to record container offsets.

type registryEntryV1 struct {
Timestamp string
Offset int64
LastUpdated time.Time
}

type jsonRegistryV1 struct {
Version int
Registry map[string]registryEntryV1
}

func unmarshalRegistryV1(b []byte) (map[string]*RegistryEntry, error) {
var r jsonRegistryV1
err := json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
registry := make(map[string]*RegistryEntry)
for identifier, entry := range r.Registry {
switch {
case entry.Offset > 0:
registry[identifier] = &RegistryEntry{LastUpdated: entry.LastUpdated, Offset: strconv.FormatInt(entry.Offset, 10)}
case entry.Timestamp != "":
registry[identifier] = &RegistryEntry{LastUpdated: entry.LastUpdated, Offset: entry.Timestamp}
default:
// no valid offset for this entry
}
}
return registry, nil
}
27 changes: 27 additions & 0 deletions comp/logs/auditor/impl/api_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package auditorimpl

import (
"encoding/json"
)

// v2: In the third version of the auditor, we dropped Timestamp and used a generic Offset instead to reinforce the separation of concerns
// between the auditor and log sources.

func unmarshalRegistryV2(b []byte) (map[string]*RegistryEntry, error) {
var r JSONRegistry
err := json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
registry := make(map[string]*RegistryEntry)
for identifier, entry := range r.Registry {
newEntry := entry
registry[identifier] = &newEntry
}
return registry, nil
}
Loading
Loading