Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged rover.proto into workflow.proto #7

Merged
merged 1 commit into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ proto-gen:
protoc protos/target/target.proto --go_out=plugins=grpc:$(GOPATH)/src
protoc protos/workflow/workflow.proto --go_out=plugins=grpc:$(GOPATH)/src
protoc protos/hardware/hardware.proto --go_out=plugins=grpc:$(GOPATH)/src
protoc -I$(GOPATH)/src --go_out=plugins=grpc:$(GOPATH)/src $(GOPATH)/src/github.com/packethost/rover/protos/rover/rover.proto

run: ${binaries}
docker-compose up -d --build db
Expand Down
21 changes: 10 additions & 11 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"github.com/docker/distribution/reference"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
pb "github.com/packethost/rover/protos/rover"
workflowpb "github.com/packethost/rover/protos/workflow"
pb "github.com/packethost/rover/protos/workflow"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -367,7 +366,7 @@ func UpdateWorkflow(ctx context.Context, db *sql.DB, wf Workflow, state int32) e
return nil
}

func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *workflowpb.WorkflowContext) error {
func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *pb.WorkflowContext) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
Expand All @@ -393,7 +392,7 @@ func UpdateWorkflowState(ctx context.Context, db *sql.DB, wfContext *workflowpb.
return nil
}

