From 8ad1759ce2b5f7cdadbc47235cc01cdc6485943c Mon Sep 17 00:00:00 2001 From: David McKay Date: Thu, 17 Oct 2019 11:27:27 +0100 Subject: [PATCH 1/2] feat: add -run-once option to exit after a single collection --- agent/agent.go | 40 ++++++++++++++++++++++++++++++---------- agent/agent_test.go | 26 +++++++++++++------------- cmd/telegraf/telegraf.go | 3 ++- 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 863309f284fc2..98c2cdf52dc98 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -18,13 +18,15 @@ import ( // Agent runs a set of plugins. type Agent struct { - Config *config.Config + Config *config.Config + RunOnce bool } // NewAgent returns an Agent for the given Config. -func NewAgent(config *config.Config) (*Agent, error) { +func NewAgent(config *config.Config, runOnce bool) (*Agent, error) { a := &Agent{ - Config: config, + Config: config, + RunOnce: runOnce, } return a, nil } @@ -58,10 +60,12 @@ func (a *Agent) Run(ctx context.Context) error { startTime := time.Now() - log.Printf("D! [agent] Starting service inputs") - err = a.startServiceInputs(ctx, inputC) - if err != nil { - return err + if !a.RunOnce { + log.Printf("D! [agent] Starting service inputs") + err = a.startServiceInputs(ctx, inputC) + if err != nil { + return err + } } var wg sync.WaitGroup @@ -78,8 +82,10 @@ func (a *Agent) Run(ctx context.Context) error { log.Printf("E! [agent] Error running inputs: %v", err) } - log.Printf("D! [agent] Stopping service inputs") - a.stopServiceInputs() + if !a.RunOnce { + log.Printf("D! [agent] Stopping service inputs") + a.stopServiceInputs() + } close(dst) log.Printf("D! [agent] Input channel closed") @@ -272,7 +278,7 @@ func (a *Agent) runInputs( go func(input *models.RunningInput) { defer wg.Done() - if a.Config.Agent.RoundInterval { + if !a.RunOnce && a.Config.Agent.RoundInterval { err := internal.SleepContext( ctx, internal.AlignDuration(startTime, interval)) if err != nil { @@ -280,6 +286,11 @@ func (a *Agent) runInputs( } } + if a.RunOnce { + a.gatherOnce(acc, input, interval) + return + } + a.gatherOnInterval(ctx, acc, input, interval, jitter) }(input) } @@ -517,6 +528,15 @@ func (a *Agent) runOutputs( wg.Add(1) go func(output *models.RunningOutput) { defer wg.Done() + + if !a.RunOnce && !a.Config.Agent.RoundInterval { + err := internal.SleepContext( + ctx, internal.AlignDuration(startTime, interval)) + if err != nil { + return + } + } + a.flushLoop(ctx, startTime, output, interval, jitter) }(output) } diff --git a/agent/agent_test.go b/agent/agent_test.go index c822a236b3084..abfe21716d964 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -14,7 +14,7 @@ import ( func TestAgent_OmitHostname(t *testing.T) { c := config.NewConfig() c.Agent.OmitHostname = true - _, err := NewAgent(c) + _, err := NewAgent(c, false) assert.NoError(t, err) assert.NotContains(t, c.Tags, "host") } @@ -24,35 +24,35 @@ func TestAgent_LoadPlugin(t *testing.T) { c.InputFilters = []string{"mysql"} err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ := NewAgent(c) + a, _ := NewAgent(c, false) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 0, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "redis"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 2, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo", "redis", "bar"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 2, len(a.Config.Inputs)) } @@ -61,35 +61,35 @@ func TestAgent_LoadOutput(t *testing.T) { c.OutputFilters = []string{"influxdb"} err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ := NewAgent(c) + a, _ := NewAgent(c, false) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"kafka"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 1, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 3, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 0, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() @@ -97,14 +97,14 @@ func TestAgent_LoadOutput(t *testing.T) { err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) assert.Equal(t, 3, len(c.Outputs)) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 3, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c) + a, _ = NewAgent(c, false) assert.Equal(t, 3, len(a.Config.Outputs)) } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index c1f7344da1717..cb96e91a888c0 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)") var fPlugins = flag.String("plugin-directory", "", "path to directory containing external plugins") +var fRunOnce = flag.Bool("run-once", false, "collect metrics from inputs once and exit") var ( version string @@ -151,7 +152,7 @@ func runAgent(ctx context.Context, c.Agent.Interval.Duration) } - ag, err := agent.NewAgent(c) + ag, err := agent.NewAgent(c, *fRunOnce) if err != nil { return err } From cf5ff88c70b212f056ec6c77bc5e8a0a163eab61 Mon Sep 17 00:00:00 2001 From: David McKay Date: Fri, 24 Apr 2020 18:05:08 +0100 Subject: [PATCH 2/2] feat: separate Run and RunOnce As per PR feedback, it is preferred not to pass through trailing boolean parameters and instead isolate the behaviour for each Run and RunOnce --- agent/agent.go | 138 ++++++++++++++++++++++++++++++++------ agent/agent_test.go | 26 +++---- cmd/telegraf/telegraf.go | 12 +++- internal/config/config.go | 4 ++ 4 files changed, 144 insertions(+), 36 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 98c2cdf52dc98..86173e057cee0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -18,15 +18,13 @@ import ( // Agent runs a set of plugins. type Agent struct { - Config *config.Config - RunOnce bool + Config *config.Config } // NewAgent returns an Agent for the given Config. -func NewAgent(config *config.Config, runOnce bool) (*Agent, error) { +func NewAgent(config *config.Config) (*Agent, error) { a := &Agent{ - Config: config, - RunOnce: runOnce, + Config: config, } return a, nil } @@ -60,12 +58,10 @@ func (a *Agent) Run(ctx context.Context) error { startTime := time.Now() - if !a.RunOnce { - log.Printf("D! [agent] Starting service inputs") - err = a.startServiceInputs(ctx, inputC) - if err != nil { - return err - } + log.Printf("D! [agent] Starting service inputs") + err = a.startServiceInputs(ctx, inputC) + if err != nil { + return err } var wg sync.WaitGroup @@ -82,10 +78,8 @@ func (a *Agent) Run(ctx context.Context) error { log.Printf("E! [agent] Error running inputs: %v", err) } - if !a.RunOnce { - log.Printf("D! [agent] Stopping service inputs") - a.stopServiceInputs() - } + log.Printf("D! [agent] Stopping service inputs") + a.stopServiceInputs() close(dst) log.Printf("D! [agent] Input channel closed") @@ -148,6 +142,95 @@ func (a *Agent) Run(ctx context.Context) error { return nil } +// Run starts and runs only once +func (a *Agent) RunOnce(ctx context.Context) error { + log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+ + "Flush Interval:%s", + a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet, + a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) + + if ctx.Err() != nil { + return ctx.Err() + } + + log.Printf("D! [agent] Initializing plugins") + err := a.initPlugins() + if err != nil { + return err + } + + log.Printf("D! [agent] Connecting outputs") + err = a.connectOutputs(ctx) + if err != nil { + return err + } + + inputC := make(chan telegraf.Metric, 100) + procC := make(chan telegraf.Metric, 100) + outputC := make(chan telegraf.Metric, 100) + + startTime := time.Now() + + log.Printf("D! [agent] Starting service inputs") + err = a.startServiceInputs(ctx, inputC) + if err != nil { + return err + } + + src := inputC + dst := inputC + + err = a.runInputs(ctx, startTime, dst) + if err != nil { + log.Printf("E! [agent] Error running inputs: %v", err) + } + + log.Printf("D! [agent] Stopping service inputs") + a.stopServiceInputs() + + close(dst) + log.Printf("D! [agent] Input channel closed") + + src = dst + + if len(a.Config.Processors) > 0 { + dst = procC + + err = a.runProcessors(src, dst) + if err != nil { + log.Printf("E! [agent] Error running processors: %v", err) + } + close(dst) + log.Printf("D! [agent] Processor channel closed") + + src = dst + } + + if len(a.Config.Aggregators) > 0 { + dst = outputC + + err = a.runAggregators(startTime, src, dst) + if err != nil { + log.Printf("E! [agent] Error running aggregators: %v", err) + } + close(dst) + log.Printf("D! [agent] Output channel closed") + + src = dst + } + + err = a.runOutputs(startTime, src) + if err != nil { + log.Printf("E! [agent] Error running outputs: %v", err) + } + + log.Printf("D! [agent] Closing outputs") + a.closeOutputs() + + log.Printf("D! [agent] Stopped Successfully") + return nil +} + // Test runs the inputs once and prints the output to stdout in line protocol. func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { var wg sync.WaitGroup @@ -278,7 +361,15 @@ func (a *Agent) runInputs( go func(input *models.RunningInput) { defer wg.Done() - if !a.RunOnce && a.Config.Agent.RoundInterval { + if a.Config.Agent.RunOnce { + err := a.gatherOnce(acc, input, interval) + if err != nil { + acc.AddError(err) + } + return + } + + if a.Config.Agent.RoundInterval { err := internal.SleepContext( ctx, internal.AlignDuration(startTime, interval)) if err != nil { @@ -286,11 +377,6 @@ func (a *Agent) runInputs( } } - if a.RunOnce { - a.gatherOnce(acc, input, interval) - return - } - a.gatherOnInterval(ctx, acc, input, interval, jitter) }(input) } @@ -529,7 +615,15 @@ func (a *Agent) runOutputs( go func(output *models.RunningOutput) { defer wg.Done() - if !a.RunOnce && !a.Config.Agent.RoundInterval { + if a.Config.Agent.RunOnce { + err := a.flushOnce(output, interval, output.Write) + if err != nil { + fmt.Println("Dead") + } + return + } + + if !a.Config.Agent.RoundInterval { err := internal.SleepContext( ctx, internal.AlignDuration(startTime, interval)) if err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index abfe21716d964..c822a236b3084 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -14,7 +14,7 @@ import ( func TestAgent_OmitHostname(t *testing.T) { c := config.NewConfig() c.Agent.OmitHostname = true - _, err := NewAgent(c, false) + _, err := NewAgent(c) assert.NoError(t, err) assert.NotContains(t, c.Tags, "host") } @@ -24,35 +24,35 @@ func TestAgent_LoadPlugin(t *testing.T) { c.InputFilters = []string{"mysql"} err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ := NewAgent(c, false) + a, _ := NewAgent(c) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 0, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "redis"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo", "redis", "bar"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Inputs)) } @@ -61,35 +61,35 @@ func TestAgent_LoadOutput(t *testing.T) { c.OutputFilters = []string{"influxdb"} err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ := NewAgent(c, false) + a, _ := NewAgent(c) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"kafka"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 1, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 0, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() @@ -97,14 +97,14 @@ func TestAgent_LoadOutput(t *testing.T) { err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) assert.Equal(t, 3, len(c.Outputs)) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"} err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) - a, _ = NewAgent(c, false) + a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index cb96e91a888c0..f9935e0622063 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -152,7 +152,13 @@ func runAgent(ctx context.Context, c.Agent.Interval.Duration) } - ag, err := agent.NewAgent(c, *fRunOnce) + // Command-line argument takes precedence over configuration file + if *fRunOnce { + c.Agent.RunOnce = true + + } + + ag, err := agent.NewAgent(c) if err != nil { return err } @@ -199,6 +205,10 @@ func runAgent(ctx context.Context, } } + if ag.Config.Agent.RunOnce { + return ag.RunOnce(ctx) + } + return ag.Run(ctx) } diff --git a/internal/config/config.go b/internal/config/config.go index c2335fac26bc7..0a856be279457 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -77,6 +77,7 @@ func NewConfig() *Config { FlushInterval: internal.Duration{Duration: 10 * time.Second}, LogTarget: "file", LogfileRotationMaxArchives: 5, + RunOnce: false, }, Tags: make(map[string]string), @@ -168,6 +169,9 @@ type AgentConfig struct { // If set to -1, no archives are removed. LogfileRotationMaxArchives int `toml:"logfile_rotation_max_archives"` + // Run the collection once, then exit + RunOnce bool `toml:"run_once"` + Hostname string OmitHostname bool }