Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdoherty4 committed May 17, 2023
1 parent f723a96 commit 77e83f2
Show file tree
Hide file tree
Showing 14 changed files with 820 additions and 161 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ KUSTOMIZE := $(GO) run sigs.k8s.io/kustomize/kustomize/[email protected]
SETUP_ENVTEST := $(GO) run sigs.k8s.io/controller-runtime/tools/[email protected]
GOLANGCI_LINT := $(GO) run github.com/golangci/golangci-lint/cmd/[email protected]
YAMLFMT := $(GO) run github.com/google/yamlfmt/cmd/[email protected]
MOQ := $(GO) run github.com/matryer/[email protected]

# Installed tools
PROTOC_GEN_GO_GRPC := google.golang.org/grpc/cmd/[email protected]
Expand Down Expand Up @@ -94,6 +95,11 @@ e2e-test: ## Run e2e tests
$(SETUP_ENVTEST) use
source <($(SETUP_ENVTEST) use -p env) && $(GO) test -v ./internal/e2e/... -tags=e2e

mocks:
$(MOQ) -fmt goimpots -rm -out ./internal/proto/workflow/v2/mock.go ./internal/proto/workflow/v2 WorkflowServiceClient WorkflowService_StreamWorkflowsClient
$(MOQ) -fmt goimports -rm -out ./internal/agent/transport/mock.go ./internal/agent/transport WorkflowHandler
$(MOQ) -fmt goimports -rm -out ./internal/agent/mock.go ./internal/agent Transport ContainerRuntime

.PHONY: generate-proto
generate-proto: buf.gen.yaml buf.lock $(shell git ls-files '**/*.proto') _protoc
$(BUF) mod update
Expand Down Expand Up @@ -242,4 +248,4 @@ yamllint: $(YAMLLINT_BIN)
.PHONY: _protoc ## Install all required tools for use with this Makefile.
_protoc:
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO)
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC)
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC)
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/go-logr/zerologr v1.2.3
github.com/google/go-cmp v0.5.9
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kr/pretty v0.3.1
github.com/onsi/ginkgo/v2 v2.9.4
github.com/onsi/gomega v1.27.6
github.com/opencontainers/image-spec v1.1.0-rc.3
Expand Down Expand Up @@ -77,6 +78,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -690,6 +691,7 @@ github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
95 changes: 71 additions & 24 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package agent
import (
"context"
"errors"
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
Expand All @@ -30,6 +30,10 @@ type Agent struct {

// sem ensure we handle a single workflow at a time.
sem chan struct{}

// executionContext tracks the currently executing workflow.
executionContext *executionContext
mtx sync.Mutex
}

// Start finalizes the Agent configuration and starts the configured Transport so it is ready
Expand All @@ -45,7 +49,12 @@ func (agent *Agent) Start(ctx context.Context) error {
}

if agent.Runtime == nil {
return errors.New("agent.Runtime must be set before calling Start()")
//nolint:stylecheck // Runtime is a field of agent.
return errors.New("Runtime field must be set before calling Start()")
}

if agent.Log.GetSink() == nil {
agent.Log = logr.Discard()
}

agent.Log = agent.Log.WithValues("agent_id", agent.ID)
Expand All @@ -54,41 +63,79 @@ func (agent *Agent) Start(ctx context.Context) error {
agent.sem = make(chan struct{}, 1)
agent.sem <- struct{}{}

// Launch the transport ensuring we can recover any errors.
transportErr := make(chan error, 1)
go func() {
agent.Log.Info("Starting agent")
transportErr <- agent.Transport.Start(ctx, agent.ID, agent)
}()

select {
case err := <-transportErr:
return fmt.Errorf("transport: %w", err)
case <-ctx.Done():
return ctx.Err()
}
return agent.Transport.Start(ctx, agent.ID, agent)
}

// HandleWorkflow satisfies transport.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
// sem isn't protected by a synchronization data structure so this is technically invoking
// undefined behavior - consider this a best effort to ensuring Start() has been called.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) {
if agent.sem == nil {
return errors.New("agent must have Start() called before calling HandleWorkflow()")
agent.Log.Info("Agent must have Start() called before calling HandleWorkflow()")
}

select {
case <-agent.sem:
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()
return agent.run(ctx, wflw, events)
// Ensure we configure the current workflow and cancellation func before we launch the
// goroutine to avoid a race with CancelWorkflow.
agent.mtx.Lock()
defer agent.mtx.Unlock()

ctx, cancel := context.WithCancel(ctx)
agent.executionContext = &executionContext{
Workflow: wflw,
Cancel: cancel,
}

go func() {
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()

if err := agent.run(ctx, wflw, events); err != nil {
// TODO(chrisdoherty4) An error indicates something catastrophic happened and we need
// to signal termination of the agent.
_ = err
}

// Nilify the execution context after running so cancellation requests are ignored.
agent.mtx.Lock()
defer agent.mtx.Unlock()
agent.executionContext = nil
}()

default:
reject := event.WorkflowRejected{
ID: wflw.ID,
Message: "workflow already in progress",
}
events.RecordEvent(ctx, reject)
return nil

// TODO(chrisdoherty) Change event recording to return an error because if we can't record
// events we need to exit the agent as somethings wrong.
events.RecordEvent(context.Background(), reject)
}
}

