Skip to content

Commit

Permalink
initial commit for agent monitor addresses influxdata#5958
Browse files Browse the repository at this point in the history
  • Loading branch information
nicgrobler committed Jul 10, 2019
1 parent 04937d0 commit 8c8cca7
Show file tree
Hide file tree
Showing 5 changed files with 683 additions and 8 deletions.
33 changes: 31 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"os"
"runtime"
"sync"
"time"
Expand All @@ -29,7 +30,7 @@ func NewAgent(config *config.Config) (*Agent, error) {
}

// Run starts and runs the Agent until the context is done.
func (a *Agent) Run(ctx context.Context) error {
func (a *Agent) Run(ctx context.Context, signalsMonitorAgent chan os.Signal) error {
log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s",
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
Expand Down Expand Up @@ -57,6 +58,20 @@ func (a *Agent) Run(ctx context.Context) error {

startTime := time.Now()

// start internal monitor
log.Printf("D! [agent] Creating agent monitor")
agentMonitor, err := NewAgentMonitor(ctx, a.Config, signalsMonitorAgent, inputC)
if err != nil {
/*
we should NOT cause the telegraf agent to fail here - this can be due to a nonsensical version string (or a total lack of one - while testing etc)
log it, and die quietly
*/
log.Printf("D! [agent] agent monitor returned: " + err.Error())
} else {
log.Printf("D! [agent] Starting agent monitor")
go agentMonitor.Run() //this will look after itself from this point
}

log.Printf("D! [agent] Starting service inputs")
err = a.startServiceInputs(ctx, inputC)
if err != nil {
Expand Down Expand Up @@ -142,7 +157,7 @@ func (a *Agent) Run(ctx context.Context) error {
}

// Test runs the inputs once and prints the output to stdout in line protocol.
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration, signalsMonitorAgent chan os.Signal) error {
var wg sync.WaitGroup
metricC := make(chan telegraf.Metric)
nulC := make(chan telegraf.Metric)
Expand Down Expand Up @@ -182,6 +197,20 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
}
}

// start agent monitor
log.Printf("D! [agent] Creating agent monitor")
agentMonitor, err := NewAgentMonitor(ctx, a.Config, signalsMonitorAgent, metricC)
if err != nil {
/*
we should NOT cause the telegraf agent to fail here - this can be due to a nonsensical version string (or a total lack of one - while testing etc)
log it, and die quietly
*/
log.Printf("D! [agent] agent monitor returned: " + err.Error())
} else {
log.Printf("D! [agent] Starting agent monitor...")
go agentMonitor.Run() //this will look after itself from this point, and operates autonomously. main should not need to do anything else.
}

if hasServiceInputs {
log.Printf("D! [agent] Starting service inputs")
err := a.startServiceInputs(ctx, metricC)
Expand Down
321 changes: 321 additions & 0 deletions agent/agent_momitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
package agent

import (
"context"
"os"
"syscall"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// panicOnReceive probes the signals channel for a live receiver.
//
// It offers a single SIGINT on signals and panics if that send is accepted,
// because an accepted send means something is still taking values from the
// channel when nothing should be. If tick delivers a value first, the probe
// gives up and the function returns normally.
func panicOnReceive(signals chan os.Signal, tick <-chan time.Time) {
	// Either branch ends the function, so one select covers every outcome.
	select {
	case <-tick:
		// Timed out without the signal being taken: we're done.
		return
	case signals <- syscall.SIGINT:
		// The send went through, so something accepted the signal.
		panic("listener found where none should exist")
	}
}

// TestExtractNumeric feeds extractNumeric a series of version fragments and
// checks the returned number and found-flag for each; none of the inputs is
// expected to produce an error.
func TestExtractNumeric(t *testing.T) {
	cases := []struct {
		input     string
		wantFound bool
		wantValue int64
	}{
		{input: "v1", wantFound: true, wantValue: 1},
		{input: "1Garbage2", wantFound: true, wantValue: 1},
		{input: "mon~2keyGa-rbage.", wantFound: false, wantValue: 0},
		{input: "13~123bvdbc1", wantFound: true, wantValue: 13},
	}

	for _, tc := range cases {
		value, found, err := extractNumeric(tc.input)
		require.Nil(t, err)
		require.Equal(t, tc.wantFound, found)
		require.Equal(t, tc.wantValue, value)
	}
}

// TestGetNumericVersion checks getNumericVersion against version strings it
// must accept (expecting the major/minor/patch triple) and against strings it
// must reject (expecting a specific error message and an empty, non-nil
// slice).
func TestGetNumericVersion(t *testing.T) {
	accepted := []struct {
		version string
		want    []int64
	}{
		{version: "v1.2.3", want: []int64{1, 2, 3}},
		{version: "1.2.3~123bvdbc1", want: []int64{1, 2, 3}},
		{version: "1.1.9-alpha", want: []int64{1, 1, 9}},
	}
	for _, tc := range accepted {
		parts, err := getNumericVersion(tc.version)
		require.Nil(t, err)
		require.Equal(t, tc.want, parts)
	}

	rejected := []struct {
		version string
		wantErr string
	}{
		{
			version: "1.2-banana",
			wantErr: "version string is not of expected semantic format: 1.2-banana",
		},
		{
			version: "1.2",
			wantErr: "version string is not of expected semantic format: 1.2",
		},
		{
			version: "1.banana.2",
			wantErr: "version string contains nonsensical character sequence: 1.banana.2",
		},
	}
	for _, tc := range rejected {
		parts, err := getNumericVersion(tc.version)
		require.NotNil(t, err)
		require.Equal(t, tc.wantErr, err.Error())
		// The comparison is against an empty slice, not nil.
		require.Equal(t, []int64{}, parts)
	}
}

// TestAgentMetaDataCreate verifies that NewAgentMonitor fails for a freshly
// created config (which has no version string yet) and succeeds once a valid
// version has been set.
func TestAgentMetaDataCreate(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred so the context is released even when a require call aborts the
	// test early.
	defer cancel()

	c := config.NewConfig()
	outgoing := make(chan telegraf.Metric, 1)
	signals := make(chan os.Signal)

	// No version string exists yet, so construction should fail.
	_, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.Error(t, err)

	// Add the version and retry; construction should now succeed.
	c.Agent.Version = "1.20.30"
	_, err = NewAgentMonitor(ctx, c, signals, outgoing)
	require.NoError(t, err)
}

// TestAgentMetaDataRunNoJitter runs an agent monitor with its jitter left
// untouched and checks the metrics it emits: a "started" state change, the
// agent meta data, and another state change after a SIGINT is delivered on
// the signals channel. It then cancels the context and verifies that nothing
// is reading from the signals channel any more.
func TestAgentMetaDataRunNoJitter(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred so the monitor goroutine is told to stop even when a require
	// call aborts the test before the explicit cancel below.
	defer cancel()

	c := config.NewConfig()
	c.Agent.Version = "1.20.30"
	outgoing := make(chan telegraf.Metric, 1)
	signals := make(chan os.Signal)

	// A valid version string is set, so construction should succeed.
	agentMonitor, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.NoError(t, err)

	// Start the monitor.
	go agentMonitor.Run()

	// Grab the state change that is sent on startup. The name is compared
	// with Equal so that an empty or partial name cannot pass.
	started := <-outgoing
	require.Equal(t, "agent_statechange", started.Name())
	require.Equal(t, map[string]interface{}{"state": "started"}, started.Fields())

	// Grab the meta data it sends on startup.
	metaData := <-outgoing
	require.Equal(t, "agent_meta_data", metaData.Name())

	// Send it an interrupt.
	signals <- syscall.SIGINT

	// Grab the state change triggered by the interrupt.
	stateChange := <-outgoing
	require.Equal(t, "agent_statechange", stateChange.Name())

	// Cancel the context, which should cause the agent monitor to quietly die.
	cancel()

	// Sleep to give cancel() time to take effect.
	time.Sleep(500 * time.Millisecond)

	// Verify that it is indeed dead: panicOnReceive panics if anything accepts
	// a signal, so NotPanics passes only when nobody is listening any more.
	timeout := time.After(100 * time.Millisecond)
	assert.NotPanics(t, func() { panicOnReceive(signals, timeout) })
}
// TestAgentMetaDataVersion checks that NewAgentMonitor rejects a malformed
// version string and that, for a valid one, the meta data metric carries the
// version string together with its major, minor and patch components.
func TestAgentMetaDataVersion(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred so the monitor goroutine is told to stop even when a require
	// call aborts the test before the explicit cancel below.
	defer cancel()

	c := config.NewConfig()
	outgoing := make(chan telegraf.Metric, 1)
	signals := make(chan os.Signal)

	// An invalid version string must be rejected.
	c.Agent.Version = "1.banana.orangejuice"
	_, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.Error(t, err)

	// Now with a valid version.
	c.Agent.Version = "1.20.30"
	agentMonitor, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.NoError(t, err)

	// Make the jitter very short so the test behaves consistently; it has to
	// be shorter than the ticker.
	agentMonitor.jitter = 10 * time.Millisecond

	// Start the monitor.
	go agentMonitor.Run()

	// Grab the state change that is sent on startup. The name is compared
	// with Equal so that an empty or partial name cannot pass.
	started := <-outgoing
	require.Equal(t, "agent_statechange", started.Name())
	require.Equal(t, map[string]interface{}{"state": "started"}, started.Fields())

	// Grab the meta data it sends on startup.
	metaData := <-outgoing
	require.Equal(t, "agent_meta_data", metaData.Name())
	wantFields := map[string]interface{}{
		"version_string": "1.20.30",
		"major_version":  int64(1),
		"minor_version":  int64(20),
		"patch_version":  int64(30),
		"number_inputs":  int64(0),
		"number_outputs": int64(0),
	}
	require.Equal(t, wantFields, metaData.Fields())

	// Cancel the context, which should cause the agent monitor to quietly die.
	cancel()

	// Sleep to give cancel() time to take effect.
	time.Sleep(500 * time.Millisecond)

	// Verify that it is indeed dead: panicOnReceive panics if anything accepts
	// a signal, so NotPanics passes only when nobody is listening any more.
	timeout := time.After(100 * time.Millisecond)
	assert.NotPanics(t, func() { panicOnReceive(signals, timeout) })
}

// TestAgentMetaDataEmptyVersion checks the handling of an empty version
// string: by default NewAgentMonitor must fail, while with
// IgnoreInvalidVersion enabled the monitor starts and reports
// "version_string" as "none" without any numeric version fields.
func TestAgentMetaDataEmptyVersion(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred so the monitor goroutine is told to stop even when a require
	// call aborts the test before the explicit cancel below.
	defer cancel()

	c := config.NewConfig()
	outgoing := make(chan telegraf.Metric, 1)
	signals := make(chan os.Signal)

	// With the default settings an empty version string must be rejected.
	c.Agent.Version = ""
	_, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.Error(t, err)

	// Now with the setting changed to ignore an invalid version.
	c.Agent.IgnoreInvalidVersion = true
	agentMonitor, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.NoError(t, err)

	// Make the jitter very short so the test behaves consistently; it has to
	// be shorter than the ticker.
	agentMonitor.jitter = 10 * time.Millisecond

	// Start the monitor.
	go agentMonitor.Run()

	// Grab the state change that is sent on startup. The name is compared
	// with Equal so that an empty or partial name cannot pass.
	started := <-outgoing
	require.Equal(t, "agent_statechange", started.Name())
	require.Equal(t, map[string]interface{}{"state": "started"}, started.Fields())

	// Grab the meta data it sends on startup.
	metaData := <-outgoing
	require.Equal(t, "agent_meta_data", metaData.Name())
	wantFields := map[string]interface{}{
		"version_string": "none",
		"number_inputs":  int64(0),
		"number_outputs": int64(0),
	}
	require.Equal(t, wantFields, metaData.Fields())

	// Cancel the context, which should cause the agent monitor to quietly die.
	cancel()

	// Sleep to give cancel() time to take effect.
	time.Sleep(500 * time.Millisecond)

	// Verify that it is indeed dead: panicOnReceive panics if anything accepts
	// a signal, so NotPanics passes only when nobody is listening any more.
	timeout := time.After(100 * time.Millisecond)
	assert.NotPanics(t, func() { panicOnReceive(signals, timeout) })
}

// TestAgentMetaData registers one fake input on the config and checks that
// the meta data metric emitted by the agent monitor reports the version
// fields together with "number_inputs" of 1 and "number_outputs" of 0.
func TestAgentMetaData(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred so the monitor goroutine is told to stop even when a require
	// call aborts the test before the explicit cancel below.
	defer cancel()

	c := config.NewConfig()
	c.Agent.Version = "1.20.30"
	outgoing := make(chan telegraf.Metric, 1)
	signals := make(chan os.Signal)

	// Make our fake input and register it on the config.
	fi := models.NewRunningInput(&fakeInput{name: "sheeps"}, &models.InputConfig{
		Name: "VeryFakeRunningInput",
	})
	c.Inputs = append(c.Inputs, fi)

	// A valid version string is set, so construction should succeed.
	agentMonitor, err := NewAgentMonitor(ctx, c, signals, outgoing)
	require.NoError(t, err)

	// Make the jitter very short so the test behaves consistently; it has to
	// be shorter than the ticker.
	agentMonitor.jitter = 10 * time.Millisecond

	// Start the monitor.
	go agentMonitor.Run()

	// Grab the state change that is sent on startup. The name is compared
	// with Equal so that an empty or partial name cannot pass.
	started := <-outgoing
	require.Equal(t, "agent_statechange", started.Name())
	require.Equal(t, map[string]interface{}{"state": "started"}, started.Fields())

	// Now grab the meta data.
	metaData := <-outgoing
	require.Equal(t, "agent_meta_data", metaData.Name())
	wantFields := map[string]interface{}{
		"version_string": "1.20.30",
		"major_version":  int64(1),
		"minor_version":  int64(20),
		"patch_version":  int64(30),
		"number_inputs":  int64(1),
		"number_outputs": int64(0),
	}
	require.Equal(t, wantFields, metaData.Fields())

	// Cancel the context, which should cause the agent monitor to quietly die.
	cancel()

	// Sleep to give cancel() time to take effect.
	time.Sleep(500 * time.Millisecond)

	// Verify that it is indeed dead: panicOnReceive panics if anything accepts
	// a signal, so NotPanics passes only when nobody is listening any more.
	timeout := time.After(100 * time.Millisecond)
	assert.NotPanics(t, func() { panicOnReceive(signals, timeout) })
}

// fakeInput is a minimal stand-in input plugin for tests; the only state it
// carries is a name that is echoed back by its descriptive methods.
type fakeInput struct {
	name string
}

// SampleConfig returns the fake's name in place of a real sample
// configuration.
func (f *fakeInput) SampleConfig() string {
	return f.name
}

// Description returns a fixed prefix followed by the fake's name.
func (f *fakeInput) Description() string {
	const prefix = "description for: "
	return prefix + f.name
}
// Gather is a no-op: it leaves the accumulator untouched and always reports
// success.
func (f *fakeInput) Gather(acc telegraf.Accumulator) error {
	return nil
}
Loading

0 comments on commit 8c8cca7

Please sign in to comment.