Skip to content

Commit

Permalink
A task in a workflow can now have volumes.
Browse files Browse the repository at this point in the history
The intention here is to specify all the volumes at task level, that are common for all
actions within that task. The volumes at task level will be mounted in all actions. However,
if a user has the same mount at action level, it will get the higher preference and will
override the one at task level.
  • Loading branch information
Gaurav Gahlot committed Feb 13, 2020
1 parent dccdaca commit 0823e69
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"strconv"
"strings"
"time"

"github.com/docker/distribution/reference"
Expand All @@ -34,16 +35,17 @@ type (
Name string `yaml:"name"`
WorkerAddr string `yaml:"worker"`
Actions []action `yaml:"actions"`
Volumes []string `yaml:"volumes"`
}

// Action is the basic executional unit for a workflow
action struct {
Name string `yaml:"name"`
Image string `yaml:"image"`
Timeout int64 `yaml:"timeout"`
Command []string `yaml:"command"`
OnTimeout []string `yaml:"on-timeout"`
OnFailure []string `yaml:"on-failure"`
Name string `yaml:"name"`
Image string `yaml:"image"`
Timeout int64 `yaml:"timeout"`
Command []string `yaml:"command"`
OnTimeout []string `yaml:"on-timeout"`
OnFailure []string `yaml:"on-failure"`
Volumes []string `yaml:"volumes,omitempty"`
Environment map[string]string `yaml:"environment,omitempty"`
}
Expand Down Expand Up @@ -132,6 +134,12 @@ func insertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
var actionList []pb.WorkflowAction
var uniqueWorkerID uuid.UUID
for _, task := range wfymldata.Tasks {
taskVolumes := map[string][]string{}
for _, vol := range task.Volumes {
v := strings.Split(vol, ":")
taskVolumes[v[0]] = v[1:]
}

workerID, err := getWorkerID(ctx, db, task.WorkerAddr)
if err != nil {
return err
Expand All @@ -154,6 +162,24 @@ func insertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
for key, val := range ac.Environment {
envs = append(envs, key+"="+val)
}

acVolumes := map[string]string{}
for _, vol := range ac.Volumes {
v := strings.Split(vol, ":")
acVolumes[v[0]] = strings.Join(v[1:], ":")
}

for k, v := range taskVolumes {
if _, ok := acVolumes[k]; !ok {
acVolumes[k] = strings.Join(v, ":")
}
}

volumes := []string{}
for k, v := range acVolumes {
volumes = append(volumes, k+":"+v)
}

action := pb.WorkflowAction{
TaskName: task.Name,
WorkerId: workerUID.String(),
Expand All @@ -164,7 +190,7 @@ func insertActionList(ctx context.Context, db *sql.DB, yamlData string, id uuid.
OnTimeout: ac.OnTimeout,
OnFailure: ac.OnFailure,
Environment: envs,
Volumes: ac.Volumes,
Volumes: volumes,
}
actionList = append(actionList, action)
}
Expand Down

0 comments on commit 0823e69

Please sign in to comment.