func (agent *Agent) CancelWorkflow(workflowID string) {
agent.mtx.Lock()
defer agent.mtx.Unlock()

if agent.executionContext == nil {
agent.Log.Info("No workflow running; ignoring cancellation request", "workflow_id", workflowID)
return
}

if agent.executionContext.Workflow.ID != workflowID {
agent.Log.Info(
"Incorrect workflow ID in cancellation request; ignoring cancellation request",
"workflow_id", workflowID,
"running_workflow_id", agent.executionContext.Workflow.ID,
)
return
}

agent.Log.Info("Cancelling workflow", "workflow_id", workflowID)
agent.executionContext.Cancel()
}

type executionContext struct {
Workflow workflow.Workflow
Cancel context.CancelFunc
}
93 changes: 57 additions & 36 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/zapr"
Expand Down Expand Up @@ -59,6 +60,14 @@ func TestAgent_InvalidStart(t *testing.T) {
Runtime: runtime.Noop(),
},
},
{
Name: "NoLogger",
Agent: agent.Agent{
ID: "1234",
Transport: transport.Noop(),
Runtime: runtime.Noop(),
},
},
}

for _, tc := range cases {
Expand All @@ -76,16 +85,13 @@ func TestAgent_InvalidStart(t *testing.T) {
}
}

// The goal of this test is to ensure the agent rejects concurrent workflows.
func TestAgent_ConcurrentWorkflows(t *testing.T) {
// The goal of this test is to ensure the agent rejects concurrent workflows. We have to
// build a valid agent because it will also reject calls to HandleWorkflow without first
// starting the Agent.

logger := zapr.NewLogger(zap.Must(zap.NewDevelopment()))

recorder := event.NoopRecorder()
trnport := transport.Noop()

wrkflow := workflow.Workflow{
wflw := workflow.Workflow{
ID: "1234",
Actions: []workflow.Action{
{
Expand All @@ -96,14 +102,8 @@ func TestAgent_ConcurrentWorkflows(t *testing.T) {
},
}

trnport := agent.TransportMock{
StartFunc: func(ctx context.Context, agentID string, handler workflow.Handler) error {
return handler.HandleWorkflow(ctx, wrkflow, recorder)
},
}

// Started is used to indicate the runtime has received the workflow.
started := make(chan struct{})

rntime := agent.ContainerRuntimeMock{
RunFunc: func(ctx context.Context, action workflow.Action) error {
started <- struct{}{}
Expand All @@ -119,26 +119,28 @@ func TestAgent_ConcurrentWorkflows(t *testing.T) {
ID: "1234",
}

// Build a cancellable context so we can tear the goroutine down.
// HandleWorkflow will reject us if we haven't started the agent first.
if err := agnt.Start(context.Background()); err != nil {
t.Fatal(err)
}

errs := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
// Build a cancellable context so we can tear everything down. The timeout is guesswork but
// this test shouldn't take long.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

go func() { errs <- agnt.Start(ctx) }()
// Handle the first workflow and wait for it to start.
agnt.HandleWorkflow(ctx, wflw, recorder)

// Await either an error or the mock runtime to tell us its stated.
// Wait for the container runtime to start.
select {
case err := <-errs:
t.Fatalf("Unexpected error: %v", err)
case <-started:
case <-ctx.Done():
t.Fatal(ctx.Err())
}

// Attempt to fire off another workflow.
err := agnt.HandleWorkflow(context.Background(), wrkflow, recorder)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Attempt to fire off a second workflow.
agnt.HandleWorkflow(ctx, wflw, recorder)

// Ensure the latest event recorded is a event.WorkflowRejected.
calls := recorder.RecordEventCalls()
Expand All @@ -153,7 +155,7 @@ func TestAgent_ConcurrentWorkflows(t *testing.T) {
}

expectEvent := event.WorkflowRejected{
ID: wrkflow.ID,
ID: wflw.ID,
Message: "workflow already in progress",
}
if !cmp.Equal(expectEvent, ev) {
Expand Down Expand Up @@ -407,13 +409,7 @@ message`,

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
recorder := event.NoopRecorder()

trnport := agent.TransportMock{
StartFunc: func(ctx context.Context, agentID string, handler workflow.Handler) error {
return handler.HandleWorkflow(ctx, tc.Workflow, recorder)
},
}
trnport := transport.Noop()

rntime := agent.ContainerRuntimeMock{
RunFunc: func(ctx context.Context, action workflow.Action) error {
Expand All @@ -424,18 +420,43 @@ message`,
},
}

// The event recorder is what tells us the workflow has finished executing so we use it
// to check for the last expected action.
lastEventReceived := make(chan struct{})
recorder := event.RecorderMock{
RecordEventFunc: func(contextMoqParam context.Context, event event.Event) {
if cmp.Equal(event, tc.Events[len(tc.Events)-1]) {
lastEventReceived <- struct{}{}
}
},
}

// Create and start the agent as start is a prereq to calling HandleWorkflow().
agnt := agent.Agent{
Log: logger,
Transport: &trnport,
Runtime: &rntime,
ID: "1234",
}

err := agnt.Start(context.Background())
if err != nil {
if err := agnt.Start(context.Background()); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Configure a timeout of 5 seconds, this test shouldn't take long.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Handle the workflow
agnt.HandleWorkflow(ctx, tc.Workflow, &recorder)

// Wait for the last expected event or timeout.
select {
case <-lastEventReceived:
case <-ctx.Done():
t.Fatal(ctx.Err())
}

// Validate all events received are what we expected.
calls := recorder.RecordEventCalls()
if len(calls) != len(tc.Events) {
t.Fatalf("Expected %v events; Received %v\n%+v", len(tc.Events), len(calls), calls)
Expand Down
Loading

0 comments on commit 77e83f2

Please sign in to comment.