forked from puppetlabs-toy-chest/wash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoutputStream_test.go
207 lines (184 loc) · 5.71 KB
/
outputStream_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package plugin
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/suite"
)
// NOTE: Some of the assertion helpers in this file take in a suite.Suite
// object as a parameter because they are also used by the ExecCommandImpl
// tests.
type OutputStreamTestSuite struct {
suite.Suite
}
func newOutputStream(ctx context.Context) *OutputStream {
ch := make(chan ExecOutputChunk, 1)
return &OutputStream{
ctx: ctx,
id: Stdout,
ch: ch,
closer: &multiCloser{
ch: ch,
countdown: 1,
},
}
}
func EqualChunk(suite suite.Suite, expected ExecOutputChunk, actual ExecOutputChunk) bool {
expectedStreamName := expected.StreamID
eqlStreamName := suite.Equal(
expectedStreamName,
actual.StreamID,
fmt.Sprintf("The sent ExecOutputChunk should have come from the %v stream", expectedStreamName),
)
suite.NotZero(actual.Timestamp, "The sent ExecOutputChunk should contain a timestamp")
var eqlPacket bool
if expected.Data != "" {
eqlPacket = suite.Equal(
expected.Data,
actual.Data,
"The sent ExecOutputChunk should contain the expected data",
)
} else {
eqlPacket = suite.Equal(
expected.Err,
actual.Err,
"The sent ExecOutputChunk shoudld contain the expected error",
)
}
return eqlStreamName && eqlPacket
}
// We choose not to add tests for WriteWithTimestamp because doing so
// would complicate the test suite for little gain. Instead, since Write
// calls WriteWithTimestamp, it is reasonable to assume that if the tests
// for Write pass then so, too, do the tests for WriteWithTimestamp.
func (suite *OutputStreamTestSuite) TestWrite() {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
stream := newOutputStream(ctx)
// Test a successful write
data := []byte("data")
nw, writeErr := stream.Write(data)
if suite.NoError(writeErr) {
suite.Equal(len(data), nw, "Write should return the number of written bytes")
}
select {
case chunk := <-stream.ch:
EqualChunk(
suite.Suite,
ExecOutputChunk{StreamID: Stdout, Data: string(data)},
chunk,
)
default:
suite.Fail("Write did not write any data")
}
// Test that the write errors when the context is cancelled.
cancelFunc()
_, writeErr = stream.Write(data)
suite.EqualError(writeErr, ctx.Err().Error(), "Write should have returned the context's error")
select {
case chunk := <-stream.ch:
EqualChunk(
suite.Suite,
ExecOutputChunk{StreamID: Stdout, Err: ctx.Err()},
chunk,
)
suite.Equal(stream.sentCtxErr, true, "The stream should mark that the context's error was sent")
default:
suite.Fail("Write did not send the context's error")
}
}
func assertClosedChannel(suite suite.Suite, ch <-chan ExecOutputChunk) {
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
suite.Fail("Timed out while waiting for the output channel to be closed")
case chunk, ok := <-ch:
if ok {
suite.Fail(
fmt.Sprintf("Expected channel to be closed; received %v instead.", chunk),
)
}
}
}
func (suite *OutputStreamTestSuite) assertClosed(stream *OutputStream) {
assertClosedChannel(suite.Suite, stream.ch)
suite.True(stream.closed)
}
func assertSentError(suite suite.Suite, stream *OutputStream, err error) {
timer := time.NewTimer(1 * time.Second)
sentErrorMsg := fmt.Sprintf("Expected the error '%v' to be sent", err)
select {
case <-timer.C:
suite.Fail(sentErrorMsg + ", but timed out while waiting for it")
case chunk, ok := <-stream.ch:
if !ok {
suite.Fail(sentErrorMsg + ", but the channel was closed")
} else {
EqualChunk(
suite,
ExecOutputChunk{StreamID: stream.id, Err: err},
chunk,
)
}
}
}
func (suite *OutputStreamTestSuite) TestCloseWithError_NoError() {
stream := newOutputStream(context.Background())
stream.CloseWithError(nil)
// Note that if an error was sent, then assertClosed would fail
// because it'd read-in the sent error.
suite.assertClosed(stream)
}
func (suite *OutputStreamTestSuite) TestCloseWithError_WithError() {
stream := newOutputStream(context.Background())
err := fmt.Errorf("an arbitrary error")
stream.CloseWithError(err)
assertSentError(suite.Suite, stream, err)
suite.assertClosed(stream)
}
func (suite *OutputStreamTestSuite) TestCloseWithError_ContextError_NotSent() {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()
stream := newOutputStream(ctx)
stream.CloseWithError(ctx.Err())
assertSentError(suite.Suite, stream, ctx.Err())
suite.assertClosed(stream)
}
func (suite *OutputStreamTestSuite) TestCloseWithError_ContextError_Sent() {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()
stream := newOutputStream(ctx)
stream.sentCtxErr = true
stream.CloseWithError(ctx.Err())
// Note that if CloseWithError sent the passed-in ctx.Err(),
// then assertClosed would fail because it'd read-in the sent
// ctx.Err()
suite.assertClosed(stream)
}
func (suite *OutputStreamTestSuite) TestCloseWithError_ConsecutiveCloses() {
// Main thing to test here is to ensure that multiple close calls
// won't panic by trying to close stream.ch when it is already
// closed
stream := newOutputStream(context.Background())
stream.CloseWithError(nil)
suite.assertClosed(stream)
stream.CloseWithError(nil)
suite.assertClosed(stream)
}
func (suite *OutputStreamTestSuite) TestCloseWithError_AllSubsequentSendsNoop() {
ctx, cancelFunc := context.WithCancel(context.Background())
stream := newOutputStream(ctx)
stream.CloseWithError(nil)
suite.assertClosed(stream)
// Note that the test will panic if stream.Write sends something on
// the closed channel.
_, err := stream.Write([]byte("foo"))
suite.NoError(err)
cancelFunc()
_, err = stream.Write([]byte("bar"))
suite.NoError(err)
}
func TestOutputStream(t *testing.T) {
suite.Run(t, new(OutputStreamTestSuite))
}