From fa67a6668bd729081e80e252c129a7082cdbd02e Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 24 Jun 2020 15:57:25 -0400 Subject: [PATCH] fix races --- plugins/common/shim/config.go | 7 +--- plugins/common/shim/goshim.go | 17 ++++++---- plugins/common/shim/input.go | 6 ++-- plugins/common/shim/input_test.go | 32 +++++++++--------- plugins/common/shim/output.go | 6 ++-- plugins/common/shim/output_test.go | 10 +++--- plugins/common/shim/processor.go | 4 +-- plugins/common/shim/processor_test.go | 13 ++++---- plugins/inputs/execd/shim/goshim.go | 35 ++++++++++---------- plugins/inputs/execd/shim/shim_posix_test.go | 5 +-- plugins/inputs/execd/shim/shim_test.go | 34 ++++++++++--------- plugins/processors/execd/execd.go | 12 +++---- 12 files changed, 88 insertions(+), 93 deletions(-) diff --git a/plugins/common/shim/config.go b/plugins/common/shim/config.go index a581b3d7a8356..33ae8ab2cfb55 100644 --- a/plugins/common/shim/config.go +++ b/plugins/common/shim/config.go @@ -72,12 +72,7 @@ func LoadConfig(filePath *string) (loaded loadedConfig, err error) { return loadedConfig{}, err } - loadedConf, err := createPluginsWithTomlConfig(md, conf) - - if len(md.Undecoded()) > 0 { - fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded()) - } - return loadedConf, err + return createPluginsWithTomlConfig(md, conf) } func expandEnvVars(contents []byte) string { diff --git a/plugins/common/shim/goshim.go b/plugins/common/shim/goshim.go index 7497fb1074743..2490967eebb48 100644 --- a/plugins/common/shim/goshim.go +++ b/plugins/common/shim/goshim.go @@ -17,11 +17,8 @@ import ( type empty struct{} var ( - stdin io.Reader = os.Stdin - stdout io.Writer = os.Stdout - stderr io.Writer = os.Stderr - forever = 100 * 365 * 24 * time.Hour - envVarEscaper = strings.NewReplacer( + forever = 100 * 365 * 24 * time.Hour + envVarEscaper = strings.NewReplacer( `"`, `\"`, `\`, `\\`, ) @@ -40,6 +37,11 @@ type Shim struct { Processor telegraf.StreamingProcessor Output telegraf.Output + // streams + stdin io.Reader + stdout io.Writer + stderr io.Writer + // outgoing metric channel metricCh chan telegraf.Metric @@ -51,6 +53,9 @@ type Shim struct { func New() *Shim { return &Shim{ metricCh: make(chan telegraf.Metric, 1), + stdin: os.Stdin, + stdout: os.Stdout, + stderr: os.Stderr, } } @@ -105,7 +110,7 @@ func (s *Shim) writeProcessedMetrics() error { return fmt.Errorf("failed to serialize metric: %s", err) } // Write this to stdout - fmt.Fprint(stdout, string(b)) + fmt.Fprint(s.stdout, string(b)) } } } diff --git a/plugins/common/shim/input.go b/plugins/common/shim/input.go index 7e4ec89ddaf20..006f2ad046226 100644 --- a/plugins/common/shim/input.go +++ b/plugins/common/shim/input.go @@ -57,7 +57,7 @@ func (s *Shim) RunInput(pollInterval time.Duration) error { wg.Done() }() - scanner := bufio.NewScanner(stdin) + scanner := bufio.NewScanner(s.stdin) for scanner.Scan() { // push a non-blocking message to trigger metric collection. s.pushCollectMetricsRequest() @@ -85,11 +85,11 @@ func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc tel return case <-s.gatherPromptCh: if err := input.Gather(acc); err != nil { - fmt.Fprintf(stderr, "failed to gather metrics: %s\n", err) + fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err) } case <-t.C: if err := input.Gather(acc); err != nil { - fmt.Fprintf(stderr, "failed to gather metrics: %s\n", err) + fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err) } } } diff --git a/plugins/common/shim/input_test.go b/plugins/common/shim/input_test.go index d2550f29caaab..709ac79ef4fdc 100644 --- a/plugins/common/shim/input_test.go +++ b/plugins/common/shim/input_test.go @@ -2,7 +2,6 @@ package shim import ( "bufio" - "bytes" "io" "io/ioutil" "strings" @@ -15,19 +14,16 @@ import ( ) func TestInputShimTimer(t *testing.T) { - stdoutBytes := bytes.NewBufferString("") - stdout = stdoutBytes + stdoutReader, stdoutWriter := io.Pipe() - stdin, _ = io.Pipe() // hold the stdin pipe open + stdin, _ := io.Pipe() // hold the stdin pipe open - metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond) + metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil) <-metricProcessed - for stdoutBytes.Len() == 0 { - time.Sleep(10 * time.Millisecond) - } - - out := string(stdoutBytes.Bytes()) + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) require.Contains(t, out, "\n") metricLine := strings.Split(out, "\n")[0] require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) @@ -37,10 +33,7 @@ func TestInputShimStdinSignalingWorks(t *testing.T) { stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() - stdin = stdinReader - stdout = stdoutWriter - - metricProcessed, exited := runInputPlugin(t, 40*time.Second) + metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil) stdinWriter.Write([]byte("\n")) @@ -57,7 +50,7 @@ func TestInputShimStdinSignalingWorks(t *testing.T) { <-exited } -func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) { +func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) { metricProcessed = make(chan bool, 1) exited = make(chan bool, 1) inp := &testInput{ @@ -65,6 +58,15 @@ func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan } shim := New() + if stdin != nil { + shim.stdin = stdin + } + if stdout != nil { + shim.stdout = stdout + } + if stderr != nil { + shim.stderr = stderr + } shim.AddInput(inp) go func() { err := shim.Run(interval) diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go index e42a21dc1ea30..75fcf5a5fa3cb 100644 --- a/plugins/common/shim/output.go +++ b/plugins/common/shim/output.go @@ -36,14 +36,14 @@ func (s *Shim) RunOutput() error { var m telegraf.Metric - scanner := bufio.NewScanner(stdin) + scanner := bufio.NewScanner(s.stdin) for scanner.Scan() { m, err = parser.ParseLine(scanner.Text()) if err != nil { - fmt.Fprintf(stderr, "Failed to parse metric: %s\b", err) + fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err) } if err = s.Output.Write([]telegraf.Metric{m}); err != nil { - fmt.Fprintf(stderr, "Failed to write metric: %s\b", err) + fmt.Fprintf(s.stderr, "Failed to write metric: %s\b", err) } } diff --git a/plugins/common/shim/output_test.go b/plugins/common/shim/output_test.go index 721d0a225a1e0..5a74d59edb240 100644 --- a/plugins/common/shim/output_test.go +++ b/plugins/common/shim/output_test.go @@ -16,20 +16,18 @@ import ( func TestOutputShim(t *testing.T) { o := &testOutput{} + stdinReader, stdinWriter := io.Pipe() + s := New() + s.stdin = stdinReader err := s.AddOutput(o) require.NoError(t, err) - stdinReader, stdinWriter := io.Pipe() - - // inject test into shim - stdin = stdinReader - wg := sync.WaitGroup{} wg.Add(1) go func() { - err = s.RunOutput() + err := s.RunOutput() require.NoError(t, err) wg.Done() }() diff --git a/plugins/common/shim/processor.go b/plugins/common/shim/processor.go index 38581f1a2c093..7de28af2df1f5 100644 --- a/plugins/common/shim/processor.go +++ b/plugins/common/shim/processor.go @@ -52,11 +52,11 @@ func (s *Shim) RunProcessor() error { wg.Done() }() - scanner := bufio.NewScanner(stdin) + scanner := bufio.NewScanner(s.stdin) for scanner.Scan() { m, err := parser.ParseLine(scanner.Text()) if err != nil { - fmt.Fprintf(stderr, "Failed to parse metric: %s\b", err) + fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err) } s.Processor.Add(m, acc) } diff --git a/plugins/common/shim/processor_test.go b/plugins/common/shim/processor_test.go index 69c672f1f0ff8..b4cf01ae0236f 100644 --- a/plugins/common/shim/processor_test.go +++ b/plugins/common/shim/processor_test.go @@ -18,22 +18,21 @@ import ( func TestProcessorShim(t *testing.T) { p := &testProcessor{} - s := New() - err := s.AddProcessor(p) - require.NoError(t, err) - stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() + s := New() // inject test into shim - stdin = stdinReader - stdout = stdoutWriter + s.stdin = stdinReader + s.stdout = stdoutWriter + err := s.AddProcessor(p) + require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) go func() { - err = s.RunProcessor() + err := s.RunProcessor() require.NoError(t, err) wg.Done() }() diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index d38f17ffdb665..1ea794fb6877d 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -24,10 +24,8 @@ import ( type empty struct{} var ( - stdout io.Writer = os.Stdout - stdin io.Reader = os.Stdin - forever = 100 * 365 * 24 * time.Hour - envVarEscaper = strings.NewReplacer( + forever = 100 * 365 * 24 * time.Hour + envVarEscaper = strings.NewReplacer( `"`, `\"`, `\`, `\\`, ) @@ -45,11 +43,19 @@ type Shim struct { Inputs []telegraf.Input gatherPromptChans []chan empty metricCh chan telegraf.Metric + + stdin io.Reader + stdout io.Writer + stderr io.Writer } // New creates a new shim interface func New() *Shim { - return &Shim{} + return &Shim{ + stdin: os.Stdin, + stdout: os.Stdout, + stderr: os.Stderr, + } } // AddInput adds the input to the shim. Later calls to Run() will run this input. @@ -108,7 +114,7 @@ func (s *Shim) Run(pollInterval time.Duration) error { s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh) wg.Add(1) // one per input go func(input telegraf.Input) { - startGathering(ctx, input, acc, gatherPromptCh, pollInterval) + s.startGathering(ctx, input, acc, gatherPromptCh, pollInterval) if serviceInput, ok := input.(telegraf.ServiceInput); ok { serviceInput.Stop() } @@ -141,7 +147,7 @@ loop: return fmt.Errorf("failed to serialize metric: %s", err) } // Write this to stdout - fmt.Fprint(stdout, string(b)) + fmt.Fprint(s.stdout, string(b)) } } @@ -163,7 +169,7 @@ func (s *Shim) stdinCollectMetricsPrompt(ctx context.Context, cancel context.Can close(collectMetricsPrompt) }() - scanner := bufio.NewScanner(stdin) + scanner := bufio.NewScanner(s.stdin) // for every line read from stdin, make sure we're not supposed to quit, // then push a message on to the collectMetricsPrompt for scanner.Scan() { @@ -201,7 +207,7 @@ func (s *Shim) collectMetrics(ctx context.Context) { } } -func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) { +func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) { if pollInterval == PollIntervalDisabled { return // don't poll } @@ -218,11 +224,11 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu return case <-gatherPromptCh: if err := input.Gather(acc); err != nil { - fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) + fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err) } case <-t.C: if err := input.Gather(acc); err != nil { - fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) + fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err) } } } @@ -269,12 +275,7 @@ func LoadConfig(filePath *string) ([]telegraf.Input, error) { return nil, err } - loadedInputs, err := loadConfigIntoInputs(md, conf.Inputs) - - if len(md.Undecoded()) > 0 { - fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded()) - } - return loadedInputs, err + return loadConfigIntoInputs(md, conf.Inputs) } func expandEnvVars(contents []byte) string { diff --git a/plugins/inputs/execd/shim/shim_posix_test.go b/plugins/inputs/execd/shim/shim_posix_test.go index 00e5dc6c3f595..873ef89bf655f 100644 --- a/plugins/inputs/execd/shim/shim_posix_test.go +++ b/plugins/inputs/execd/shim/shim_posix_test.go @@ -23,12 +23,9 @@ func TestShimUSR1SignalingWorks(t *testing.T) { stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() - stdin = stdinReader - stdout = stdoutWriter - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - metricProcessed, exited := runInputPlugin(t, 20*time.Minute) + metricProcessed, exited := runInputPlugin(t, 20*time.Minute, stdinReader, stdoutWriter, nil) // signal USR1 to yourself. pid := os.Getpid() diff --git a/plugins/inputs/execd/shim/shim_test.go b/plugins/inputs/execd/shim/shim_test.go index 2a31e5adcbd01..387ccfe5cbbf8 100644 --- a/plugins/inputs/execd/shim/shim_test.go +++ b/plugins/inputs/execd/shim/shim_test.go @@ -2,7 +2,6 @@ package shim import ( "bufio" - "bytes" "io" "os" "strings" @@ -16,20 +15,16 @@ import ( ) func TestShimWorks(t *testing.T) { - stdoutBytes := bytes.NewBufferString("") - stdout = stdoutBytes + stdoutReader, stdoutWriter := io.Pipe() - stdin, _ = io.Pipe() // hold the stdin pipe open + stdin, _ := io.Pipe() // hold the stdin pipe open - metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond) + metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil) <-metricProcessed - for stdoutBytes.Len() == 0 { - t.Log("Waiting for bytes available in stdout") - time.Sleep(10 * time.Millisecond) - } - - out := string(stdoutBytes.Bytes()) + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) require.Contains(t, out, "\n") metricLine := strings.Split(out, "\n")[0] require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) @@ -39,10 +34,7 @@ func TestShimStdinSignalingWorks(t *testing.T) { stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() - stdin = stdinReader - stdout = stdoutWriter - - metricProcessed, exited := runInputPlugin(t, 40*time.Second) + metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil) stdinWriter.Write([]byte("\n")) @@ -61,7 +53,7 @@ func TestShimStdinSignalingWorks(t *testing.T) { <-exited } -func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) { +func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) { metricProcessed = make(chan bool, 10) exited = make(chan bool) inp := &testInput{ @@ -69,6 +61,16 @@ func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan } shim := New() + if stdin != nil { + shim.stdin = stdin + } + if stdout != nil { + shim.stdout = stdout + } + if stderr != nil { + shim.stderr = stderr + } + shim.AddInput(inp) go func() { err := shim.Run(interval) diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index 41cf441388350..829b15a7405af 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -87,21 +87,17 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { return nil } -func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) { +func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) error { b, err := e.serializer.Serialize(m) if err != nil { - log.Println(fmt.Errorf("Metric serializing error: %s", err)) - return + return fmt.Errorf("Metric serializing error: %s", err) } - // if osStdin, ok := e.process.Stdin.(*os.File); ok { - // osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) - // } _, err = e.process.Stdin.Write(b) if err != nil { - log.Println(fmt.Errorf("Error writing to process stdin: %s", err)) - return + return fmt.Errorf("Error writing to process stdin: %s", err) } + return nil } func (e *Execd) Stop() error {