Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add file based agent transport
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Doherty <[email protected]>
chrisdoherty4 committed Jun 29, 2023

Partially verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
We cannot verify signatures from co-authors, and some of the co-authors attributed to this commit require their commits to be signed.
1 parent 2946cc3 commit 0d534d1
Showing 5 changed files with 239 additions and 15 deletions.
109 changes: 109 additions & 0 deletions internal/agent/transport/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package transport

import (
"context"
"os"
"path/filepath"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
"gopkg.in/yaml.v2"
)

// File is a transport implementation that monitors a directory for workflow files. When it finds
// files it has yet to process, it will parse them and offload them to its handler. It is intended
// for developmental use only.
type File struct {
// Log is a logger for debugging.
Log logr.Logger

// Dir is the directory to be monitored.
Dir string

// Tick defines the duration to wait before inspecting the directory for new workflow files.
// It defaulst to 5 seconds.
Tick time.Duration

// cache is used to track handled workflows.
cache map[string]struct{}
}

// Start begins watching f.Dir for files. When it finds a file it hasn't handled before, it
// attempts to parse it and offload to the handler. It will run workflows once where a workflow
// is determined by its file name.
func (f *File) Start(ctx context.Context, _ string, handler WorkflowHandler) error {
if f.Tick == 0 {
f.Tick = 5 * time.Second
}

f.cache = map[string]struct{}{}

ticker := time.NewTicker(f.Tick)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil

case <-ticker.C:
entries, err := os.ReadDir(f.Dir)
if err != nil {
return err
}

// When we experience an error with a particular file we don't want to return an
// error because we want to process as much as possible. Instead, we ignore the problem
// and hope the user fixes the issue in a future tick.
for _, e := range entries {
if e.IsDir() || !hasYAMLExt(e.Name()) {
continue
}

// Ensure we haven't handled this file before.
if _, handled := f.cache[e.Name()]; handled {
continue
}

f.Log.Info("Found new workflow file", "file", e.Name())

fh, err := os.Open(filepath.Join(f.Dir, e.Name()))
if err != nil {
f.Log.Error(err, "Could not open file", "file", e.Name())
continue
}

var wrkflow workflow.Workflow
if err := yaml.NewDecoder(fh).Decode(&wrkflow); err != nil {
f.Log.Error(err, "Invalid workflow YAML", "file", e.Name())
continue
}

// Add the file to the cache so we don't reprocess it on the next iteration.
f.cache[e.Name()] = struct{}{}

handler.HandleWorkflow(ctx, wrkflow, f)
}
}
}
}

func hasYAMLExt(path string) bool {
ext := strings.Trim(filepath.Ext(path), ".")
for _, v := range []string{"yml", "yaml"} {
if v == ext {
return true
}
}
return false
}

func (f *File) RecordEvent(_ context.Context, e event.Event) error {
// Noop because we don't particularly care about events for File based transports. Maybe
// we'll record this in a dedicated file one day.
f.Log.Info("Recording event", "event", e.GetName())
return nil
}
97 changes: 97 additions & 0 deletions internal/agent/transport/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package transport_test

import (
"context"
"testing"
"time"

"github.com/go-logr/zerologr"
"github.com/google/go-cmp/cmp"
"github.com/rs/zerolog"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/transport"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

func TestFile(t *testing.T) {
logger := zerolog.New(zerolog.NewConsoleWriter())

expect := expectWorkflows{
Workflows: []workflow.Workflow{
{
ID: "test-workflow-id",
Actions: []workflow.Action{
{
ID: "test-action-1",
Name: "my test action",
Image: "docker.io/hub/alpine",
Cmd: "sh -c",
Args: []string{"echo", "action 1"},
Env: map[string]string{"foo": "bar"},
Volumes: []string{"mount:/foo/bar:ro"},
NetworkNamespace: "custom-namespace",
},
{
ID: "test-action-2",
Name: "my test action",
Image: "docker.io/hub/alpine",
Cmd: "sh -c",
Args: []string{"echo", "action 2"},
Env: map[string]string{"foo": "bar"},
Volumes: []string{"mount:/foo/bar:ro"},
NetworkNamespace: "custom-namespace",
},
},
},
},
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

handler := &transport.WorkflowHandlerMock{
HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) {
if expect.End() {
t.Fatalf("Received unexpected workflow:\nid=%v", workflow.ID)
}

next := expect.Next()
if !cmp.Equal(*next, workflow) {
t.Fatalf("Workflow diff:\n%v", cmp.Diff(next, workflow))
}
},
}

f := transport.File{
Log: zerologr.New(&logger),
Dir: "./testdata/workflows",
Tick: 1 * time.Second,
}

err := f.Start(ctx, "agent_id", handler)
if err != nil {
t.Fatal(err)
}
}

type expectWorkflows struct {
Workflows []workflow.Workflow
idx int
}

func (e *expectWorkflows) Next() *workflow.Workflow {
if len(e.Workflows) == 0 {
return nil
}

if len(e.Workflows) < e.idx {
return nil
}

defer func() { e.idx++ }()
return &e.Workflows[e.idx]
}

func (e *expectWorkflows) End() bool {
return len(e.Workflows) < e.idx
}
5 changes: 0 additions & 5 deletions internal/agent/transport/grpc_test.go
Original file line number Diff line number Diff line change
@@ -2,13 +2,11 @@ package transport_test

import (
"context"
"fmt"
"io"
"sync"
"testing"

"github.com/go-logr/zerologr"
"github.com/kr/pretty"
"github.com/rs/zerolog"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/transport"
@@ -58,7 +56,6 @@ func TestGRPC(t *testing.T) {
handler := &transport.WorkflowHandlerMock{
HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) {
defer wg.Done()
fmt.Println("handling")
close(responses)
},
}
@@ -71,6 +68,4 @@ func TestGRPC(t *testing.T) {
}

wg.Wait()

pretty.Println(handler.HandleWorkflowCalls())
}
23 changes: 23 additions & 0 deletions internal/agent/transport/testdata/workflows/workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
id: "test-workflow-id"
actions:
- id: "test-action-1"
name: "my test action"
image: "docker.io/hub/alpine"
cmd: "sh -c"
args: ["echo", "action 1"]
env:
foo: bar
volumes:
- mount:/foo/bar:ro
networkNamespace: "custom-namespace"
- id: "test-action-2"
name: "my test action"
image: "docker.io/hub/alpine"
cmd: "sh -c"
args: ["echo", "action 2"]
env:
foo: bar
volumes:
- mount:/foo/bar:ro
networkNamespace: "custom-namespace"

20 changes: 10 additions & 10 deletions internal/agent/workflow/workflow.go
Original file line number Diff line number Diff line change
@@ -5,8 +5,8 @@ package workflow
// Workflow represents a runnable workflow for the Handler.
type Workflow struct {
// Do we need a workflow name? Does that even come down in the proto definition?
ID string
Actions []Action
ID string `yaml:"id"`
Actions []Action `yaml:"actions"`
}

func (w Workflow) String() string {
@@ -15,14 +15,14 @@ func (w Workflow) String() string {

// Action represents an individually runnable action.
type Action struct {
ID string
Name string
Image string
Cmd string
Args []string
Env map[string]string
Volumes []string
NetworkNamespace string
ID string `yaml:"id"`
Name string `yaml:"name"`
Image string `yaml:"image"`
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Env map[string]string `yaml:"env"`
Volumes []string `yaml:"volumes"`
NetworkNamespace string `yaml:"networkNamespace"`
}

func (a Action) String() string {

0 comments on commit 0d534d1

Please sign in to comment.