From 6f4cc161df6b5f2276d47360e9776e832386159a Mon Sep 17 00:00:00 2001 From: Sascha Grunert Date: Tue, 8 Oct 2024 13:04:49 +0200 Subject: [PATCH] crictl exec: add `--quiet/-q`, `--ignore-error/-e` and `--parallel` flags The flags can be used to further manipulate on the exec behavior. Follow-up on: https://github.com/kubernetes-sigs/cri-tools/pull/1603#issuecomment-2396328240 Signed-off-by: Sascha Grunert --- cmd/crictl/exec.go | 82 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 14 deletions(-) diff --git a/cmd/crictl/exec.go b/cmd/crictl/exec.go index f5affa0d85..565ec6e631 100644 --- a/cmd/crictl/exec.go +++ b/cmd/crictl/exec.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "net/url" + "runtime" + "sync" "time" mobyterm "github.com/moby/term" @@ -109,6 +111,20 @@ var runtimeExecCommand = &cli.Command{ Aliases: []string{"n"}, Usage: "Exec command for all last n containers, set to 0 for unlimited", }, + &cli.BoolFlag{ + Name: "quiet", + Aliases: []string{"q"}, + Usage: "Do not print the container ID if multiple containers are selected", + }, + &cli.BoolFlag{ + Name: "ignore-errors", + Aliases: []string{"e"}, + Usage: "Ignore errors during command execution", + }, + &cli.BoolFlag{ + Name: "parallel", + Usage: "Run the command in parallel if multiple containers are selected", + }, }, Action: func(c *cli.Context) error { if c.NArg() < 1 { @@ -124,6 +140,7 @@ var runtimeExecCommand = &cli.Command{ ids := []string{c.Args().First()} cmd := c.Args().Slice()[1:] outputContainerID := false + quiet := c.Bool("quiet") // If any of the filter flags are set, then we assume that no // CONTAINER-ID is provided as CLI parameter. @@ -137,7 +154,7 @@ var runtimeExecCommand = &cli.Command{ ids = []string{} cmd = c.Args().Slice() - outputContainerID = true + outputContainerID = !quiet && !c.Bool("parallel") opts := &listOptions{ nameRegexp: c.String("name"), @@ -178,37 +195,74 @@ var runtimeExecCommand = &cli.Command{ transport: c.String(transportFlag), } - for _, id := range ids { - opts.id = id + maxParallel := 1 + if c.Bool("parallel") { + maxParallel = runtime.NumCPU() + } + + results := mapParallel(ids, maxParallel, func(id string) error { + optsCopy := *&opts + optsCopy.id = id if outputContainerID { fmt.Println(id + ":") } - if c.Bool("sync") { - exitCode, err := ExecSync(runtimeClient, opts) + exitCode, err := ExecSync(runtimeClient, optsCopy) if err != nil { - return fmt.Errorf("execing command in container synchronously: %w", err) + return fmt.Errorf("execing command in container %s synchronously: %w", id, err) } if exitCode != 0 { return cli.Exit("non-zero exit code", exitCode) } - continue + } else { + ctx, cancel := context.WithCancel(c.Context) + defer cancel() + err = Exec(ctx, runtimeClient, optsCopy) + if err != nil { + return fmt.Errorf("execing command in container %s: %w", id, err) + } } - ctx, cancel := context.WithCancel(c.Context) - defer cancel() - err = Exec(ctx, runtimeClient, opts) - if err != nil { - return fmt.Errorf("execing command in container: %w", err) - } + return nil + }) + + errs := errors.Join(results...) + if c.Bool("ignore-errors") { + if !quiet { + logrus.Warnf("Ignoring errors: %v", errs) + } + return nil } - return nil + return errs }, } +func mapParallel[T1 any, T2 any](input []T1, maxParallel int, fn func(T1) T2) []T2 { + wg := &sync.WaitGroup{} + wg.Add(len(input)) + + results := make([]T2, len(input)) + maxParallelChan := make(chan struct{}, maxParallel) + + for i := range input { + maxParallelChan <- struct{}{} + go func(index int, x T1) { + defer wg.Done() + + result := fn(x) + results[index] = result + + <-maxParallelChan + }(i, input[i]) + } + + wg.Wait() + return results +} + // ExecSync sends an ExecSyncRequest to the server, and parses // the returned ExecSyncResponse. The function returns the corresponding exit // code beside an general error.