Skip to content

Commit

Permalink
execd processor
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka committed Jun 24, 2020
1 parent a3aaa2f commit a8dfa8e
Show file tree
Hide file tree
Showing 27 changed files with 1,586 additions and 149 deletions.
162 changes: 162 additions & 0 deletions internal/process/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package process

import (
"context"
"fmt"
"io"
"log"
"os/exec"
"sync"
"time"
)

// Process is a long-running process manager that will restart processes if they stop.
//
// Create one with New, optionally set ReadStdoutFn, ReadStderrFn and
// RestartDelay, then call Start. Call Stop to end supervision.
type Process struct {
// Cmd is the command being supervised; New builds it from the command slice.
Cmd *exec.Cmd
// Stdin is the pipe New obtains from Cmd.StdinPipe. It is closed by the
// supervising loop when Stop cancels it.
Stdin io.WriteCloser
// Stdout is the pipe New obtains from Cmd.StdoutPipe; it is handed to ReadStdoutFn.
Stdout io.ReadCloser
// Stderr is the pipe New obtains from Cmd.StderrPipe; it is handed to ReadStderrFn.
Stderr io.ReadCloser
// ReadStdoutFn, when non-nil, is run in its own goroutine with Stdout each
// time the process is waited on. The wait does not complete until it returns.
ReadStdoutFn func(io.Reader)
// ReadStderrFn is the Stderr counterpart of ReadStdoutFn.
ReadStderrFn func(io.Reader)
// RestartDelay is how long to pause between the process terminating and the
// restart attempt. New sets it to 5 seconds.
RestartDelay time.Duration

// cancel stops the supervising loop; it is set by Start and invoked by Stop.
cancel context.CancelFunc
// mainLoopWg tracks the supervising goroutine so Stop can wait for it.
mainLoopWg sync.WaitGroup
}

// New creates a new process wrapper for the given command line.
//
// command[0] is the program to run and any remaining elements are passed to
// it as arguments. The returned Process has stdin, stdout and stderr pipes
// already attached to the (not yet started) command and a RestartDelay of
// five seconds.
//
// An error is returned when command is empty or when any of the three pipes
// cannot be opened.
func New(command []string) (*Process, error) {
	// Guard against an empty slice: indexing command[0] would panic.
	if len(command) == 0 {
		return nil, fmt.Errorf("no command specified")
	}

	p := &Process{
		RestartDelay: 5 * time.Second,
		// command[1:] is simply empty when only the program name was given.
		Cmd: exec.Command(command[0], command[1:]...),
	}

	var err error
	p.Stdin, err = p.Cmd.StdinPipe()
	if err != nil {
		return nil, fmt.Errorf("Error opening stdin pipe: %s", err)
	}

	p.Stdout, err = p.Cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("Error opening stdout pipe: %s", err)
	}

	p.Stderr, err = p.Cmd.StderrPipe()
	if err != nil {
		return nil, fmt.Errorf("Error opening stderr pipe: %s", err)
	}

	return p, nil
}

// Start launches the process and begins supervising it in a background
// goroutine that restarts it whenever it terminates.
//
// If the process cannot be started the error is returned and no supervising
// goroutine is created, so a subsequent Stop returns immediately.
func (p *Process) Start() error {
	ctx, cancel := context.WithCancel(context.Background())

	if err := p.cmdStart(); err != nil {
		// Release the context; nothing will ever listen on it.
		cancel()
		return err
	}

	// Only register the loop with the WaitGroup once it is certain to run.
	// Registering before a failed start would leave the counter at one with
	// nobody to call Done, and Stop would then block forever.
	p.cancel = cancel
	p.mainLoopWg.Add(1)

	go func() {
		defer p.mainLoopWg.Done()
		if err := p.cmdLoop(ctx); err != nil {
			log.Printf("Process quit with message: %s", err.Error())
		}
	}()

	return nil
}

// Stop cancels the supervising loop, if Start has installed one, and then
// blocks until the supervising goroutine has returned.
func (p *Process) Stop() {
	if stop := p.cancel; stop != nil {
		stop()
	}

	p.mainLoopWg.Wait()
}

