Skip to content

Commit

Permalink
Add file based agent transport
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Doherty <[email protected]>
  • Loading branch information
chrisdoherty4 committed Jun 22, 2023
1 parent d1c781c commit 20887ff
Showing 1 changed file with 143 additions and 0 deletions.
143 changes: 143 additions & 0 deletions internal/agent/transport/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package transport

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"

"github.com/fsnotify/fsnotify"
"github.com/tinkerbell/tink/internal/agent/event"
)

type File struct {
Dir string
}

func (f *File) Start(ctx context.Context, agentID string, handler WorkflowHandler) error {
files := make(chan string)
errs := make(chan error)

go func() {
f.gatherExistingFiles(ctx, files, errs)
f.watchForNewFiles(ctx, files, errs)
}()

for {
select {
case <-ctx.Done():
return ctx.Err()

case err := <-errs:
return err

case file := <-files:
// Attempt to parse the YAML
// Convert the YAML to a workflow object
// Offload to the handler
}
}
}

func (f *File) gatherExistingFiles(ctx context.Context, file chan<- string, errs chan<- error) {
entries, err := os.ReadDir(f.Dir)
if err != nil {
select {
case <-ctx.Done():
case errs <- err:
}
return
}

for _, entry := range entries {
if entry.IsDir() {
continue
}

if hasYAMLExt(entry.Name()) {
select {
case <-ctx.Done():
return
case file <- entry.Name():
}
}
}
}

func (f *File) watchForNewFiles(ctx context.Context, files chan<- string, errs chan<- error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
select {
case <-ctx.Done():
case errs <- err:
}
return
}
defer watcher.Close()

if err := watcher.Add(f.Dir); err != nil {
select {
case <-ctx.Done():
case errs <- err:
}
return
}

for {
select {
// If the context is done, do nothing as the primary routine should handle it.
case <-ctx.Done():
return

case err, ok := <-watcher.Errors:
if !ok {
err = fmt.Errorf("directory monitor unexpectedly errored: %w", err)
}

select {
case <-ctx.Done():
case errs <- err:
}
return

case ev, ok := <-watcher.Events:
if !ok {
select {
case <-ctx.Done():
case errs <- errors.New("directory monitor unexpectedly closed"):
}
return
}

// We watch for create and write events. Watching for write events may result in
// funneling invalid YAMLs to the handling logic. This is fine, the handling logic
// should just ignore the YAML.
switch ev.Op {
case fsnotify.Create, fsnotify.Write:
if hasYAMLExt(ev.Name) {
select {
case <-ctx.Done():
case files <- ev.Name:
}
}
}
}
}
}

func hasYAMLExt(path string) bool {
ext := filepath.Ext(path)
for _, v := range []string{"yml", "yaml"} {
if v == ext {
return true
}
}
return false
}

func (f *File) RecordEvent(ctx context.Context, e event.Event) error {
// Noop because we don't particularly care about events for File based transports. Maybe
// we'll record this in a dedicated file one day.
return nil
}

0 comments on commit 20887ff

Please sign in to comment.