Skip to content
This repository has been archived by the owner on Jun 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #709 from MikaelSmith/windows-targets
Browse files Browse the repository at this point in the history
Support commands against Windows targets in external plugins
  • Loading branch information
MikaelSmith authored Feb 5, 2020
2 parents c6965fa + 50bab03 commit 8e36868
Show file tree
Hide file tree
Showing 16 changed files with 802 additions and 293 deletions.
247 changes: 161 additions & 86 deletions cmd/ps.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"bufio"
"encoding/csv"
"fmt"
"os"
"strconv"
Expand All @@ -11,8 +12,10 @@ import (

"github.com/spf13/cobra"

"github.com/puppetlabs/wash/api/client"
apitypes "github.com/puppetlabs/wash/api/types"
cmdutil "github.com/puppetlabs/wash/cmd/util"
"github.com/puppetlabs/wash/plugin"
)

func psCommand() *cobra.Command {
Expand Down Expand Up @@ -52,7 +55,130 @@ func collectOutput(ch <-chan apitypes.ExecPacket) (string, error) {
return stdout, nil
}

// Get the list of processes separately from iterating over them. This avoids including'find' as
type psresult struct {
pid int
active time.Duration
command string
}

func formatStats(paths []string, results map[string][]psresult) string {
headers := []cmdutil.ColumnHeader{
{ShortName: "node", FullName: "NODE"},
{ShortName: "pid", FullName: "PID"},
{ShortName: "time", FullName: "TIME"},
{ShortName: "cmd", FullName: "COMMAND"},
}
var table [][]string
for _, path := range paths {
// Shorten path segments to probably-unique short strings, like `ku*s/do*p/de*t/pods/redis`.
for _, st := range results[path] {
segments := strings.Split(strings.Trim(path, "/"), "/")
for i, segment := range segments[:len(segments)-1] {
if len(segment) > 4 {
segments[i] = segment[:2] + "*" + segment[len(segment)-1:]
}
}

table = append(table, []string{
strings.Join(segments, "/"),
strconv.Itoa(st.pid),
cmdutil.FormatDuration(st.active),
st.command,
})
}
}
return cmdutil.NewTableWithHeaders(headers, table).Format()
}

func psMain(cmd *cobra.Command, args []string) exitCode {
var paths []string
if len(args) > 0 {
paths = args
} else {
cwd, err := os.Getwd()
if err != nil {
cmdutil.ErrPrintf("%v\n", err)
return exitCode{1}
}

paths = []string{cwd}
}

conn := cmdutil.NewClient()

results := make(map[string][]psresult)
// Prepulate the map so it doesn't change size while all the goroutines are adding data.
for _, path := range paths {
results[path] = []psresult{}
}

var wg sync.WaitGroup
wg.Add(len(paths))
for i, path := range paths {
go func(k string, idx int) {
defer wg.Done()

entry, err := conn.Info(k)
if err != nil {
cmdutil.ErrPrintf("errored on %v: %v\n", k, err)
}
var loginShell plugin.Shell
if entry.Attributes.HasOS() {
loginShell = entry.Attributes.OS().LoginShell
}
if loginShell == plugin.UnknownShell {
// Assume posix if unknown
loginShell = plugin.POSIXShell
}

dispatcher := dispatchers[loginShell]
ch, err := dispatcher.execPS(conn, k)
if err != nil {
cmdutil.ErrPrintf("errored on %v: %v\n", k, err)
return
}
out, err := collectOutput(ch)
if err != nil {
cmdutil.ErrPrintf("errored on %v: %v\n", k, err)
return
}

results[k], err = dispatcher.parseOutput(out)
if err != nil {
cmdutil.ErrPrintf("errored on %v: %v\n", k, err)
}
}(path, i)
}

wg.Wait()
cmdutil.Print(formatStats(paths, results))
return exitCode{0}
}

var dispatchers = []struct {
execPS func(client.Client, string) (<-chan apitypes.ExecPacket, error)
parseOutput func(string) ([]psresult, error)
}{
{}, // Unknown
{ // POSIX shell
execPS: func(conn client.Client, name string) (<-chan apitypes.ExecPacket, error) {
return conn.Exec(name, "sh", []string{}, apitypes.ExecOptions{Input: psScript})
},
parseOutput: parseStatLines,
},
{ // PowerShell
execPS: func(conn client.Client, name string) (<-chan apitypes.ExecPacket, error) {
cmd := "Get-Process | Where TotalProcessorTime | Where Path | " +
"Select-Object -Property Id,TotalProcessorTime,Path | ConvertTo-Csv"
return conn.Exec(name, cmd, []string{}, apitypes.ExecOptions{})
},
parseOutput: parseCsvLines,
},
}

// PS POSIX

// Get the list of processes separately from iterating over them. This avoids including 'find' as
// one of the active processes. Also exclude the pid of the shell we use to run this script. Uses
// printf to put everything on one line; \0-terminated cmdline, then stat, then \0 and statm.
// Proc parsing: http://man7.org/linux/man-pages/man5/proc.5.html
Expand All @@ -70,14 +196,7 @@ done
// Assume _SC_CLK_TCK is 100Hz for now. Can maybe get with 'getconf CLK_TCK'.
const clockTick = 100

type psresult struct {
node string
pid int
active time.Duration
command string
}

func parseEntry(line string) (psresult, error) {
func parseStatEntry(line string) (psresult, error) {
tokens := strings.Split(line, "\t")
if len(tokens) != 3 {
return psresult{}, fmt.Errorf("Line had %v, not 3 tokens: %#v", len(tokens), tokens)
Expand Down Expand Up @@ -112,98 +231,54 @@ func parseEntry(line string) (psresult, error) {
return psresult{pid: pid, active: activeTime, command: command}, nil
}

func parseLines(node string, chunk string) []psresult {
func parseStatLines(chunk string) ([]psresult, error) {
scanner := bufio.NewScanner(strings.NewReader(chunk))
var results []psresult
for scanner.Scan() {
if result, err := parseEntry(scanner.Text()); err != nil {
cmdutil.ErrPrintf("%v\n", err)
} else {
result.node = node
results = append(results, result)
line := scanner.Text()
result, err := parseStatEntry(line)
if err != nil {
return nil, fmt.Errorf("could not parse line %v: %v", line, err)
}

results = append(results, result)
}
if err := scanner.Err(); err != nil {
cmdutil.ErrPrintf("reading standard input: %v", err)
return nil, err
}
return results
return results, nil
}

func formatStats(stats []psresult) string {
headers := []cmdutil.ColumnHeader{
{ShortName: "node", FullName: "NODE"},
{ShortName: "pid", FullName: "PID"},
{ShortName: "time", FullName: "TIME"},
{ShortName: "cmd", FullName: "COMMAND"},
}
table := make([][]string, len(stats))
for i, st := range stats {
// Shorten path segments to probably-unique short strings, like `ku*s/do*p/de*t/pods/redis`.
segments := strings.Split(strings.Trim(st.node, "/"), "/")
for i, segment := range segments[:len(segments)-1] {
if len(segment) > 4 {
segments[i] = segment[:2] + "*" + segment[len(segment)-1:]
}
}
// PS PowerShell

table[i] = []string{
strings.Join(segments, "/"),
strconv.Itoa(st.pid),
cmdutil.FormatDuration(st.active),
st.command,
}
func parseCsvLines(chunk string) ([]psresult, error) {
scanner := csv.NewReader(strings.NewReader(chunk))
scanner.Comment = '#'
scanner.FieldsPerRecord = 3
records, err := scanner.ReadAll()
if err != nil {
return nil, err
}
return cmdutil.NewTableWithHeaders(headers, table).Format()
}
// Skip header
records = records[1:]

func psMain(cmd *cobra.Command, args []string) exitCode {
var paths []string
if len(args) > 0 {
paths = args
} else {
cwd, err := os.Getwd()
results := make([]psresult, len(records))
for i, record := range records {
pid, err := strconv.Atoi(record[0])
if err != nil {
cmdutil.ErrPrintf("%v\n", err)
return exitCode{1}
return nil, fmt.Errorf("could not parse pid in %v: %v", record, err)
}

paths = []string{cwd}
}

conn := cmdutil.NewClient()

results := make(map[string][]psresult)
// Prepulate the map so it doesn't change size while all the goroutines are adding data.
for _, path := range paths {
results[path] = []psresult{}
}

var wg sync.WaitGroup
wg.Add(len(paths))
for i, path := range paths {
go func(k string, idx int) {
defer wg.Done()
ch, err := conn.Exec(k, "sh", []string{}, apitypes.ExecOptions{Input: psScript})
if err != nil {
cmdutil.ErrPrintf("errored on %v: %v\n", k, err)
return
}
out, err := collectOutput(ch)
const layoutSecs = "15:04:05"
rawTime, err := time.Parse("15:04:05.0000000", record[1])
if err != nil {
// Exact seconds seem to show up occasionally, so try that too.
rawTime, err = time.Parse(layoutSecs, record[1])
if err != nil {
cmdutil.ErrPrintf("errored on %v: %v\n", k, err)
} else {
results[k] = parseLines(k, out)
return nil, fmt.Errorf("could not parse active time as %v in %v: %v", layoutSecs, record, err)
}
}(path, i)
}

wg.Wait()

var stats []psresult
for _, path := range paths {
stats = append(stats, results[path]...)
}
active := rawTime.Sub(time.Date(0, time.January, 1, 0, 0, 0, 0, time.UTC))
results[i] = psresult{pid: pid, active: active, command: record[2]}
}

cmdutil.Print(formatStats(stats))
return exitCode{0}
return results, nil
}
11 changes: 10 additions & 1 deletion plugin/aws/ec2Instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -144,9 +145,15 @@ func getAttributesAndMetadata(inst *ec2Client.Instance) (plugin.EntryAttributes,
}
}

shell := plugin.POSIXShell
if strings.EqualFold(awsSDK.StringValue(inst.Platform), "windows") {
shell = plugin.PowerShell
}

attr.
SetCrtime(crtime).
SetMtime(mtime)
SetMtime(mtime).
SetOS(plugin.OS{LoginShell: shell})

meta := plugin.ToJSONObject(ec2InstanceMetadata{
Instance: inst,
Expand Down Expand Up @@ -266,6 +273,8 @@ func (inst *ec2Instance) Delete(ctx context.Context) (bool, error) {
}

func (inst *ec2Instance) Exec(ctx context.Context, cmd string, args []string, opts plugin.ExecOptions) (plugin.ExecCommand, error) {
// TBD: how to get WinRM connection info. Only work with Kerberos? Require a mini-inventory from wash.yaml?

meta, err := inst.Metadata(ctx)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions plugin/docker/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ func (v *volume) runInTemporaryContainer(ctx context.Context, cmd []string) ([]b
func (v *volume) VolumeList(ctx context.Context, path string) (volpkg.DirMap, error) {
// Use a larger maxdepth because volumes have relatively few files and VolumeList is slow.
maxdepth := 10
output, err := v.runInTemporaryContainer(ctx, volpkg.StatCmd(mountpoint+path, maxdepth))
output, err := v.runInTemporaryContainer(ctx, volpkg.StatCmdPOSIX(mountpoint+path, maxdepth))
if err != nil {
return nil, err
}
return volpkg.StatParseAll(bytes.NewReader(output), mountpoint, path, maxdepth)
return volpkg.ParseStatPOSIX(bytes.NewReader(output), mountpoint, path, maxdepth)
}

func (v *volume) VolumeRead(ctx context.Context, path string) ([]byte, error) {
Expand Down
Loading

0 comments on commit 8e36868

Please sign in to comment.