Skip to content

Commit

Permalink
Add agent business logic
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdoherty4 committed Apr 6, 2023
1 parent f3d2cf6 commit 738a941
Show file tree
Hide file tree
Showing 14 changed files with 486 additions and 0 deletions.
83 changes: 83 additions & 0 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package agent

import (
"context"
"errors"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

// Agent is the core data structure for handling workflow execution on target nodes. It leverages
// a Transport and a ContainerRuntime to retrieve workflows and execute actions.
//
// The agent runs a single workflow at a time. Concurrent requests to run workflows will have the
// second workflow rejected with an event.WorkflowRejected event.
type Agent struct {
Log logr.Logger

// ID is the unique identifier for the agent. It is used by the transport to identify workflows
// scheduled for this agent.
ID string

// Transport is the transport used by the agent for communicating workflows and events.
Transport Transport

// Runtime is the container runtime used to execute workflow actions.
Runtime ContainerRuntime

sem chan struct{}
}

// Start finalizes the Agent configuration and starts the configured Transport so it is ready
// to receive workflows. On receiving a workflow, it will leverage the configured Runtime to
// execute workflow actions.
func (a *Agent) Start(ctx context.Context) error {
if a.Log.GetSink() == nil {
//nolint:stylecheck // Specifying field on data structure
return errors.New("Log field must be configured with a valid logger before calling Start()")
}

if a.ID == "" {
return errors.New("ID field must be set before calling Start()")
}

if a.Transport == nil {
return errors.New("Transport field must be set before calling Start()")
}

if a.Runtime == nil {
//nolint:stylecheck // Specifying field on data structure
return errors.New("Runtime field must be set before calling Start()")
}

a.Log = a.Log.WithValues("agent_id", a.ID)

// Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time.
a.sem = make(chan struct{}, 1)
a.sem <- struct{}{}

a.Log.Info("Starting agent")
return a.Transport.Start(ctx, a.ID, a)
}

// HandleWorkflow satisfies workflow.Handler.
func (a *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
select {
case <-a.sem:
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { a.sem <- struct{}{} }()
return a.run(ctx, wflw, events)

default:
reject := event.WorkflowRejected{
ID: wflw.ID,
Message: "workflow already in progress",
}
if err := events.RecordEvent(ctx, reject); err != nil {
a.Log.Info("Failed to record event", logEventKey, reject)
}
return nil
}
}
53 changes: 53 additions & 0 deletions internal/agent/event/action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package event

import "fmt"

const (
ActionStartedName Name = "ActionStarted"
ActionSucceededName Name = "ActionSucceeded"
ActionFailedName Name = "ActionFailed"
)

// ActionStarted occurs when an action begins running.
type ActionStarted struct {
ActionID string
WorkflowID string
}

func (ActionStarted) GetName() Name {
return ActionStartedName
}

func (e ActionStarted) String() string {
return fmt.Sprintf("workflow=%v action=%v", e.WorkflowID, e.ActionID)
}

// ActionSucceeded occurs when an action successfully completes.
type ActionSucceeded struct {
ActionID string
WorkflowID string
}

func (ActionSucceeded) GetName() Name {
return ActionSucceededName
}

func (e ActionSucceeded) String() string {
return fmt.Sprintf("workflow=%v action=%v", e.WorkflowID, e.ActionID)
}

// ActionFailed occurs when an action fails to complete.
type ActionFailed struct {
ActionID string
WorkflowID string
Reason string
Message string
}

func (ActionFailed) GetName() Name {
return ActionFailedName
}

func (e ActionFailed) String() string {
return fmt.Sprintf("workflow=%v action=%v reason=%v", e.WorkflowID, e.ActionID, e.Reason)
}
14 changes: 14 additions & 0 deletions internal/agent/event/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package event

import (
"fmt"
)

// IncompatibleError indicates an event was received that.
type IncompatibleError struct {
Event Event
}

func (e IncompatibleError) Error() string {
return fmt.Sprintf("incompatible event: %v", e.Event.GetName())
}
22 changes: 22 additions & 0 deletions internal/agent/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package event

import (
"context"
)

// Name is a unique name identifying an event.
type Name string

// Event is a recordable event.
type Event interface {
// GetName retrieves the event name.
GetName() Name

// Force events to reside in this package - see zz_known.go.
isEventFromThisPackage()
}

// Recorder records events generated from running a Workflow.
type Recorder interface {
RecordEvent(context.Context, Event) error
}
19 changes: 19 additions & 0 deletions internal/agent/event/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package event

const (
WorkflowRejectedName Name = "WorkflowRejected"
)

// WorkflowRejected is generated when a workflow is being rejected by the agent.
type WorkflowRejected struct {
ID string
Message string
}

func (WorkflowRejected) GetName() Name {
return WorkflowRejectedName
}

func (e WorkflowRejected) String() string {
return e.Message
}
16 changes: 16 additions & 0 deletions internal/agent/event/zz_from_package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package event

// We want to force events to reside in this package so its clear what events are usable
// with by agent code. We achieve this using a compile time check that ensures all events
// implement an unexported method on the Event interface which is the interface passed around
// by event handling code.
//
// This source file should not contain methods other than the isEventFromThisPackage().
//
// This code is hand written.

func (ActionStarted) isEventFromThisPackage() {}
func (ActionSucceeded) isEventFromThisPackage() {}
func (ActionFailed) isEventFromThisPackage() {}

func (WorkflowRejected) isEventFromThisPackage() {}
47 changes: 47 additions & 0 deletions internal/agent/failure/reason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package failure

import "errors"

// ReasonUnknown is returned when Reason() is called on an error without a reason.
const ReasonUnknown = "Unknown"

// Reason extracts a failure reason from err. err has a reason if it satisfies the failure reason
// interface:
//
// interface {
// FailureReason() string
// }
//
// If err does not have a reason or FailureReason() returns an empty string, ReasonUnknown is
// returned.
func Reason(err error) (string, bool) {
fr, ok := err.(interface {
FailureReason() string
})

if !ok || fr.FailureReason() == "" {
return "", false
}

return fr.FailureReason(), true
}

// WrapWithReason decorates err with reason. The reason can be extracted using Reason().
func WrapWithReason(err error, reason string) error {
return withReason{err, reason}
}

// WithReason creates a new error using message and wraps it with reason. The reason can be
// extracted using Reason().
func WithReason(message, reason string) error {
return WrapWithReason(errors.New(message), reason)
}

type withReason struct {
error
reason string
}

func (e withReason) FailureReason() string {
return e.reason
}
20 changes: 20 additions & 0 deletions internal/agent/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package agent

import (
"context"

"github.com/tinkerbell/tink/internal/agent/workflow"
)

// ContainerRuntime is a runtime capable of executing workflow actions.
type ContainerRuntime interface {
// Run executes the action. The runtime should mount the following files for the action
// implementation to communicate a reason and message in the event of failure:
//
// /tinkerbell/failure-reason
// /tinkerbell/failure-message
//
// The reason and message should be communicataed via the returned error. The message should
// be the error message and the reason should be provided as defined in failure.Reason().
Run(context.Context, workflow.Action) error
}
22 changes: 22 additions & 0 deletions internal/agent/runtime/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package runtime

import (
"context"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

var _ agent.ContainerRuntime = Fake{}

// Fake is a runtime that always succeeds. It does not literally execute any actions.
type Fake struct {
Log logr.Logger
}

// Run satisfies agent.ContainerRuntime.
func (f Fake) Run(_ context.Context, a workflow.Action) error {
f.Log.Info("Starting fake container", "action", a)
return nil
}
15 changes: 15 additions & 0 deletions internal/agent/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package agent

import (
"context"

"github.com/tinkerbell/tink/internal/agent/transport"
)

// Transport is a transport mechanism for communicating workflows to the agent.
type Transport interface {
// Start is a blocking call that starts the transport and begins retreiving workflows for the
// given agentID. The transport should pass workflows to the WorkflowHandler. If the transport
// needs to cancel a workflow it should cancel the context passed to the WorkflowHandler.
Start(_ context.Context, agentID string, _ transport.WorkflowHandler) error
}
29 changes: 29 additions & 0 deletions internal/agent/transport/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package transport

import (
"context"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

type Fake struct {
Log logr.Logger
Workflows []workflow.Workflow
}

func (f Fake) Start(ctx context.Context, _ string, runner WorkflowHandler) error {
f.Log.Info("Starting fake transport")
for _, w := range f.Workflows {
if err := runner.HandleWorkflow(ctx, w, f); err != nil {
f.Log.Error(err, "Running workflow", "workflow", w)
}
}
return nil
}

func (f Fake) RecordEvent(_ context.Context, e event.Event) error {
f.Log.Info("Recording event", "event", e.GetName())
return nil
}
16 changes: 16 additions & 0 deletions internal/agent/transport/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package transport

import (
"context"

"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

// WorkflowHandler is responsible for handling workflow execution.
type WorkflowHandler interface {
// HandleWorkflow begins executing the given workflow. The event recorder can be used to
// indicate the progress of a workflow. If the given context becomes cancelled, the workflow
// handler should stop workflow execution.
HandleWorkflow(context.Context, workflow.Workflow, event.Recorder) error
}
Loading

0 comments on commit 738a941

Please sign in to comment.