Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RunOnce / Serverless Telegraf #7408

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,95 @@ func (a *Agent) Run(ctx context.Context) error {
return nil
}

// Run starts and runs only once
func (a *Agent) RunOnce(ctx context.Context) error {
log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s",
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

if ctx.Err() != nil {
return ctx.Err()
}

log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}

log.Printf("D! [agent] Connecting outputs")
err = a.connectOutputs(ctx)
if err != nil {
return err
}

inputC := make(chan telegraf.Metric, 100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll want massive buffers here, and you're going to have a hard limit on how many metrics you can process this way. eg: a file input with 2 million lines is not going to work; it's going to fill this channel up before the rest of the chain can run. Since you're running the pieces one at a time. the nth + 1 write over n buffer size is going to block forever.

procC := make(chan telegraf.Metric, 100)
outputC := make(chan telegraf.Metric, 100)

startTime := time.Now()

log.Printf("D! [agent] Starting service inputs")
err = a.startServiceInputs(ctx, inputC)
if err != nil {
return err
}

src := inputC
dst := inputC

err = a.runInputs(ctx, startTime, dst)
if err != nil {
log.Printf("E! [agent] Error running inputs: %v", err)
}

log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()

close(dst)
log.Printf("D! [agent] Input channel closed")

src = dst

if len(a.Config.Processors) > 0 {
dst = procC

err = a.runProcessors(src, dst)
if err != nil {
log.Printf("E! [agent] Error running processors: %v", err)
}
close(dst)
log.Printf("D! [agent] Processor channel closed")

src = dst
}

if len(a.Config.Aggregators) > 0 {
dst = outputC

err = a.runAggregators(startTime, src, dst)
if err != nil {
log.Printf("E! [agent] Error running aggregators: %v", err)
}
close(dst)
log.Printf("D! [agent] Output channel closed")

src = dst
}

err = a.runOutputs(startTime, src)
if err != nil {
log.Printf("E! [agent] Error running outputs: %v", err)
}

log.Printf("D! [agent] Closing outputs")
a.closeOutputs()

log.Printf("D! [agent] Stopped Successfully")
return nil
}

// Test runs the inputs once and prints the output to stdout in line protocol.
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
var wg sync.WaitGroup
Expand Down Expand Up @@ -272,6 +361,14 @@ func (a *Agent) runInputs(
go func(input *models.RunningInput) {
defer wg.Done()

if a.Config.Agent.RunOnce {
err := a.gatherOnce(acc, input, interval)
if err != nil {
acc.AddError(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might move error handling inside of gatherOnce. It won't affect existing use of it and it'll clean up both callers.

}
return
}

if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
Expand Down Expand Up @@ -517,6 +614,23 @@ func (a *Agent) runOutputs(
wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()

if a.Config.Agent.RunOnce {
err := a.flushOnce(output, interval, output.Write)
if err != nil {
fmt.Println("Dead")
}
return
}

if !a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

a.flushLoop(ctx, startTime, output, interval, jitter)
}(output)
}
Expand Down
11 changes: 11 additions & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var fPlugins = flag.String("plugin-directory", "",
"path to directory containing external plugins")
// fRunOnce is the -run-once command-line flag (default false). When it is
// set, runAgent forces Agent.RunOnce to true on the loaded configuration, so
// the flag takes precedence over the configuration file.
var fRunOnce = flag.Bool("run-once", false, "collect metrics from inputs once and exit")

var (
version string
Expand Down Expand Up @@ -151,6 +152,12 @@ func runAgent(ctx context.Context,
c.Agent.Interval.Duration)
}

// Command-line argument takes precedence over configuration file
if *fRunOnce {
c.Agent.RunOnce = true

}

ag, err := agent.NewAgent(c)
if err != nil {
return err
Expand Down Expand Up @@ -198,6 +205,10 @@ func runAgent(ctx context.Context,
}
}

if ag.Config.Agent.RunOnce {
return ag.RunOnce(ctx)
}

return ag.Run(ctx)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func NewConfig() *Config {
FlushInterval: internal.Duration{Duration: 10 * time.Second},
LogTarget: "file",
LogfileRotationMaxArchives: 5,
RunOnce: false,
},

Tags: make(map[string]string),
Expand Down Expand Up @@ -168,6 +169,9 @@ type AgentConfig struct {
// If set to -1, no archives are removed.
LogfileRotationMaxArchives int `toml:"logfile_rotation_max_archives"`

// Run the collection once, then exit
RunOnce bool `toml:"run_once"`

Hostname string
OmitHostname bool
}
Expand Down