Skip to content

Commit

Permalink
Add core agent business logic (#705)
Browse files Browse the repository at this point in the history
## Summary

This PR adds core business logic for the new tink agent component. The agent will replace the tink worker and can therefore be considered a re-write. Re-writing the worker has allowed us to remove unsupported functionality such as multi-worker workflows that were implemented in the previous Tinkerbell versions as 'tasks'.

The core business logic leverages a transport to retrieve workflows. It communicates workflow progression via events. It runs workflows serially rejecting requests to run concurrently with an explicit `event.WorkflowRejected` event. It assumes enforcement of state machines will be performed by the server component it interacts with.

Finally, the logging around the agent has been significantly improved to reflect exactly what's happening as it transitions between actions.

## Relevant designs/PRs

* [v1alpha2 API](https://github.com/tinkerbell/roadmap/blob/main/design/20230222_tinkerbell_crd_refactor.md)
* [Failure reason and message handling](https://github.com/tinkerbell/roadmap/blob/main/design/20230313_action_failure_reason_communication.md)
* [Tink agent PoC](#680)

## Future work

Separate PRs will introduce container runtime and transport implementations. See the Tink agent PoC for a reference.
  • Loading branch information
mergify[bot] authored May 9, 2023
2 parents c73414e + ae7fc75 commit 54a6f1c
Show file tree
Hide file tree
Showing 20 changed files with 1,299 additions and 0 deletions.
82 changes: 82 additions & 0 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package agent

import (
"context"
"errors"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
"github.com/tinkerbell/tink/internal/agent/workflow"
)

// Agent is the core data structure for handling workflow execution on target nodes. It leverages
// a Transport and a ContainerRuntime to retrieve workflows and execute actions.
//
// The agent runs a single workflow at a time. Concurrent requests to run workflows will have the
// second workflow rejected with an event.WorkflowRejected event.
type Agent struct {
Log logr.Logger

// ID is the unique identifier for the agent. It is used by the transport to identify workflows
// scheduled for this agent.
ID string

// Transport is the transport used by the agent for communicating workflows and events.
Transport Transport

// Runtime is the container runtime used to execute workflow actions.
Runtime ContainerRuntime

sem chan struct{}
}

// Start finalizes the Agent configuration and starts the configured Transport so it is ready
// to receive workflows. On receiving a workflow, it will leverage the configured Runtime to
// execute workflow actions.
func (agent *Agent) Start(ctx context.Context) error {
if agent.ID == "" {
return errors.New("ID field must be set before calling Start()")
}

if agent.Transport == nil {
return errors.New("Transport field must be set before calling Start()")
}

if agent.Runtime == nil {
//nolint:stylecheck // Specifying field on data structure
return errors.New("Runtime field must be set before calling Start()")
}

agent.Log = agent.Log.WithValues("agent_id", agent.ID)

// Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time.
agent.sem = make(chan struct{}, 1)
agent.sem <- struct{}{}

agent.Log.Info("Starting agent")
return agent.Transport.Start(ctx, agent.ID, agent)
}

// HandleWorkflow satisfies transport.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
// sem isn't protected by a synchronization data structure so this is technically invoking
// undefined behavior - consider this a best effort to ensuring Start() has been called.
if agent.sem == nil {
return errors.New("agent must have Start() called before calling HandleWorkflow()")
}

select {
case <-agent.sem:
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()
return agent.run(ctx, wflw, events)

default:
reject := event.WorkflowRejected{
ID: wflw.ID,
Message: "workflow already in progress",
}
events.RecordEvent(ctx, reject)
return nil
}
}
Loading

0 comments on commit 54a6f1c

Please sign in to comment.