Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable worker environment variables #214

Merged
merged 3 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,27 @@ pkg-config --variable workerexecdir yggdrasil
/usr/local/libexec/yggdrasil
```

A worker program must implement the `Worker` service. `yggd` will execute
A worker program must implement the `Worker` service. `yggd` will execute
worker programs at start up. It will set the `YGG_SOCKET_ADDR` variable in the
worker's environment. This address is the socket on which the worker must dial
the dispatcher and call the "Register" RPC method. Upon successful registration,
the worker will receive back a socket address. The worker must bind to and
listen on this address for RPC methods. See `worker/echo` for an example worker
process that does nothing more than return the content data it received from the
dispatcher.

Additionally, `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` (and their lowercase
equivalents) are automatically read from `yggd`'s environment and added to the
worker's environment. Any additional environment variables required may be set
in a configuration file (see below).

## Worker Configuration

A TOML configuration file may optionally be installed into
`$SYSCONFDIR/yggdrasil/workers`. The file may be used to configure the worker
startup procedure. The following table includes valid fields for a
worker configuration file:

| **Field** | **Value** | **Description** |
| ---------- | --------- | --------------- |
| `env` | `array` | Any additional values that a worker needs inserted into its runtime environment before starting up. `PATH` and all variables beginning with `YGG_` are forbidden and may not be overridden. |
207 changes: 38 additions & 169 deletions cmd/yggd/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,215 +5,84 @@ import (
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"

"git.sr.ht/~spc/go-log"
"github.com/redhatinsights/yggdrasil"
"github.com/rjeczalik/notify"
)

func startProcess(file string, env []string, delay time.Duration, died chan int) {
type processStartedFunc func(pid int, stdout, stderr io.ReadCloser)
type processStoppedFunc func(pid int, state *os.ProcessState)

// startProcess executes file, setting up the environment using the provided env
// values. If the function parameter started is not nil, it is invoked on a
// goroutine after the process has been started.
func startProcess(
file string,
args []string,
env []string,
started processStartedFunc,
) error {
if _, err := os.Stat(file); os.IsNotExist(err) {
log.Warnf("cannot start worker: %v", err)
return
return fmt.Errorf("cannot find file: %v", err)
}

cmd := exec.Command(file)
cmd := exec.Command(file, args...)
cmd.Env = env

if delay < 0 {
log.Errorf("failed to start worker '%v' too many times", file)
return
}

if delay > 0 {
log.Tracef("delaying worker start for %v...", delay)
time.Sleep(delay)
}

stdout, err := cmd.StdoutPipe()
if err != nil {
log.Errorf("cannot connect to stdout: %v", err)
return
return fmt.Errorf("cannot connect to stdout: %v", err)
}

stderr, err := cmd.StderrPipe()
if err != nil {
log.Errorf("cannot connect to stderr: %v", err)
return
return fmt.Errorf("cannot connect to stderr: %v", err)
}

if err := cmd.Start(); err != nil {
log.Errorf("cannot start worker: %v: %v", file, err)
return
}
log.Debugf("started process: %v", cmd.Process.Pid)

go func() {
for {
buf := make([]byte, 4096)
n, err := stdout.Read(buf)
if n > 0 {
log.Tracef("[%v] %v", file, strings.TrimRight(string(buf), "\n\x00"))
}
if err != nil {
switch err {
case io.EOF:
log.Debugf("%v stdout reached EOF: %v", file, err)
return
default:
log.Errorf("cannot read from stdout: %v", err)
continue
}
}
}
}()

go func() {
for {
buf := make([]byte, 4096)
n, err := stderr.Read(buf)
if n > 0 {
log.Errorf("[%v] %v", file, strings.TrimRight(string(buf), "\n\x00"))
}
if err != nil {
switch err {
case io.EOF:
log.Debugf("%v stderr reached EOF: %v", file, err)
return
default:
log.Errorf("cannot read from stderr: %v", err)
continue
}
}
}
}()

pidDirPath := filepath.Join(yggdrasil.LocalstateDir, "run", yggdrasil.LongName, "workers")

if err := os.MkdirAll(pidDirPath, 0755); err != nil {
log.Errorf("cannot create directory: %v", err)
return
}
return fmt.Errorf("cannot start worker: %v: %v", file, err)

if err := os.WriteFile(filepath.Join(pidDirPath, filepath.Base(file)+".pid"), []byte(fmt.Sprintf("%v", cmd.Process.Pid)), 0644); err != nil {
log.Errorf("cannot write to file: %v", err)
return
}
log.Debugf("started process: %v", cmd.Process.Pid)

go watchProcess(cmd, delay, died)
}

func watchProcess(cmd *exec.Cmd, delay time.Duration, died chan int) {
log.Debugf("watching process: %v", cmd.Process.Pid)

state, err := cmd.Process.Wait()
if err != nil {
log.Errorf("process %v exited with error: %v", cmd.Process.Pid, err)
}

died <- state.Pid()

if state.SystemTime() < time.Duration(1*time.Second) {
delay += 5 * time.Second
if started != nil {
go started(cmd.Process.Pid, stdout, stderr)
}
if delay >= time.Duration(30*time.Second) {
delay = -1
}

go startProcess(cmd.Path, cmd.Env, delay, died)
}

func killProcess(pid int) error {
process, err := os.FindProcess(int(pid))
if err != nil {
return fmt.Errorf("cannot find process with pid: %w", err)
}
if err := process.Kill(); err != nil {
log.Errorf("cannot kill process: %v", err)
} else {
log.Infof("killed process %v", process.Pid)
}
return nil
}

func killWorker(pidFile string) error {
data, err := os.ReadFile(pidFile)
// waitProcess finds a process with the given pid and waits for it to exit.
// If the function parameter stopped is not nil, it is invoked on a goroutine
// when the process exits.
func waitProcess(pid int, stopped processStoppedFunc) error {
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("cannot read contents of file: %w", err)
return fmt.Errorf("cannot find process with pid: %v", err)
}
pid, err := strconv.ParseInt(string(data), 10, 64)

log.Debugf("waiting for process: %v", process.Pid)
state, err := process.Wait()
if err != nil {
return fmt.Errorf("cannot parse file contents as int: %w", err)
return fmt.Errorf("process %v exited with error: %v", process.Pid, err)
}

if err := killProcess(int(pid)); err != nil {
return fmt.Errorf("cannot kill process: %w", err)
if stopped != nil {
go stopped(process.Pid, state)
}

if err := os.Remove(pidFile); err != nil {
return fmt.Errorf("cannot remove file: %w", err)
}
return nil
}

func killWorkers() error {
pidDirPath := filepath.Join(yggdrasil.LocalstateDir, "run", yggdrasil.LongName, "workers")
if err := os.MkdirAll(pidDirPath, 0755); err != nil {
return fmt.Errorf("cannot create directory: %w", err)
}
fileInfos, err := os.ReadDir(pidDirPath)
// stopProcess finds a process with the given pid and kills it.
func stopProcess(pid int) error {
subpop marked this conversation as resolved.
Show resolved Hide resolved
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("cannot read contents of directory: %w", err)
return fmt.Errorf("cannot find process with pid: %v", err)
}

for _, info := range fileInfos {
pidFilePath := filepath.Join(pidDirPath, info.Name())
if err := killWorker(pidFilePath); err != nil {
return fmt.Errorf("cannot kill worker: %w", err)
}
if err := process.Kill(); err != nil {
return fmt.Errorf("cannot stop process %v", err)
}

return nil
}

func watchWorkerDir(dir string, env []string, died chan int) {
c := make(chan notify.EventInfo, 1)

if err := notify.Watch(dir, c, notify.InCloseWrite, notify.InDelete, notify.InMovedFrom, notify.InMovedTo); err != nil {
log.Errorf("cannot start notify watchpoint: %v", err)
return
}
defer notify.Stop(c)

for e := range c {
log.Debugf("received inotify event %v", e.Event())
switch e.Event() {
case notify.InCloseWrite, notify.InMovedTo:
if strings.HasSuffix(e.Path(), "worker") {
if ExcludeWorkers[filepath.Base(e.Path())] {
continue
}
log.Tracef("new worker detected: %v", e.Path())
go startProcess(e.Path(), env, 0, died)
}
case notify.InDelete, notify.InMovedFrom:
workerName := filepath.Base(e.Path())
pidFilePath := filepath.Join(
yggdrasil.LocalstateDir,
"run",
yggdrasil.LongName,
"workers",
workerName+".pid",
)

if err := killWorker(pidFilePath); err != nil {
log.Errorf("cannot kill worker: %v", err)
continue
}
}
}
}
Loading
Loading