Skip to content

Commit

Permalink
fix races
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka committed Jun 24, 2020
1 parent a8dfa8e commit fa67a66
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 93 deletions.
7 changes: 1 addition & 6 deletions plugins/common/shim/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,7 @@ func LoadConfig(filePath *string) (loaded loadedConfig, err error) {
return loadedConfig{}, err
}

loadedConf, err := createPluginsWithTomlConfig(md, conf)

if len(md.Undecoded()) > 0 {
fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded())
}
return loadedConf, err
return createPluginsWithTomlConfig(md, conf)
}

func expandEnvVars(contents []byte) string {
Expand Down
17 changes: 11 additions & 6 deletions plugins/common/shim/goshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ import (
type empty struct{}

var (
stdin io.Reader = os.Stdin
stdout io.Writer = os.Stdout
stderr io.Writer = os.Stderr
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
Expand All @@ -40,6 +37,11 @@ type Shim struct {
Processor telegraf.StreamingProcessor
Output telegraf.Output

// streams
stdin io.Reader
stdout io.Writer
stderr io.Writer

// outgoing metric channel
metricCh chan telegraf.Metric

Expand All @@ -51,6 +53,9 @@ type Shim struct {
func New() *Shim {
return &Shim{
metricCh: make(chan telegraf.Metric, 1),
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
}
}

Expand Down Expand Up @@ -105,7 +110,7 @@ func (s *Shim) writeProcessedMetrics() error {
return fmt.Errorf("failed to serialize metric: %s", err)
}
// Write this to stdout
fmt.Fprint(stdout, string(b))
fmt.Fprint(s.stdout, string(b))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/common/shim/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Shim) RunInput(pollInterval time.Duration) error {
wg.Done()
}()

scanner := bufio.NewScanner(stdin)
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
Expand Down Expand Up @@ -85,11 +85,11 @@ func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc tel
return
case <-s.gatherPromptCh:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(stderr, "failed to gather metrics: %s\n", err)
fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err)
}
case <-t.C:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(stderr, "failed to gather metrics: %s\n", err)
fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err)
}
}
}
Expand Down
32 changes: 17 additions & 15 deletions plugins/common/shim/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package shim

import (
"bufio"
"bytes"
"io"
"io/ioutil"
"strings"
Expand All @@ -15,19 +14,16 @@ import (
)

func TestInputShimTimer(t *testing.T) {
stdoutBytes := bytes.NewBufferString("")
stdout = stdoutBytes
stdoutReader, stdoutWriter := io.Pipe()

stdin, _ = io.Pipe() // hold the stdin pipe open
stdin, _ := io.Pipe() // hold the stdin pipe open

metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond)
metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil)

<-metricProcessed
for stdoutBytes.Len() == 0 {
time.Sleep(10 * time.Millisecond)
}

out := string(stdoutBytes.Bytes())
r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
require.Contains(t, out, "\n")
metricLine := strings.Split(out, "\n")[0]
require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine)
Expand All @@ -37,10 +33,7 @@ func TestInputShimStdinSignalingWorks(t *testing.T) {
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()

stdin = stdinReader
stdout = stdoutWriter

metricProcessed, exited := runInputPlugin(t, 40*time.Second)
metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil)

stdinWriter.Write([]byte("\n"))

Expand All @@ -57,14 +50,23 @@ func TestInputShimStdinSignalingWorks(t *testing.T) {
<-exited
}

func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) {
func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool, 1)
exited = make(chan bool, 1)
inp := &testInput{
metricProcessed: metricProcessed,
}

shim := New()
if stdin != nil {
shim.stdin = stdin
}
if stdout != nil {
shim.stdout = stdout
}
if stderr != nil {
shim.stderr = stderr
}
shim.AddInput(inp)
go func() {
err := shim.Run(interval)
Expand Down
6 changes: 3 additions & 3 deletions plugins/common/shim/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ func (s *Shim) RunOutput() error {

var m telegraf.Metric

scanner := bufio.NewScanner(stdin)
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err = parser.ParseLine(scanner.Text())
if err != nil {
fmt.Fprintf(stderr, "Failed to parse metric: %s\b", err)
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err)
}
if err = s.Output.Write([]telegraf.Metric{m}); err != nil {
fmt.Fprintf(stderr, "Failed to write metric: %s\b", err)
fmt.Fprintf(s.stderr, "Failed to write metric: %s\b", err)
}
}

