Skip to content

Commit

Permalink
Merge pull request #734 from chrisdoherty4/feature/transport
Browse files Browse the repository at this point in the history
Add transport implementation for agent
  • Loading branch information
mergify[bot] authored Jun 24, 2023
2 parents 29052da + d857726 commit 9a45f7b
Show file tree
Hide file tree
Showing 25 changed files with 2,648 additions and 301 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ KUSTOMIZE := $(GO) run sigs.k8s.io/kustomize/kustomize/[email protected]
SETUP_ENVTEST := $(GO) run sigs.k8s.io/controller-runtime/tools/[email protected]
GOLANGCI_LINT := $(GO) run github.com/golangci/golangci-lint/cmd/[email protected]
YAMLFMT := $(GO) run github.com/google/yamlfmt/cmd/[email protected]
MOQ := $(GO) run github.com/matryer/[email protected]

# Installed tools
PROTOC_GEN_GO_GRPC := google.golang.org/grpc/cmd/[email protected]
Expand Down Expand Up @@ -94,11 +95,18 @@ e2e-test: ## Run e2e tests
$(SETUP_ENVTEST) use
source <($(SETUP_ENVTEST) use -p env) && $(GO) test -v ./internal/e2e/... -tags=e2e

mocks:
$(MOQ) -fmt goimpots -rm -out ./internal/proto/workflow/v2/mock.go ./internal/proto/workflow/v2 WorkflowServiceClient WorkflowService_GetWorkflowsClient
$(MOQ) -fmt goimports -rm -out ./internal/agent/transport/mock.go ./internal/agent/transport WorkflowHandler
$(MOQ) -fmt goimports -rm -out ./internal/agent/mock.go ./internal/agent Transport ContainerRuntime
$(MOQ) -fmt goimports -rm -out ./internal/agent/event/mock.go ./internal/agent/event Recorder

.PHONY: generate-proto
generate-proto: buf.gen.yaml buf.lock $(shell git ls-files '**/*.proto') _protoc
$(BUF) mod update
$(BUF) generate
$(GOFUMPT) -w internal/proto/*.pb.*
$(GOFUMPT) -w internal/proto/workflow/v2/*.pb.*

.PHONY: generate
generate: generate-proto generate-go generate-manifests ## Generate code, manifests etc.
Expand Down Expand Up @@ -241,4 +249,4 @@ yamllint: $(YAMLLINT_BIN)
.PHONY: _protoc ## Install all required tools for use with this Makefile.
_protoc:
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO)
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC)
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC)
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/go-logr/zerologr v1.2.3
github.com/google/go-cmp v0.5.9
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kr/pretty v0.3.1
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
github.com/opencontainers/image-spec v1.1.0-rc.3
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
Expand All @@ -94,6 +96,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -692,6 +693,7 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -740,7 +742,9 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
Expand Down
82 changes: 71 additions & 11 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"context"
"errors"
"sync"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
Expand All @@ -27,7 +28,12 @@ type Agent struct {
// Runtime is the container runtime used to execute workflow actions.
Runtime ContainerRuntime

// sem ensure we handle a single workflow at a time.
sem chan struct{}

// executionContext tracks the currently executing workflow.
executionContext *executionContext
mtx sync.RWMutex
}

// Start finalizes the Agent configuration and starts the configured Transport so it is ready
Expand All @@ -43,40 +49,94 @@ func (agent *Agent) Start(ctx context.Context) error {
}

if agent.Runtime == nil {
//nolint:stylecheck // Specifying field on data structure
//nolint:stylecheck // Runtime is a field of agent.
return errors.New("Runtime field must be set before calling Start()")
}

if agent.Log.GetSink() == nil {
agent.Log = logr.Discard()
}

agent.Log = agent.Log.WithValues("agent_id", agent.ID)

// Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time.
agent.sem = make(chan struct{}, 1)
agent.sem <- struct{}{}

agent.Log.Info("Starting agent")
return agent.Transport.Start(ctx, agent.ID, agent)
}

// HandleWorkflow satisfies transport.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
// sem isn't protected by a synchronization data structure so this is technically invoking
// undefined behavior - consider this a best effort to ensuring Start() has been called.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) {
if agent.sem == nil {
return errors.New("agent must have Start() called before calling HandleWorkflow()")
agent.Log.Info("Agent must have Start() called before calling HandleWorkflow()")
}

select {
case <-agent.sem:
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()
return agent.run(ctx, wflw, events)
// Ensure we configure the current workflow and cancellation func before we launch the
// goroutine to avoid a race with CancelWorkflow.
agent.mtx.Lock()
defer agent.mtx.Unlock()

ctx, cancel := context.WithCancel(ctx)
agent.executionContext = &executionContext{
Workflow: wflw,
Cancel: cancel,
}

go func() {
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()

agent.run(ctx, wflw, events)

// Nilify the execution context after running so cancellation requests are ignored.
agent.mtx.Lock()
defer agent.mtx.Unlock()
agent.executionContext = nil
}()

default:
log := agent.Log.WithValues("workflow_id", wflw.ID)

reject := event.WorkflowRejected{
ID: wflw.ID,
Message: "workflow already in progress",
}
events.RecordEvent(ctx, reject)
return nil

if err := events.RecordEvent(ctx, reject); err != nil {
log.Error(err, "Failed to record workflow rejection event")
return
}

log.Info("Workflow already executing; dropping request")
}
}

func (agent *Agent) CancelWorkflow(workflowID string) {
agent.mtx.RLock()
defer agent.mtx.RUnlock()

if agent.executionContext == nil {
agent.Log.Info("No workflow running; ignoring cancellation request", "workflow_id", workflowID)
return
}

if agent.executionContext.Workflow.ID != workflowID {
agent.Log.Info(
"Incorrect workflow ID in cancellation request; ignoring cancellation request",
"workflow_id", workflowID,
"running_workflow_id", agent.executionContext.Workflow.ID,
)
return
}

agent.Log.Info("Cancel workflow", "workflow_id", workflowID)
agent.executionContext.Cancel()
}

type executionContext struct {
Workflow workflow.Workflow
Cancel context.CancelFunc
}
Loading

0 comments on commit 9a45f7b

Please sign in to comment.