Skip to content

Commit

Permalink
feat(exec, execd): add an option to pass a custom environment to thei…
Browse files Browse the repository at this point in the history
…r child process (#11049)
  • Loading branch information
an-mmx authored and MyaLongmire committed Jul 6, 2022
1 parent ddab019 commit 7a0e5e4
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 29 deletions.
9 changes: 8 additions & 1 deletion internal/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync"
"sync/atomic"
Expand All @@ -26,13 +27,14 @@ type Process struct {

name string
args []string
envs []string
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
}

// New creates a new process wrapper
func New(command []string) (*Process, error) {
func New(command []string, envs []string) (*Process, error) {
if len(command) == 0 {
return nil, errors.New("no command")
}
Expand All @@ -41,6 +43,7 @@ func New(command []string) (*Process, error) {
RestartDelay: 5 * time.Second,
name: command[0],
args: []string{},
envs: envs,
}

if len(command) > 1 {
Expand Down Expand Up @@ -85,6 +88,10 @@ func (p *Process) Stop() {
func (p *Process) cmdStart() error {
p.Cmd = exec.Command(p.name, p.args...)

if len(p.envs) > 0 {
p.Cmd.Env = append(os.Environ(), p.envs...)
}

var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestRestartingRebindsPipes(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)

p, err := New([]string{exe, "-external"})
p, err := New([]string{exe, "-external"}, []string{"INTERNAL_PROCESS_MODE=application"})
p.RestartDelay = 100 * time.Nanosecond
p.Log = testutil.Logger{}
require.NoError(t, err)
Expand Down Expand Up @@ -62,7 +62,8 @@ var external = flag.Bool("external", false,

func TestMain(m *testing.M) {
flag.Parse()
if *external {
runMode := os.Getenv("INTERNAL_PROCESS_MODE")
if *external && runMode == "application" {
externalProcess()
os.Exit(0)
}
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/exec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ This plugin can be used to poll for custom metrics from any source.
"/tmp/collect_*.sh"
]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Timeout for each command to complete.
timeout = "5s"

Expand Down Expand Up @@ -55,7 +61,7 @@ It can be paired with the following configuration and will be run at the `interv

### My script works when I run it by hand, but not when Telegraf is running as a service

This may be related to the Telegraf service running as a different user. The
This may be related to the Telegraf service running as a different user. The
official packages run Telegraf as the `telegraf` user and group on Linux
systems.

Expand Down
17 changes: 12 additions & 5 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"os"
osExec "os/exec"
"path/filepath"
"runtime"
Expand All @@ -24,9 +25,10 @@ import (
const MaxStderrBytes int = 512

type Exec struct {
Commands []string `toml:"commands"`
Command string `toml:"command"`
Timeout config.Duration `toml:"timeout"`
Commands []string `toml:"commands"`
Command string `toml:"command"`
Environment []string `toml:"environment"`
Timeout config.Duration `toml:"timeout"`

parser parsers.Parser

Expand All @@ -42,13 +44,14 @@ func NewExec() *Exec {
}

type Runner interface {
Run(string, time.Duration) ([]byte, []byte, error)
Run(string, []string, time.Duration) ([]byte, []byte, error)
}

type CommandRunner struct{}

func (c CommandRunner) Run(
command string,
environments []string,
timeout time.Duration,
) ([]byte, []byte, error) {
splitCmd, err := shellquote.Split(command)
Expand All @@ -58,6 +61,10 @@ func (c CommandRunner) Run(

cmd := osExec.Command(splitCmd[0], splitCmd[1:]...)

if len(environments) > 0 {
cmd.Env = append(os.Environ(), environments...)
}

var (
out bytes.Buffer
stderr bytes.Buffer
Expand Down Expand Up @@ -120,7 +127,7 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
defer wg.Done()
_, isNagios := e.parser.(*nagios.NagiosParser)

out, errbuf, runErr := e.runner.Run(command, time.Duration(e.Timeout))
out, errbuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout))
if !isNagios && runErr != nil {
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errbuf))
acc.AddError(err)
Expand Down
19 changes: 18 additions & 1 deletion plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newRunnerMock(out []byte, errout []byte, err error) Runner {
}
}

func (r runnerMock) Run(_ string, _ time.Duration) ([]byte, []byte, error) {
func (r runnerMock) Run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) {
return r.out, r.errout, r.err
}

Expand Down Expand Up @@ -191,6 +191,23 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
acc.AssertContainsFields(t, "metric", fields)
}

func TestExecCommandWithEnv(t *testing.T) {
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
e := NewExec()
e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}
e.Environment = []string{"METRIC_NAME=metric_value"}
e.SetParser(parser)

var acc testutil.Accumulator
err := acc.GatherError(e.Gather)
require.NoError(t, err)

fields := map[string]interface{}{
"value": "metric_value",
}
acc.AssertContainsFields(t, "metric", fields)
}

func TestTruncate(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ STDERR from the process will be relayed to Telegraf as errors in the logs.
## NOTE: process and each argument should each be their own string
command = ["telegraf-smartctl", "-d", "/dev/sda"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Define how the process is signaled on each collection interval.
## Valid values are:
## "none" : Do not signal anything. (Recommended for service inputs)
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
Signal string `toml:"signal"`
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger `toml:"-"`
Expand All @@ -35,7 +36,7 @@ func (e *Execd) SetParser(parser parsers.Parser) {
func (e *Execd) Start(acc telegraf.Accumulator) error {
e.acc = acc
var err error
e.process, err = process.New(e.Command)
e.process, err = process.New(e.Command, e.Environment)
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions plugins/inputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestSettingConfigWorks(t *testing.T) {
cfg := `
[[inputs.execd]]
command = ["a", "b", "c"]
environment = ["d=e", "f=1"]
restart_delay = "1m"
signal = "SIGHUP"
`
Expand All @@ -35,6 +36,7 @@ func TestSettingConfigWorks(t *testing.T) {
inp, ok := conf.Inputs[0].Input.(*Execd)
require.True(t, ok)
require.EqualValues(t, []string{"a", "b", "c"}, inp.Command)
require.EqualValues(t, []string{"d=e", "f=1"}, inp.Environment)
require.EqualValues(t, 1*time.Minute, inp.RestartDelay)
require.EqualValues(t, "SIGHUP", inp.Signal)
}
Expand All @@ -48,6 +50,7 @@ func TestExternalInputWorks(t *testing.T) {

e := &Execd{
Command: []string{exe, "-counter"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
RestartDelay: config.Duration(5 * time.Second),
parser: influxParser,
Signal: "STDIN",
Expand Down Expand Up @@ -152,7 +155,8 @@ var counter = flag.Bool("counter", false,

func TestMain(m *testing.M) {
flag.Parse()
if *counter {
runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if *counter && runMode == "application" {
if err := runCounterProgram(); err != nil {
os.Exit(1)
}
Expand All @@ -163,6 +167,7 @@ func TestMain(m *testing.M) {
}

func runCounterProgram() error {
envMetricName := os.Getenv("METRIC_NAME")
i := 0
serializer, err := serializers.NewInfluxSerializer()
if err != nil {
Expand All @@ -173,7 +178,7 @@ func runCounterProgram() error {

scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
m := metric.New("counter",
m := metric.New(envMetricName,
map[string]string{},
map[string]interface{}{
"count": i,
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/exec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ For better performance, consider execd, which runs continuously.
## Command to ingest metrics via stdin.
command = ["tee", "-a", "/dev/null"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Timeout for command to complete.
# timeout = "5s"

Expand Down
17 changes: 11 additions & 6 deletions plugins/outputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"runtime"
"time"
Expand All @@ -19,9 +20,10 @@ const maxStderrBytes = 512

// Exec defines the exec output plugin.
type Exec struct {
Command []string `toml:"command"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
Command []string `toml:"command"`
Environment []string `toml:"environment"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`

runner Runner
serializer serializers.Serializer
Expand Down Expand Up @@ -61,12 +63,12 @@ func (e *Exec) Write(metrics []telegraf.Metric) error {
return nil
}

return e.runner.Run(time.Duration(e.Timeout), e.Command, &buffer)
return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer)
}

// Runner provides an interface for running exec.Cmd.
type Runner interface {
Run(time.Duration, []string, io.Reader) error
Run(time.Duration, []string, []string, io.Reader) error
}

// CommandRunner runs a command with the ability to kill the process before the timeout.
Expand All @@ -76,8 +78,11 @@ type CommandRunner struct {
}

// Run runs the command.
func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.Reader) error {
func (c *CommandRunner) Run(timeout time.Duration, command []string, environments []string, buffer io.Reader) error {
cmd := exec.Command(command[0], command[1:]...)
if len(environments) > 0 {
cmd.Env = append(os.Environ(), environments...)
}
cmd.Stdin = buffer
var stderr bytes.Buffer
cmd.Stderr = &stderr
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ Telegraf minimum version: Telegraf 1.15.0
## NOTE: process and each argument should each be their own string
command = ["my-telegraf-output", "--some-flag", "value"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"

Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger

Expand All @@ -34,7 +35,7 @@ func (e *Execd) Init() error {

var err error

e.process, err = process.New(e.Command)
e.process, err = process.New(e.Command, e.Environment)
if err != nil {
return fmt.Errorf("error creating process %s: %w", e.Command, err)
}
Expand Down
7 changes: 5 additions & 2 deletions plugins/outputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestExternalOutputWorks(t *testing.T) {

e := &Execd{
Command: []string{exe, "-testoutput"},
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
serializer: influxSerializer,
Log: testutil.Logger{},
Expand Down Expand Up @@ -74,7 +75,8 @@ var testoutput = flag.Bool("testoutput", false,

func TestMain(m *testing.M) {
flag.Parse()
if *testoutput {
runMode := os.Getenv("PLUGINS_OUTPUTS_EXECD_MODE")
if *testoutput && runMode == "application" {
runOutputConsumerProgram()
os.Exit(0)
}
Expand All @@ -83,6 +85,7 @@ func TestMain(m *testing.M) {
}

func runOutputConsumerProgram() {
metricName := os.Getenv("METRIC_NAME")
parser := influx.NewStreamParser(os.Stdin)

for {
Expand All @@ -103,7 +106,7 @@ func runOutputConsumerProgram() {
os.Exit(1)
}

expected := testutil.MustMetric("cpu",
expected := testutil.MustMetric(metricName,
map[string]string{"name": "cpu1"},
map[string]interface{}{"idle": 50, "sys": 30},
now,
Expand Down
6 changes: 6 additions & 0 deletions plugins/processors/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ Telegraf minimum version: Telegraf 1.15.0
## eg: command = ["/path/to/your_program", "arg1", "arg2"]
command = ["cat"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Delay before the process is restarted after an unexpected termination
# restart_delay = "10s"
```
Expand Down
Loading

0 comments on commit 7a0e5e4

Please sign in to comment.