Skip to content

Commit

Permalink
worker: add labels
Browse files Browse the repository at this point in the history
Signed-off-by: Akihiro Suda <[email protected]>
  • Loading branch information
AkihiroSuda committed Dec 20, 2017
1 parent 621596d commit 8456e32
Show file tree
Hide file tree
Showing 18 changed files with 1,102 additions and 176 deletions.
736 changes: 668 additions & 68 deletions api/services/control/control.pb.go

Large diffs are not rendered by default.

18 changes: 16 additions & 2 deletions api/services/control/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ service Control {
rpc Solve(SolveRequest) returns (SolveResponse);
rpc Status(StatusRequest) returns (stream StatusResponse);
rpc Session(stream BytesMessage) returns (stream BytesMessage);
rpc ListWorkers(ListWorkersRequest) returns (ListWorkersResponse);
}

message DiskUsageRequest {
string filter = 1;
string filter = 1; // FIXME: this should be containerd-compatible repeated string?
}

message DiskUsageResponse {
Expand Down Expand Up @@ -98,4 +99,17 @@ message VertexLog {

message BytesMessage {
bytes data = 1;
}
}

message ListWorkersRequest {
repeated string filter = 1; // containerd style
}

message ListWorkersResponse {
repeated WorkerRecord record = 1;
}

message WorkerRecord {
string ID = 1;
map<string, string> Labels = 2;
}
49 changes: 49 additions & 0 deletions client/workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package client

import (
"context"

controlapi "github.com/moby/buildkit/api/services/control"
"github.com/pkg/errors"
)

type WorkerInfo struct {
ID string
Labels map[string]string
}

func (c *Client) ListWorkers(ctx context.Context, opts ...ListWorkersOption) ([]*WorkerInfo, error) {
info := &ListWorkersInfo{}
for _, o := range opts {
o(info)
}

req := &controlapi.ListWorkersRequest{Filter: info.Filter}
resp, err := c.controlClient().ListWorkers(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "failed to list workers")
}

var wi []*WorkerInfo

for _, w := range resp.Record {
wi = append(wi, &WorkerInfo{
ID: w.ID,
Labels: w.Labels,
})
}

return wi, nil
}

type ListWorkersOption func(*ListWorkersInfo)

type ListWorkersInfo struct {
Filter []string
}

func WithWorkerFilter(f []string) ListWorkersOption {
return func(wi *ListWorkersInfo) {
wi.Filter = f
}
}
1 change: 1 addition & 0 deletions cmd/buildctl/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ var debugCommand = cli.Command{
Subcommands: []cli.Command{
debug.DumpLLBCommand,
debug.DumpMetadataCommand,
debug.WorkersCommand,
},
}
87 changes: 87 additions & 0 deletions cmd/buildctl/debug/workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package debug

import (
"fmt"
"os"
"sort"
"text/tabwriter"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/appcontext"
"github.com/urfave/cli"
)

var WorkersCommand = cli.Command{
Name: "workers",
Usage: "list workers",
Action: listWorkers,
Flags: []cli.Flag{
cli.StringSliceFlag{
Name: "filter, f",
Usage: "containerd-style filter string slice",
},
cli.BoolFlag{
Name: "verbose, v",
Usage: "Verbose output",
},
},
}

func resolveClient(c *cli.Context) (*client.Client, error) {
return client.New(c.GlobalString("addr"), client.WithBlock())
}

func listWorkers(clicontext *cli.Context) error {
c, err := resolveClient(clicontext)
if err != nil {
return err
}

workers, err := c.ListWorkers(appcontext.Context(), client.WithWorkerFilter(clicontext.StringSlice("filter")))
if err != nil {
return err
}
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)

if clicontext.Bool("verbose") {
printWorkersVerbose(tw, workers)
} else {
printWorkersTable(tw, workers)
}
return nil
}

func printWorkersVerbose(tw *tabwriter.Writer, winfo []*client.WorkerInfo) {
for _, wi := range winfo {
fmt.Fprintf(tw, "ID:\t%s\n", wi.ID)
fmt.Fprintf(tw, "Labels:\n")
for _, kv := range sortMap(wi.Labels) {
fmt.Fprintf(tw, "\t%s:\t%s\n", kv[0], kv[1])
}
fmt.Fprintf(tw, "\n")
}

tw.Flush()
}

func printWorkersTable(tw *tabwriter.Writer, winfo []*client.WorkerInfo) {
fmt.Fprintln(tw, "ID")

for _, wi := range winfo {
id := wi.ID
fmt.Fprintf(tw, "%s\n", id)
}

tw.Flush()
}

