Skip to content

Commit

Permalink
Don't send output chunk if channel is closed
Browse files Browse the repository at this point in the history
Fixes puppetlabs-toy-chest#562

Signed-off-by: Enis Inan <[email protected]>
  • Loading branch information
ekinanp committed Nov 5, 2019
1 parent b6c39a4 commit 7e46b43
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
11 changes: 9 additions & 2 deletions plugin/outputStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type OutputStream struct {
id ExecPacketType
ch chan ExecOutputChunk
closer *multiCloser
mux sync.Mutex
mux sync.RWMutex
closed bool
}

Expand All @@ -27,6 +27,13 @@ func (s *OutputStream) sendError(timestamp time.Time, err error) {

// WriteWithTimestamp writes the given data with the specified timestamp
func (s *OutputStream) WriteWithTimestamp(timestamp time.Time, data []byte) error {
s.mux.RLock()
defer s.mux.RUnlock()

if s.closed {
return nil
}

select {
case <-s.ctx.Done():
s.sendError(timestamp, s.ctx.Err())
Expand Down Expand Up @@ -79,4 +86,4 @@ func (c *multiCloser) Close() {
close(c.ch)
}
c.mux.Unlock()
}
}
23 changes: 19 additions & 4 deletions plugin/outputStream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ func newOutputStream(ctx context.Context) *OutputStream {
ch := make(chan ExecOutputChunk, 1)
return &OutputStream{
ctx: ctx,
id: Stdout,
ch: ch,
id: Stdout,
ch: ch,
closer: &multiCloser{
ch: ch,
ch: ch,
countdown: 1,
},
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func (suite *OutputStreamTestSuite) TestWrite() {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
stream := newOutputStream(ctx)

// Test a successful write
data := []byte("data")
nw, writeErr := stream.Write(data)
Expand Down Expand Up @@ -187,6 +187,21 @@ func (suite *OutputStreamTestSuite) TestCloseWithError_ConsecutiveCloses() {
suite.assertClosed(stream)
}

func (suite *OutputStreamTestSuite) TestCloseWithError_AllSubsequentSendsNoop() {
ctx, cancelFunc := context.WithCancel(context.Background())
stream := newOutputStream(ctx)
stream.CloseWithError(nil)
suite.assertClosed(stream)

// Note that the test will panic if stream.Write sends something on
// the closed channel.
_, err := stream.Write([]byte("foo"))
suite.NoError(err)
cancelFunc()
_, err = stream.Write([]byte("bar"))
suite.NoError(err)
}

func TestOutputStream(t *testing.T) {
suite.Run(t, new(OutputStreamTestSuite))
}

0 comments on commit 7e46b43

Please sign in to comment.