func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*workflowpb.WorkflowContext, error) {
func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowContext, error) {
query := `
SELECT current_worker, current_task_name, current_action_name, current_action_index, current_action_state, total_number_of_actions
FROM workflow_state
Expand All @@ -403,10 +402,10 @@ func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*workflo
row := db.QueryRowContext(ctx, query, wfId)
var cw, ct, ca string
var cai, tact int64
var cas workflowpb.ActionState
var cas pb.ActionState
err := row.Scan(&cw, &ct, &ca, &cai, &cas, &tact)
if err == nil {
return &workflowpb.WorkflowContext{
return &pb.WorkflowContext{
WorkflowId: wfId,
CurrentWorker: cw,
CurrentTask: ct,
Expand All @@ -421,7 +420,7 @@ func GetWorkflowContexts(ctx context.Context, db *sql.DB, wfId string) (*workflo
} else {
err = nil
}
return &workflowpb.WorkflowContext{}, nil
return &pb.WorkflowContext{}, nil
}

func GetWorkflowActions(ctx context.Context, db *sql.DB, wfId string) (*pb.WorkflowActionList, error) {
Expand Down Expand Up @@ -449,7 +448,7 @@ func GetWorkflowActions(ctx context.Context, db *sql.DB, wfId string) (*pb.Workf
return &pb.WorkflowActionList{}, nil
}

func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *workflowpb.WorkflowActionStatus, time time.Time) error {
func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *pb.WorkflowActionStatus, time time.Time) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
Expand All @@ -473,7 +472,7 @@ func InsertIntoWorkflowEventTable(ctx context.Context, db *sql.DB, wfEvent *work
}

// ShowWorkflowEvents returns all workflows
func ShowWorkflowEvents(db *sql.DB, wfId string, fn func(wfs workflowpb.WorkflowActionStatus) error) error {
func ShowWorkflowEvents(db *sql.DB, wfId string, fn func(wfs pb.WorkflowActionStatus) error) error {
rows, err := db.Query(`
SELECT worker_id, task_name, action_name, execution_time, message, status, created_at
FROM workflow_event
Expand Down Expand Up @@ -503,13 +502,13 @@ func ShowWorkflowEvents(db *sql.DB, wfId string, fn func(wfs workflowpb.Workflow
return err
}
createdAt, _ := ptypes.TimestampProto(evTime)
wfs := workflowpb.WorkflowActionStatus{
wfs := pb.WorkflowActionStatus{
WorkerId: id,
TaskName: tName,
ActionName: aName,
Seconds: secs,
Message: msg,
ActionStatus: workflowpb.ActionState(status),
ActionStatus: pb.ActionState(status),
CreatedAt: createdAt,
}
err = fn(wfs)
Expand Down
16 changes: 7 additions & 9 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ import (
"fmt"
"time"

empty "github.com/golang/protobuf/ptypes/empty"
"github.com/packethost/rover/db"
pb "github.com/packethost/rover/protos/rover"
workflowpb "github.com/packethost/rover/protos/workflow"
pb "github.com/packethost/rover/protos/workflow"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -24,7 +22,7 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest
return nil, status.Errorf(codes.InvalidArgument, "Worker not found for any workflows")
}

wfContexts := []*workflowpb.WorkflowContext{}
wfContexts := []*pb.WorkflowContext{}

for _, wf := range wfs {
wfContext, err := db.GetWorkflowContexts(context, sdb, wf)
Expand Down Expand Up @@ -53,7 +51,7 @@ func GetWorkflowActions(context context.Context, req *pb.WorkflowActionsRequest,
}

// ReportActionStatus implements rover.ReportActionStatus
func ReportActionStatus(context context.Context, req *workflowpb.WorkflowActionStatus, sdb *sql.DB) (*empty.Empty, error) {
func ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus, sdb *sql.DB) (*pb.Empty, error) {
wfID := req.GetWorkflowId()
if len(wfID) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "workflow_id is invalid")
Expand All @@ -77,7 +75,7 @@ func ReportActionStatus(context context.Context, req *workflowpb.WorkflowActionS
// We need bunch of checks here considering
// Considering concurrency and network latencies & accuracy for proceeding of WF
actionIndex := wfContext.GetCurrentActionIndex()
if req.GetActionStatus() == workflowpb.ActionState_ACTION_IN_PROGRESS {
if req.GetActionStatus() == pb.ActionState_ACTION_IN_PROGRESS {
if wfContext.GetCurrentAction() != "" {
actionIndex = actionIndex + 1
}
Expand All @@ -96,14 +94,14 @@ func ReportActionStatus(context context.Context, req *workflowpb.WorkflowActionS
wfContext.CurrentActionIndex = actionIndex
err = db.UpdateWorkflowState(context, sdb, wfContext)
if err != nil {
return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_state table. Error : %s", err)
return &pb.Empty{}, fmt.Errorf("Failed to update the workflow_state table. Error : %s", err)
}
// TODO the below "time" would be a part of the request which is coming form worker.
time := time.Now()
err = db.InsertIntoWorkflowEventTable(context, sdb, req, time)
if err != nil {
return &empty.Empty{}, fmt.Errorf("Failed to update the workflow_event table. Error : %s", err)
return &pb.Empty{}, fmt.Errorf("Failed to update the workflow_event table. Error : %s", err)
}
fmt.Printf("Current context %s\n", wfContext)
return &empty.Empty{}, nil
return &pb.Empty{}, nil
}
2 changes: 0 additions & 2 deletions grpc-server/grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/packethost/rover/db"
"github.com/packethost/rover/metrics"
"github.com/packethost/rover/protos/hardware"
"github.com/packethost/rover/protos/rover"
"github.com/packethost/rover/protos/target"
"github.com/packethost/rover/protos/template"
"github.com/packethost/rover/protos/workflow"
Expand Down Expand Up @@ -74,7 +73,6 @@ func SetupGRPC(ctx context.Context, log log.Logger, facility string, errCh chan<
target.RegisterTargetServer(s, server)
workflow.RegisterWorkflowSvcServer(s, server)
hardware.RegisterHardwareServiceServer(s, server)
rover.RegisterRoverServer(s, server)

grpc_prometheus.Register(s)

Expand Down
6 changes: 2 additions & 4 deletions grpc-server/rover.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package grpcserver
import (
"context"

"github.com/golang/protobuf/ptypes/empty"
exec "github.com/packethost/rover/executor"
pb "github.com/packethost/rover/protos/rover"
workflowpb "github.com/packethost/rover/protos/workflow"
pb "github.com/packethost/rover/protos/workflow"
)

// GetWorkflowContexts implements rover.GetWorkflowContexts
Expand All @@ -20,6 +18,6 @@ func (s *server) GetWorkflowActions(context context.Context, req *pb.WorkflowAct
}

// ReportActionStatus implements rover.ReportActionStatus
func (s *server) ReportActionStatus(context context.Context, req *workflowpb.WorkflowActionStatus) (*empty.Empty, error) {
func (s *server) ReportActionStatus(context context.Context, req *pb.WorkflowActionStatus) (*pb.Empty, error) {
return exec.ReportActionStatus(context, req, s.db)
}
Loading