forked from puppetlabs-toy-chest/wash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecCommandImpl.go
150 lines (131 loc) · 4.85 KB
/
execCommandImpl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package plugin
import (
"context"
"fmt"
)
/*
ExecCommandImpl implements the plugin.ExecCommand interface.
Use plugin.NewExecCommand to create instances of these objects.
ExecCommandImpl provides Stdout/Stderr streams that you can wire-up
to your plugin's API. These streams ensure that your API's stdout/stderr
ordering is preserved. For example, if your API sends over Stderr, Stdout,
and Stderr packets (in that order), then ExecCommand's OutputCh will
stream them as Stderr, Stdout, and Stderr packets to your clients.
Once the command's finished its execution, use SetExitCode to set the
command's exit code. If your API fails to retrieve the command's exit
code, then use SetExitCodeErr to set an appropriate error that ExitCode()
can return.
ExecCommandImpl also includes helpers that handle context-cancellation
related cleanup for you. plugin.NewExecCommand ensures that the OutputCh
is closed upon context-cancellation so that clients streaming your command's
output are not blocked. SetStopFunc ensures that the exec'ing command is
stopped upon context-cancellation, which prevents orphaned processes.
See Container#Exec in the Docker plugin and ExecSSH in the transport
package for examples of how ExecCommandImpl is used.
*/
type ExecCommandImpl struct {
ctx context.Context
outputCh chan ExecOutputChunk
stdout *OutputStream
stderr *OutputStream
exitCodeCh chan int
exitCodeErrCh chan error
}
// NewExecCommand creates a new ExecCommandImpl object whose
// lifetime is tied to the passed-in execution context. This
// ctx should match what's passed into Execable#Exec. Note that
// NewExecCommand ensures that the OutputCh is closed when ctx
// is cancelled.
func NewExecCommand(ctx context.Context) *ExecCommandImpl {
if ctx == nil {
panic("plugin.NewExecCommand called with a nil context")
}
cmd := &ExecCommandImpl{
ctx: ctx,
exitCodeCh: make(chan int, 1),
exitCodeErrCh: make(chan error, 1),
}
// Create the output streams
cmd.outputCh = make(chan ExecOutputChunk)
closer := &multiCloser{ch: cmd.outputCh, countdown: 2}
cmd.stdout = &OutputStream{ctx: cmd.ctx, id: Stdout, ch: cmd.outputCh, closer: closer}
cmd.stderr = &OutputStream{ctx: cmd.ctx, id: Stderr, ch: cmd.outputCh, closer: closer}
// Ensure that the output streams are closed when the context
// is cancelled. This guarantees that callers won't be blocked
// when they are streaming our command's output.
go func() {
<-cmd.ctx.Done()
cmd.CloseStreamsWithError(ctx.Err())
}()
return cmd
}
// SetStopFunc sets the function that stops the running command. stopFunc
// is called when cmd.ctx is cancelled.
//
// NOTE: SetStopFunc can be used to avoid orphaned processes.
func (cmd *ExecCommandImpl) SetStopFunc(stopFunc func()) {
// Thankfully, goroutines are cheap. Otherwise, mixing this in with
// the goroutine in NewExecCommand heavily complicates things.
// For example, we'd have to worry about the possibility that the
// NewExecCommand goroutine is invoked before the Execable#Exec
// implementation can set a stopFunc, which can happen if the context
// is prematurely cancelled. That can result in an orphaned process
// in some plugin APIs, which is bad.
if stopFunc != nil {
go func() {
<-cmd.ctx.Done()
stopFunc()
}()
}
}
// Stdout returns the command's stdout stream. Attach this to your
// plugin API's stdout stream.
func (cmd *ExecCommandImpl) Stdout() *OutputStream {
return cmd.stdout
}
// Stderr returns the command's stderr stream. Attach this to your
// plugin API's stderr stream.
func (cmd *ExecCommandImpl) Stderr() *OutputStream {
return cmd.stderr
}
// CloseStreamsWithError closes the command's stdout/stderr streams
// with the given error.
func (cmd *ExecCommandImpl) CloseStreamsWithError(err error) {
cmd.Stdout().CloseWithError(err)
cmd.Stderr().CloseWithError(err)
}
// SetExitCode sets the command's exit code that will be returned by
// cmd.ExitCode().
func (cmd *ExecCommandImpl) SetExitCode(exitCode int) {
select {
case <-cmd.ctx.Done():
// Don't send anything if the context is cancelled.
default:
cmd.exitCodeCh <- exitCode
}
}
// SetExitCodeErr sets the error that will be returned by cmd.ExitCode().
// Use it if your plugin API fails to get the command's exit code.
func (cmd *ExecCommandImpl) SetExitCodeErr(err error) {
select {
case <-cmd.ctx.Done():
// Don't send anything if the context is cancelled.
default:
cmd.exitCodeErrCh <- err
}
}
// OutputCh implements ExecCommand#OutputCh
func (cmd *ExecCommandImpl) OutputCh() <-chan ExecOutputChunk {
return cmd.outputCh
}
// ExitCode implements ExecCommand#ExitCode
func (cmd *ExecCommandImpl) ExitCode() (int, error) {
select {
case <-cmd.ctx.Done():
return 0, fmt.Errorf("failed to fetch the command's exit code: %v", cmd.ctx.Err())
case exitCode := <-cmd.exitCodeCh:
return exitCode, nil
case err := <-cmd.exitCodeErrCh:
return 0, err
}
}