// cmdStart logs the command line and starts p.Cmd.
//
// An exec.Cmd can only be started once. When p.Cmd has already been started
// (the restart path in cmdLoop) it is first replaced by a fresh, unstarted
// command with the same configuration, and p.Stdin, p.Stdout and p.Stderr
// are re-pointed at the new command's pipes.
// NOTE(review): callers that cached the old p.Stdin must re-read it after a
// restart; confirm against the plugins using this package.
func (p *Process) cmdStart() error {
	if p.Cmd.Process != nil {
		if err := p.renewCmd(); err != nil {
			return err
		}
	}

	log.Printf("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)

	if err := p.Cmd.Start(); err != nil {
		return fmt.Errorf("Error starting process: %s", err)
	}

	return nil
}

// renewCmd swaps p.Cmd for an unstarted copy and opens new stdio pipes for it.
func (p *Process) renewCmd() error {
	prev := p.Cmd
	next := &exec.Cmd{
		Path:        prev.Path,
		Args:        prev.Args,
		Env:         prev.Env,
		Dir:         prev.Dir,
		ExtraFiles:  prev.ExtraFiles,
		SysProcAttr: prev.SysProcAttr,
	}

	stdin, err := next.StdinPipe()
	if err != nil {
		return fmt.Errorf("Error opening stdin pipe: %s", err)
	}

	stdout, err := next.StdoutPipe()
	if err != nil {
		stdin.Close() // do not leak the pipe opened above
		return fmt.Errorf("Error opening stdout pipe: %s", err)
	}

	stderr, err := next.StderrPipe()
	if err != nil {
		stdin.Close() // do not leak the pipes opened above
		stdout.Close()
		return fmt.Errorf("Error opening stderr pipe: %s", err)
	}

	p.Cmd, p.Stdin, p.Stdout, p.Stderr = next, stdin, stdout, stderr
	return nil
}

// cmdLoop watches an already running process, restarting it when appropriate.
//
// Each iteration waits, via cmdWait, for the current process to terminate.
// If ctx is cancelled first, Stdin is closed, gracefulStop is scheduled and
// the loop returns nil without waiting for the process to exit. Otherwise
// the termination is logged and, once RestartDelay has elapsed, the process
// is started again. A failed restart ends the loop with that error.
func (p *Process) cmdLoop(ctx context.Context) error {
	for {
		// Use a buffered channel to ensure goroutine below can exit
		// if `ctx.Done` is selected and nothing reads on `done` anymore
		done := make(chan error, 1)
		go func() {
			done <- p.cmdWait()
		}()

		select {
		case <-ctx.Done():
			if p.Stdin != nil {
				// Best effort: the close error is deliberately ignored.
				// Presumably this lets a child reading stdin see EOF and
				// exit before gracefulStop has to escalate.
				p.Stdin.Close()
				gracefulStop(p.Cmd, 5*time.Second)
			}
			return nil
		case err := <-done:
			// %v rather than %s: err is nil when the process exits
			// cleanly, and %s would render that as "%!s(<nil>)".
			log.Printf("Process %s terminated: %v", p.Cmd.Path, err)
			if isQuitting(ctx) {
				return err
			}
		}

		log.Printf("Restarting in %s...", p.RestartDelay)

		select {
		case <-ctx.Done():
			return nil
		case <-time.After(p.RestartDelay):
			// Continue the loop and restart the process
			if err := p.cmdStart(); err != nil {
				return err
			}
		}
	}
}

// cmdWait runs the configured output readers and then waits for the command.
//
// ReadStdoutFn and ReadStderrFn, when set, each run in their own goroutine.
// Cmd.Wait is only called after both have returned, and its result is
// returned to the caller.
func (p *Process) cmdWait() error {
	var readers sync.WaitGroup

	if p.ReadStdoutFn != nil {
		readers.Add(1)
		go func() {
			p.ReadStdoutFn(p.Stdout)
			readers.Done()
		}()
	}

	if p.ReadStderrFn != nil {
		readers.Add(1)
		go func() {
			p.ReadStderrFn(p.Stderr)
			readers.Done()
		}()
	}

	// Let the readers finish before Wait is called.
	readers.Wait()

	return p.Cmd.Wait()
}

// isQuitting reports, without blocking, whether ctx has already been
// cancelled or has expired.
func isQuitting(ctx context.Context) bool {
	// Err is non-nil exactly when the Done channel has been closed.
	return ctx.Err() != nil
}
24 changes: 24 additions & 0 deletions internal/process/process_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// +build !windows

package process

import (
"os/exec"
"syscall"
"time"
)

// gracefulStop schedules an escalating shutdown of cmd and returns at once.
//
// After timeout, if the command is still running it is sent SIGTERM. If it
// is still running a further timeout later it is killed. A command that was
// never started, or that has already been waited on, is left alone.
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
	// cmd.ProcessState stays nil until Wait has returned, so a nil value is
	// what "still running" looks like. Calling Exited on it would panic.
	// NOTE(review): ProcessState is written by whichever goroutine calls
	// Wait; this read is unsynchronised, as it was before.
	running := func() bool {
		return cmd.Process != nil && cmd.ProcessState == nil
	}

	time.AfterFunc(timeout, func() {
		if !running() {
			return
		}
		// Best effort: the process may exit between the check and the signal.
		_ = cmd.Process.Signal(syscall.SIGTERM)

		time.AfterFunc(timeout, func() {
			if running() {
				// Best effort, for the same reason as above.
				_ = cmd.Process.Kill()
			}
		})
	})
}
17 changes: 17 additions & 0 deletions internal/process/process_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// +build windows

