Skip to content

Commit

Permalink
Add agent business logic
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdoherty4 committed Apr 6, 2023
1 parent 21feed3 commit 8d59798
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 0 deletions.
83 changes: 83 additions & 0 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package agent

import (
"context"
"errors"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

// Agent is the core data structure for handling workflow execution on target nodes. It leverages
// a Transport and a ContainerRuntime to retrieve workflows and execute actions.
//
// The agent runs a single workflow at a time. Concurrent requests to run workflows will have the
// second workflow rejected with an event.WorkflowRejected event.
type Agent struct {
Log logr.Logger

// ID is the unique identifier for the agent. It is used by the transport to identify workflows
// scheduled for this agent.
ID string

// Transport is the transport used by the agent for communicating workflows and events.
Transport Transport

// Runtime is the container runtime used to execute workflow actions.
Runtime ContainerRuntime

sem chan struct{}
}

// Start finalizes the Agent configuration and starts the configured Transport so it is ready
// to receive workflows. On receiving a workflow, it will leverage the configured Runtime to
// execute workflow actions.
func (a *Agent) Start(ctx context.Context) error {
if a.Log.GetSink() == nil {
//nolint:stylecheck // Specifying field on data structure
return errors.New("Log field must be configured with a valid logger before calling Start()")
}

if a.ID == "" {
return errors.New("ID field must be set before calling Start()")
}

if a.Transport == nil {
return errors.New("Transport field must be set before calling Start()")
}

if a.Runtime == nil {
//nolint:stylecheck // Specifying field on data structure
return errors.New("Runtime field must be set before calling Start()")
}

a.Log = a.Log.WithValues("agent_id", a.ID)

// Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time.
a.sem = make(chan struct{}, 1)
a.sem <- struct{}{}

a.Log.Info("Starting agent")
return a.Transport.Start(ctx, a.ID, a)
}

// HandleWorkflow satisfies workflow.Handler.
func (a *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
select {
case <-a.sem:
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { a.sem <- struct{}{} }()
return a.run(ctx, wflw, events)

default:
reject := event.WorkflowRejected{
ID: wflw.ID,
Message: "workflow already in progress",
}
if err := events.RecordEvent(ctx, reject); err != nil {
a.Log.Info("Failed to record event", logEventKey, reject)
}
return nil
}
}
53 changes: 53 additions & 0 deletions internal/agent/event/action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package event

import "fmt"

const (
ActionStartedName Name = "ActionStarted"
ActionSucceededName Name = "ActionSucceeded"
ActionFailedName Name = "ActionFailed"
)

// ActionStarted occurs when an action begins running.
type ActionStarted struct {
ActionID string
WorkflowID string
}

func (ActionStarted) GetName() Name {
return ActionStartedName
}

func (e ActionStarted) String() string {
return fmt.Sprintf("workflow=%v action=%v", e.WorkflowID, e.ActionID)
}

// ActionSucceeded occurs when an action successfully completes.
type ActionSucceeded struct {
ActionID string
WorkflowID string
}

func (ActionSucceeded) GetName() Name {
return ActionSucceededName
}

func (e ActionSucceeded) String() string {
return fmt.Sprintf("workflow=%v action=%v", e.WorkflowID, e.ActionID)
}

// ActionFailed occurs when an action fails to complete.
type ActionFailed struct {
ActionID string
WorkflowID string
Reason string
Message string
}

func (ActionFailed) GetName() Name {
return ActionFailedName
}

func (e ActionFailed) String() string {
return fmt.Sprintf("workflow=%v action=%v reason=%v", e.WorkflowID, e.ActionID, e.Reason)
}
14 changes: 14 additions & 0 deletions internal/agent/event/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package event

import (
"fmt"
)

// IncompatibleError indicates an event was received that.
type IncompatibleError struct {
Event Event
}

func (e IncompatibleError) Error() string {
return fmt.Sprintf("incompatible event: %v", e.Event.GetName())
}
22 changes: 22 additions & 0 deletions internal/agent/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package event

import (
"context"
)

// Name is a unique name identifying an event.
type Name string

// Event is a recordable event.
type Event interface {
// GetName retrieves the event name.
GetName() Name

// Force events to reside in this package - see zz_known.go.
isEventFromThisPackage()
}

// Recorder records events generated from running a Workflow.
type Recorder interface {
RecordEvent(context.Context, Event) error
}
19 changes: 19 additions & 0 deletions internal/agent/event/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package event

const (
WorkflowRejectedName Name = "WorkflowRejected"
)

// WorkflowRejected is generated when a workflow is being rejected by the agent.
type WorkflowRejected struct {
ID string
Message string
}

func (WorkflowRejected) GetName() Name {
return WorkflowRejectedName
}

func (e WorkflowRejected) String() string {
return e.Message
}
16 changes: 16 additions & 0 deletions internal/agent/event/zz_from_package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package event

// We want to force events to reside in this package so its clear what events are usable
// with by agent code. We achieve this using a compile time check that ensures all events
// implement an unexported method on the Event interface which is the interface passed around
// by event handling code.
//
// This source file should not contain methods other than the isEventFromThisPackage().
//
// This code is hand written.

func (ActionStarted) isEventFromThisPackage() {}
func (ActionSucceeded) isEventFromThisPackage() {}
func (ActionFailed) isEventFromThisPackage() {}

func (WorkflowRejected) isEventFromThisPackage() {}
20 changes: 20 additions & 0 deletions internal/agent/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package agent

import (
"context"

"github.com/tinkerbell/tink/internal/agent/workflow"
)

// ContainerRuntime is a runtime capable of executing workflow actions.
type ContainerRuntime interface {
// Run executes the action. The runtime should mount the following files for the action
// implementation to communicate a reason and message in the event of failure:
//
// /tinkerbell/failure-reason
// /tinkerbell/failure-message
//
// The reason and message should be communicataed via the returned error. The message should
// be the error message and the reason should be provided as defined in failure.Reason().
Run(context.Context, workflow.Action) error
}
15 changes: 15 additions & 0 deletions internal/agent/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package agent

import (
"context"

"github.com/tinkerbell/tink/internal/agent/transport"
)

// Transport is a transport mechanism for communicating workflows to the agent.
type Transport interface {
// Start is a blocking call that starts the transport and begins retreiving workflows for the
// given agentID. The transport should pass workflows to the WorkflowHandler. If the transport
// needs to cancel a workflow it should cancel the context passed to the WorkflowHandler.
Start(_ context.Context, agentID string, _ transport.WorkflowHandler) error
}
99 changes: 99 additions & 0 deletions internal/agent/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package agent

import (
"context"
"regexp"
"time"

"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/failure"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

// ReasonRuntimeError is the default reason used when no reason is provided by the runtime.
const ReasonRuntimeError = "RuntimeError"

// ReasonInvalid indicates a reason provided by the runtime was invalid.
const ReasonInvalid = "InvalidReason"

// Consistent logging keys.
const (
logEventKey = "event"
logErrorKey = "error"
logReasonKey = "reason"
)

// validReasonRegex defines the regex for a valid action failure reason.
var validReasonRegex = regexp.MustCompile(`^[a-zA-Z]+$`)

// run executes the workflow using the runtime configured on a.
func (a *Agent) run(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
logger := a.Log.WithValues("workflow", wflw)
logger.Info("Starting workflow")

for _, action := range wflw.Actions {
logger := logger.WithValues("action_id", action.ID, "action_name", action.Name)

start := time.Now()
logger.Info("Starting action")

a.recordNonTerminatingEvent(ctx, events, event.ActionStarted{
ActionID: action.ID,
WorkflowID: wflw.ID,
})

if err := a.Runtime.Run(ctx, action); err != nil {
reason := ReasonRuntimeError
if r, ok := failure.Reason(err); ok {
reason = r
if !validReasonRegex.MatchString(reason) {
logger.Info(
"Received invalid reason for action failure; using InvalidReason instead",
logReasonKey, reason,
)
reason = ReasonInvalid
}
}

logger.Info("Failed to run action; terminating workflow",
logErrorKey, err,
logReasonKey, reason,
"duration", time.Since(start).String(),
)
a.recordTerminatingEvent(ctx, events, event.ActionFailed{
ActionID: action.ID,
WorkflowID: wflw.ID,
Reason: reason,
Message: err.Error(),
})
return nil
}

a.recordNonTerminatingEvent(ctx, events, event.ActionSucceeded{
ActionID: action.ID,
WorkflowID: wflw.ID,
})

logger.Info("Finished action", "duration", time.Since(start).String())
}

logger.Info("Finished workflow")

return nil
}

func (a *Agent) recordNonTerminatingEvent(ctx context.Context, events event.Recorder, e event.Event) {
if err := events.RecordEvent(ctx, e); err != nil {
a.Log.Info(
"Failed to record event; continuing workflow",
logEventKey, e,
logErrorKey, err,
)
}
}

func (a *Agent) recordTerminatingEvent(ctx context.Context, events event.Recorder, e event.Event) {
if err := events.RecordEvent(ctx, e); err != nil {
a.Log.Info("Failed to record event", logEventKey, e, logErrorKey, err)
}
}

0 comments on commit 8d59798

Please sign in to comment.