Expand Down
10 changes: 4 additions & 6 deletions plugins/common/shim/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@ import (
func TestOutputShim(t *testing.T) {
o := &testOutput{}

stdinReader, stdinWriter := io.Pipe()

s := New()
s.stdin = stdinReader
err := s.AddOutput(o)
require.NoError(t, err)

stdinReader, stdinWriter := io.Pipe()

// inject test into shim
stdin = stdinReader

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
err = s.RunOutput()
err := s.RunOutput()
require.NoError(t, err)
wg.Done()
}()
Expand Down
4 changes: 2 additions & 2 deletions plugins/common/shim/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (s *Shim) RunProcessor() error {
wg.Done()
}()

scanner := bufio.NewScanner(stdin)
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err := parser.ParseLine(scanner.Text())
if err != nil {
fmt.Fprintf(stderr, "Failed to parse metric: %s\b", err)
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err)
}
s.Processor.Add(m, acc)
}
Expand Down
13 changes: 6 additions & 7 deletions plugins/common/shim/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@ import (
func TestProcessorShim(t *testing.T) {
p := &testProcessor{}

s := New()
err := s.AddProcessor(p)
require.NoError(t, err)

stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()

s := New()
// inject test into shim
stdin = stdinReader
stdout = stdoutWriter
s.stdin = stdinReader
s.stdout = stdoutWriter
err := s.AddProcessor(p)
require.NoError(t, err)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
err = s.RunProcessor()
err := s.RunProcessor()
require.NoError(t, err)
wg.Done()
}()
Expand Down
35 changes: 18 additions & 17 deletions plugins/inputs/execd/shim/goshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
type empty struct{}

var (
stdout io.Writer = os.Stdout
stdin io.Reader = os.Stdin
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
Expand All @@ -45,11 +43,19 @@ type Shim struct {
Inputs []telegraf.Input
gatherPromptChans []chan empty
metricCh chan telegraf.Metric

stdin io.Reader
stdout io.Writer
stderr io.Writer
}

// New creates a new shim interface
func New() *Shim {
return &Shim{}
return &Shim{
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
}
}

// AddInput adds the input to the shim. Later calls to Run() will run this input.
Expand Down Expand Up @@ -108,7 +114,7 @@ func (s *Shim) Run(pollInterval time.Duration) error {
s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh)
wg.Add(1) // one per input
go func(input telegraf.Input) {
startGathering(ctx, input, acc, gatherPromptCh, pollInterval)
s.startGathering(ctx, input, acc, gatherPromptCh, pollInterval)
if serviceInput, ok := input.(telegraf.ServiceInput); ok {
serviceInput.Stop()
}
Expand Down Expand Up @@ -141,7 +147,7 @@ loop:
return fmt.Errorf("failed to serialize metric: %s", err)
}
// Write this to stdout
fmt.Fprint(stdout, string(b))
fmt.Fprint(s.stdout, string(b))
}
}

Expand All @@ -163,7 +169,7 @@ func (s *Shim) stdinCollectMetricsPrompt(ctx context.Context, cancel context.Can
close(collectMetricsPrompt)
}()

scanner := bufio.NewScanner(stdin)
scanner := bufio.NewScanner(s.stdin)
// for every line read from stdin, make sure we're not supposed to quit,
// then push a message on to the collectMetricsPrompt
for scanner.Scan() {
Expand Down Expand Up @@ -201,7 +207,7 @@ func (s *Shim) collectMetrics(ctx context.Context) {
}
}

func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) {
func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) {
if pollInterval == PollIntervalDisabled {
return // don't poll
}
Expand All @@ -218,11 +224,11 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu
return
case <-gatherPromptCh:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err)
fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err)
}
case <-t.C:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err)
fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err)
}
}
}
Expand Down Expand Up @@ -269,12 +275,7 @@ func LoadConfig(filePath *string) ([]telegraf.Input, error) {
return nil, err
}

loadedInputs, err := loadConfigIntoInputs(md, conf.Inputs)

if len(md.Undecoded()) > 0 {
fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded())
}
return loadedInputs, err
return loadConfigIntoInputs(md, conf.Inputs)
}

func expandEnvVars(contents []byte) string {
Expand Down
5 changes: 1 addition & 4 deletions plugins/inputs/execd/shim/shim_posix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()

stdin = stdinReader
stdout = stdoutWriter

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
metricProcessed, exited := runInputPlugin(t, 20*time.Minute)
metricProcessed, exited := runInputPlugin(t, 20*time.Minute, stdinReader, stdoutWriter, nil)

// signal USR1 to yourself.
pid := os.Getpid()
Expand Down
Loading

0 comments on commit fa67a66

Please sign in to comment.