diff --git a/internal/agent/transport/file.go b/internal/agent/transport/file.go new file mode 100644 index 000000000..1013db674 --- /dev/null +++ b/internal/agent/transport/file.go @@ -0,0 +1,143 @@ +package transport + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/fsnotify/fsnotify" + "github.com/tinkerbell/tink/internal/agent/event" +) + +type File struct { + Dir string +} + +func (f *File) Start(ctx context.Context, agentID string, handler WorkflowHandler) error { + files := make(chan string) + errs := make(chan error) + + go func() { + f.gatherExistingFiles(ctx, files, errs) + f.watchForNewFiles(ctx, files, errs) + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case err := <-errs: + return err + + case file := <-files: + // Attempt to parse the YAML + // Convert the YAML to a workflow object + // Offload to the handler + } + } +} + +func (f *File) gatherExistingFiles(ctx context.Context, file chan<- string, errs chan<- error) { + entries, err := os.ReadDir(f.Dir) + if err != nil { + select { + case <-ctx.Done(): + case errs <- err: + } + return + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + if hasYAMLExt(entry.Name()) { + select { + case <-ctx.Done(): + return + case file <- entry.Name(): + } + } + } +} + +func (f *File) watchForNewFiles(ctx context.Context, files chan<- string, errs chan<- error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + select { + case <-ctx.Done(): + case errs <- err: + } + return + } + defer watcher.Close() + + if err := watcher.Add(f.Dir); err != nil { + select { + case <-ctx.Done(): + case errs <- err: + } + return + } + + for { + select { + // If the context is done, do nothing as the primary routine should handle it. + case <-ctx.Done(): + return + + case err, ok := <-watcher.Errors: + if !ok { + err = fmt.Errorf("directory monitor unexpectedly errored: %w", err) + } + + select { + case <-ctx.Done(): + case errs <- err: + } + return + + case ev, ok := <-watcher.Events: + if !ok { + select { + case <-ctx.Done(): + case errs <- errors.New("directory monitor unexpectedly closed"): + } + return + } + + // We watch for create and write events. Watching for write events may result in + // funneling invalid YAMLs to the handling logic. This is fine, the handling logic + // should just ignore the YAML. + switch ev.Op { + case fsnotify.Create, fsnotify.Write: + if hasYAMLExt(ev.Name) { + select { + case <-ctx.Done(): + case files <- ev.Name: + } + } + } + } + } +} + +func hasYAMLExt(path string) bool { + ext := filepath.Ext(path) + for _, v := range []string{"yml", "yaml"} { + if v == ext { + return true + } + } + return false +} + +func (f *File) RecordEvent(ctx context.Context, e event.Event) error { + // Noop because we don't particularly care about events for File based transports. Maybe + // we'll record this in a dedicated file one day. + return nil +}