func sortMap(m map[string]string) [][2]string {
var s [][2]string
for k, v := range m {
s = append(s, [2]string{k, v})
}
sort.Slice(s, func(i, j int) bool {
return s[i][0] < s[j][0]
})
return s
}
19 changes: 16 additions & 3 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,27 +256,40 @@ func newController(c *cli.Context, root string) (*control.Controller, error) {

func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Controller, error) {
wc := &worker.Controller{}
nWorkers := 0
for _, wi := range workerInitializers {
ws, err := wi.fn(c, wiOpt)
if err != nil {
return nil, err
}
for _, w := range ws {
logrus.Infof("found worker %q", w.Name())
logrus.Infof("found worker %q, labels=%v", w.ID(), w.Labels())
if err = wc.Add(w); err != nil {
return nil, err
}
nWorkers++
}
}
nWorkers := len(wc.GetAll())
if nWorkers == 0 {
return nil, errors.New("no worker found, rebuild the buildkit daemon?")
}
defaultWorker, err := wc.GetDefault()
if err != nil {
return nil, err
}
logrus.Infof("found %d workers, default=%q", nWorkers, defaultWorker.Name)
logrus.Infof("found %d workers, default=%q", nWorkers, defaultWorker.ID())
logrus.Warn("currently, only the default worker can be used.")
return wc, nil
}

func attrMap(sl []string) (map[string]string, error) {
m := map[string]string{}
for _, v := range sl {
parts := strings.SplitN(v, "=", 2)
if len(parts) != 2 {
return nil, errors.Errorf("invalid value %s", v)
}
m[parts[0]] = parts[1]
}
return m, nil
}
13 changes: 11 additions & 2 deletions cmd/buildkitd/main_containerd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ func init() {
Name: "containerd-worker-addr",
Usage: "containerd socket",
Value: "/run/containerd/containerd.sock",
})
},
cli.StringSliceFlag{
Name: "containerd-worker-labels",
Usage: "user-specific annotation labels (com.example.foo=bar)",
},
)
// TODO(AkihiroSuda): allow using multiple snapshotters. should be useful for some applications that does not work with the default overlay snapshotter. e.g. mysql (docker/for-linux#72)",
}

Expand All @@ -43,7 +48,11 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
if (boolOrAuto == nil && !validContainerdSocket(socket)) || (boolOrAuto != nil && !*boolOrAuto) {
return nil, nil
}
opt, err := containerd.NewWorkerOpt(common.root, socket, ctd.DefaultSnapshotter)
labels, err := attrMap(c.GlobalStringSlice("containerd-worker-labels"))
if err != nil {
return nil, err
}
opt, err := containerd.NewWorkerOpt(common.root, socket, ctd.DefaultSnapshotter, labels)
if err != nil {
return nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ func init() {
Name: "oci-worker",
Usage: "enable oci workers (true/false/auto)",
Value: "auto",
})
},
cli.StringSliceFlag{
Name: "oci-worker-labels",
Usage: "user-specific annotation labels (com.example.foo=bar)",
},
)
// TODO: allow multiple oci runtimes and snapshotters
}

Expand All @@ -34,7 +39,11 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
if (boolOrAuto == nil && !validOCIBinary()) || (boolOrAuto != nil && !*boolOrAuto) {
return nil, nil
}
opt, err := runc.NewWorkerOpt(common.root)
labels, err := attrMap(c.GlobalStringSlice("oci-worker-labels"))
if err != nil {
return nil, err
}
opt, err := runc.NewWorkerOpt(common.root, labels)
if err != nil {
return nil, err
}
Expand Down
21 changes: 20 additions & 1 deletion control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ func (c *Controller) Register(server *grpc.Server) error {

func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
resp := &controlapi.DiskUsageResponse{}
for _, w := range c.opt.WorkerController.GetAll() {
workers, err := c.opt.WorkerController.List()
if err != nil {
return nil, err
}
for _, w := range workers {
du, err := w.DiskUsage(ctx, client.DiskUsageInfo{
Filter: r.Filter,
})
Expand Down Expand Up @@ -201,3 +205,18 @@ func (c *Controller) Session(stream controlapi.Control_SessionServer) error {
logrus.Debugf("session finished: %v", err)
return err
}

func (c *Controller) ListWorkers(ctx context.Context, r *controlapi.ListWorkersRequest) (*controlapi.ListWorkersResponse, error) {
resp := &controlapi.ListWorkersResponse{}
workers, err := c.opt.WorkerController.List(r.Filter...)
if err != nil {
return nil, err
}
for _, w := range workers {
resp.Record = append(resp.Record, &controlapi.WorkerRecord{
ID: w.ID(),
Labels: w.Labels(),
})
}
return resp, nil
}
Loading

0 comments on commit 8456e32

Please sign in to comment.