Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show protos some love #345

Merged
merged 16 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
root = true

[*]
insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true
indent_style = space
indent_size = 2
indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true

[{Makefile,go.mod,go.sum,*.go,.gitmodules,*.sh}]
indent_style = tab
indent_size = 8
indent_style = tab

[*.md]
indent_size = 4
35 changes: 35 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## Hello Contributors!

Thx for your interest!
We're so glad you're here.

### Important Resources

#### bugs: [https://github.com/tinkerbell/tink/issues](https://github.com/tinkerbell/tink/issues)

### Code of Conduct

Available via [https://github.com/tinkerbell/tink/blob/master/.github/CODE_OF_CONDUCT.md](https://github.com/tinkerbell/tink/blob/master/.github/CODE_OF_CONDUCT.md)

### Environment Details

[https://github.com/tinkerbell/tink/blob/master/Makefile](https://github.com/tinkerbell/tink/blob/master/Makefile)

### How to Submit Change Requests

Please submit change requests and / or features via [Issues](https://github.com/tinkerbell/tink/issues).
There's no guarantee it'll be changed, but you never know until you try.
We'll try to add comments as soon as possible, though.

### How to Report a Bug

Bugs are problems in code, in the functionality of an application or in its UI design; you can submit them through [Issues](https://github.com/tinkerbell/tink/issues).

## Code Style Guides

#### Protobuf

Please ensure protobuf related files are generated along with _any_ change to a protobuf file.
CI will enforce this, but its best to commit the generated files along with the protobuf changes in the same commit.
Handling of protobuf deps and generating the go files are both handled by the [protoc.sh](./protos/protoc.sh) script.
Both go & protoc are required by protoc.sh, these are both installed and used if using nix-shell.
12 changes: 6 additions & 6 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (

// gRPC clients
var (
TemplateClient template.TemplateClient
WorkflowClient workflow.WorkflowSvcClient
TemplateClient template.TemplateServiceClient
WorkflowClient workflow.WorkflowServiceClient
HardwareClient hardware.HardwareServiceClient
)

Expand Down Expand Up @@ -63,8 +63,8 @@ func Setup() error {
if err != nil {
return err
}
TemplateClient = template.NewTemplateClient(conn)
WorkflowClient = workflow.NewWorkflowSvcClient(conn)
TemplateClient = template.NewTemplateServiceClient(conn)
WorkflowClient = workflow.NewWorkflowServiceClient(conn)
HardwareClient = hardware.NewHardwareServiceClient(conn)
return nil
}
Expand All @@ -79,10 +79,10 @@ func TinkHardwareClient() (hardware.HardwareServiceClient, error) {
}

// TinkWorkflowClient creates a new workflow client
func TinkWorkflowClient() (workflow.WorkflowSvcClient, error) {
func TinkWorkflowClient() (workflow.WorkflowServiceClient, error) {
conn, err := GetConnection()
if err != nil {
log.Fatal(err)
}
return workflow.NewWorkflowSvcClient(conn), nil
return workflow.NewWorkflowServiceClient(conn), nil
}
2 changes: 1 addition & 1 deletion cmd/tink-cli/cmd/workflow/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var dataCmd = &cobra.Command{
},
Run: func(c *cobra.Command, args []string) {
for _, arg := range args {
req := &workflow.GetWorkflowDataRequest{WorkflowID: arg, Version: version}
req := &workflow.GetWorkflowDataRequest{WorkflowId: arg, Version: version}
var res *workflow.GetWorkflowDataResponse
var err error
if needsMetadata {
Expand Down
6 changes: 3 additions & 3 deletions cmd/tink-cli/cmd/workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ var stateCmd = &cobra.Command{
},
}

func calWorkflowProgress(cur int64, total int64, state workflow.ActionState) string {
if total == 0 || (cur == 0 && state != workflow.ActionState_ACTION_SUCCESS) {
func calWorkflowProgress(cur int64, total int64, state workflow.State) string {
if total == 0 || (cur == 0 && state != workflow.State_STATE_SUCCESS) {
return "0%"
}
var taskCompleted int64
if state == workflow.ActionState_ACTION_SUCCESS {
if state == workflow.State_STATE_SUCCESS {
taskCompleted = cur + 1
} else {
taskCompleted = cur
Expand Down
2 changes: 1 addition & 1 deletion cmd/tink-worker/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command {
if err != nil {
return err
}
rClient := pb.NewWorkflowSvcClient(conn)
rClient := pb.NewWorkflowServiceClient(conn)

regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger)
worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize)
Expand Down
22 changes: 11 additions & 11 deletions cmd/tink-worker/internal/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func startContainer(ctx context.Context, l log.Logger, cli *client.Client, id st
return errors.Wrap(cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START")
}

func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.ActionState, error) {
func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.State, error) {
// Inspect whether the container is in running state
if _, err := cli.ContainerInspect(ctx, id); err != nil {
return pb.ActionState_ACTION_FAILED, nil
return pb.State_STATE_FAILED, nil
}

// send API call to wait for the container completion
Expand All @@ -63,32 +63,32 @@ func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.Actio
select {
case status := <-wait:
if status.StatusCode == 0 {
return pb.ActionState_ACTION_SUCCESS, nil
return pb.State_STATE_SUCCESS, nil
}
return pb.ActionState_ACTION_FAILED, nil
return pb.State_STATE_FAILED, nil
case err := <-errC:
return pb.ActionState_ACTION_FAILED, err
return pb.State_STATE_FAILED, err
case <-ctx.Done():
return pb.ActionState_ACTION_TIMEOUT, ctx.Err()
return pb.State_STATE_TIMEOUT, ctx.Err()
}
}

func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.ActionState) {
func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.State) {
// send API call to wait for the container completion
wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning)

select {
case status := <-wait:
if status.StatusCode == 0 {
failedActionStatus <- pb.ActionState_ACTION_SUCCESS
failedActionStatus <- pb.State_STATE_SUCCESS
}
failedActionStatus <- pb.ActionState_ACTION_FAILED
failedActionStatus <- pb.State_STATE_FAILED
case err := <-errC:
l.Error(err)
failedActionStatus <- pb.ActionState_ACTION_FAILED
failedActionStatus <- pb.State_STATE_FAILED
case <-ctx.Done():
l.Error(ctx.Err())
failedActionStatus <- pb.ActionState_ACTION_TIMEOUT
failedActionStatus <- pb.State_STATE_TIMEOUT
}
}

Expand Down
48 changes: 24 additions & 24 deletions cmd/tink-worker/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type WorkflowMetadata struct {

// Worker details provide all the context needed to run a
type Worker struct {
client pb.WorkflowSvcClient
client pb.WorkflowServiceClient
regConn *RegistryConnDetails
registryClient *client.Client
logger log.Logger
Expand All @@ -59,7 +59,7 @@ type Worker struct {
}

// NewWorker creates a new Worker, creating a new Docker registry client
func NewWorker(client pb.WorkflowSvcClient, regConn *RegistryConnDetails, logger log.Logger, registry string, retries int, retryInterval time.Duration, maxFileSize int64) *Worker {
func NewWorker(client pb.WorkflowServiceClient, regConn *RegistryConnDetails, logger log.Logger, registry string, retries int, retryInterval time.Duration, maxFileSize int64) *Worker {
registryClient, err := regConn.NewClient()
if err != nil {
panic(err)
Expand Down Expand Up @@ -94,16 +94,16 @@ func (w *Worker) captureLogs(ctx context.Context, id string) {
}
}

func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.ActionState, error) {
func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.State, error) {
l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage())

cli := w.registryClient
if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil {
return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL")
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER PULL")
}
id, err := w.createContainer(ctx, action.Command, wfID, action)
if err != nil {
return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER CREATE")
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER CREATE")
}
l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created")

Expand All @@ -119,10 +119,10 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc

err = startContainer(timeCtx, l, cli, id)
if err != nil {
return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER RUN")
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER RUN")
}

failedActionStatus := make(chan pb.ActionState)
failedActionStatus := make(chan pb.State)

// capturing logs of action container in a go-routine
go w.captureLogs(ctx, id)
Expand All @@ -139,14 +139,14 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
}

l.With("status", status.String()).Info("container removed")
if status != pb.ActionState_ACTION_SUCCESS {
if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil {
if status != pb.State_STATE_SUCCESS {
if status == pb.State_STATE_TIMEOUT && action.OnTimeout != nil {
id, err = w.createContainer(ctx, action.OnTimeout, wfID, action)
if err != nil {
l.Error(errors.Wrap(err, errCreateContainer))
}
l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created")
failedActionStatus := make(chan pb.ActionState)
failedActionStatus := make(chan pb.State)
go w.captureLogs(ctx, id)
go waitFailedContainer(ctx, l, cli, id, failedActionStatus)
err = startContainer(ctx, l, cli, id)
Expand Down Expand Up @@ -208,15 +208,15 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
}
} else {
switch wfContext.GetCurrentActionState() {
case pb.ActionState_ACTION_SUCCESS:
case pb.State_STATE_SUCCESS:
if isLastAction(wfContext, actions) {
continue
}
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()+1]
actionIndex = int(wfContext.GetCurrentActionIndex()) + 1
case pb.ActionState_ACTION_FAILED:
case pb.State_STATE_FAILED:
continue
case pb.ActionState_ACTION_TIMEOUT:
case pb.State_STATE_TIMEOUT:
continue
default:
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()]
Expand Down Expand Up @@ -269,12 +269,12 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
l := l.With("actionName", action.GetName(),
"taskName", action.GetTaskName(),
)
if wfContext.GetCurrentActionState() != pb.ActionState_ACTION_IN_PROGRESS {
if wfContext.GetCurrentActionState() != pb.State_STATE_RUNNING {
actionStatus := &pb.WorkflowActionStatus{
WorkflowId: wfID,
TaskName: action.GetTaskName(),
ActionName: action.GetName(),
ActionStatus: pb.ActionState_ACTION_IN_PROGRESS,
ActionStatus: pb.State_STATE_RUNNING,
Seconds: 0,
Message: "Started execution",
WorkerId: action.GetWorkerId(),
Expand Down Expand Up @@ -303,11 +303,11 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
WorkerId: action.GetWorkerId(),
}

if err != nil || status != pb.ActionState_ACTION_SUCCESS {
if status == pb.ActionState_ACTION_TIMEOUT {
actionStatus.ActionStatus = pb.ActionState_ACTION_TIMEOUT
if err != nil || status != pb.State_STATE_SUCCESS {
if status == pb.State_STATE_TIMEOUT {
actionStatus.ActionStatus = pb.State_STATE_TIMEOUT
} else {
actionStatus.ActionStatus = pb.ActionState_ACTION_FAILED
actionStatus.ActionStatus = pb.State_STATE_FAILED
}
l.With("actionStatus", actionStatus.ActionStatus.String())
l.Error(err)
Expand All @@ -318,7 +318,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
return err
}

actionStatus.ActionStatus = pb.ActionState_ACTION_SUCCESS
actionStatus.ActionStatus = pb.State_STATE_SUCCESS
actionStatus.Message = "finished execution successfully"

err = w.reportActionStatus(ctx, actionStatus)
Expand Down Expand Up @@ -382,11 +382,11 @@ func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.Workfl
return err
}

func getWorkflowData(ctx context.Context, logger log.Logger, client pb.WorkflowSvcClient, workerID, workflowID string) {
func getWorkflowData(ctx context.Context, logger log.Logger, client pb.WorkflowServiceClient, workerID, workflowID string) {
l := logger.With("workflowID", workflowID,
"workerID", workerID,
)
res, err := client.GetWorkflowData(ctx, &pb.GetWorkflowDataRequest{WorkflowID: workflowID})
res, err := client.GetWorkflowData(ctx, &pb.GetWorkflowDataRequest{WorkflowId: workflowID})
if err != nil {
l.Error(err)
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func (w *Worker) updateWorkflowData(ctx context.Context, actionStatus *pb.Workfl
}
}

func sendUpdate(ctx context.Context, logger log.Logger, client pb.WorkflowSvcClient, st *pb.WorkflowActionStatus, data []byte, checksum string) {
func sendUpdate(ctx context.Context, logger log.Logger, client pb.WorkflowServiceClient, st *pb.WorkflowActionStatus, data []byte, checksum string) {
l := logger.With("workflowID", st.GetWorkflowId,
"workerID", st.GetWorkerId(),
"actionName", st.GetActionName(),
Expand All @@ -456,7 +456,7 @@ func sendUpdate(ctx context.Context, logger log.Logger, client pb.WorkflowSvcCli
}

_, err = client.UpdateWorkflowData(ctx, &pb.UpdateWorkflowDataRequest{
WorkflowID: st.GetWorkflowId(),
WorkflowId: st.GetWorkflowId(),
Data: data,
Metadata: metadata,
})
Expand Down
Loading