package process

import (
"os/exec"
"time"
)

// gracefulStop schedules a delayed kill of cmd and returns at once.
//
// After timeout, the command is killed if it is still running. A command
// that was never started, or that has already been waited on, is left alone.
func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
	time.AfterFunc(timeout, func() {
		// cmd.ProcessState stays nil until Wait has returned, so a nil value
		// is what "still running" looks like. Calling Exited on it would
		// panic.
		// NOTE(review): ProcessState is written by whichever goroutine calls
		// Wait; this read is unsynchronised, as it was before.
		if cmd.Process != nil && cmd.ProcessState == nil {
			// Best effort: the process may exit between the check and the kill.
			_ = cmd.Process.Kill()
		}
	})
}
48 changes: 48 additions & 0 deletions plugins/common/shim/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Telegraf Execd Go Shim

The goal of this _shim_ is to make it trivial to extract an internal input plugin
out to a stand-alone repo for the purpose of compiling it as a separate app and
running it from the inputs.execd plugin.

The execd-shim is still experimental and the interface may change in the future,
especially as the concept expands to processors, aggregators, and outputs.

## Steps to externalize a plugin

1. Move the project to an external repo, optionally preserving the
_plugins/inputs/plugin_name_ folder structure. For an example of what this might
look like, take a look at [ssoroka/rand](https://github.com/ssoroka/rand) or
[danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins)
1. Copy [main.go](./example/cmd/main.go) into your project under the cmd folder.
This will be the entrypoint to the plugin when run as a stand-alone program, and
it will call the shim code for you to make that happen.
1. Edit the main.go file to import your plugin. Within Telegraf this would have
been done in an all.go file, but here we don't split the two apart, and the change
just goes in the top of main.go. If you skip this step, your plugin will do nothing.
1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration
specific to your plugin. Note that this config file **must be separate from the
rest of the config for Telegraf, and must not be in a shared directory where
Telegraf is expecting to load all configs**. If Telegraf reads this config file
it will not know which plugin it relates to.

## Steps to build and run your plugin

1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go`
1. Test out the binary if you haven't done this yet. eg `./rand -config plugin.conf`
Depending on your polling settings and whether you implemented a service plugin or
an input gathering plugin, you may see data right away, or you may have to hit enter
first, or wait for your poll duration to elapse, but the metrics will be written to
STDOUT. Ctrl-C to end your test.
1. Configure Telegraf to call your new plugin binary. eg:

```
[[inputs.execd]]
command = ["/path/to/rand", "-config", "/path/to/plugin.conf"]
signal = "none"
```

## Congratulations!

You've done it! Consider publishing your plugin to GitHub and opening a pull request
back to the Telegraf repo letting us know about the availability of your
[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md).
Loading

0 comments on commit a8dfa8e

Please sign in to comment.