Skip to content

Commit

Permalink
Add Stream support to the volume package
Browse files Browse the repository at this point in the history
Adds a VolumeStream method to `volume.Interface` that's used to make
files in a volume Streamable.

Resolves puppetlabs-toy-chest#227.

Signed-off-by: Michael Smith <[email protected]>
  • Loading branch information
MikaelSmith committed Apr 25, 2019
1 parent f35b808 commit bf53b94
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 22 deletions.
16 changes: 16 additions & 0 deletions plugin/cleanupReader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package plugin

import "io"

// CleanupReader is a wrapper for an io.ReadCloser that performs cleanup when closed.
type CleanupReader struct {
io.ReadCloser
Cleanup func()
}

// Close closes the reader it wraps, then calls the Cleanup function and returns any errors.
func (c CleanupReader) Close() error {
err := c.ReadCloser.Close()
c.Cleanup()
return err
}
35 changes: 35 additions & 0 deletions plugin/cleanupReader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package plugin

import (
"io"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCleanupReader(t *testing.T) {
called := false
cleanup := func() {
called = true
}

r, w := io.Pipe()
defer w.Close()
rdr := CleanupReader{ReadCloser: r, Cleanup: cleanup}
go func() {
_, err := w.Write([]byte("hello"))
assert.NoError(t, err)
}()

buf := make([]byte, 5)
n, err := rdr.Read(buf)
assert.NoError(t, err)
assert.Equal(t, 5, n)
assert.Equal(t, "hello", string(buf))
assert.False(t, called)

rdr.Close()
assert.True(t, called)
_, err = w.Write([]byte("goodbye"))
assert.Error(t, err)
}
55 changes: 55 additions & 0 deletions plugin/docker/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"time"

Expand Down Expand Up @@ -173,3 +174,57 @@ func (v *volume) VolumeOpen(ctx context.Context, path string) (plugin.SizedReade
}
return bytes.NewReader(bits), nil
}

func (v *volume) VolumeStream(ctx context.Context, path string) (io.ReadCloser, error) {
// Create a container that mounts a volume and tails a file. Run it and capture the output.
cid, err := v.createContainer(ctx, []string{"tail", "-f", mountpoint + path})
if err != nil {
return nil, err
}

// Manually use this in case of errors. On success, the returned Closer will need to call instead.
delete := func() {
err := v.client.ContainerRemove(ctx, cid, types.ContainerRemoveOptions{})
activity.Record(ctx, "Deleted container %v: %v", cid, err)
}

activity.Record(ctx, "Starting container %v", cid)
if err := v.client.ContainerStart(ctx, cid, types.ContainerStartOptions{}); err != nil {
delete()
return nil, err
}

// Manually use this in case of errors. On success, the returned Closer will need to call instead.
killAndDelete := func() {
err := v.client.ContainerKill(ctx, cid, "SIGKILL")
activity.Record(ctx, "Stopped temporary container %v: %v", cid, err)
delete()
}

activity.Record(ctx, "Waiting for container %v", cid)
waitC, errC := v.client.ContainerWait(ctx, cid, docontainer.WaitConditionNotRunning)
var statusCode int64
select {
case err := <-errC:
killAndDelete()
return nil, err
case result := <-waitC:
statusCode = result.StatusCode
activity.Record(ctx, "Container %v finished[%v]: %v", cid, result.StatusCode, result.Error)
}

opts := types.ContainerLogsOptions{ShowStdout: true}
if statusCode != 0 {
opts.ShowStderr = true
}

activity.Record(ctx, "Gathering log for %v", cid)
output, err := v.client.ContainerLogs(ctx, cid, opts)
if err != nil {
killAndDelete()
return nil, err
}

// Wrap the log output in a ReadCloser that stops and kills the container on Close.
return plugin.CleanupReader{ReadCloser: output, Cleanup: killAndDelete}, nil
}
47 changes: 46 additions & 1 deletion plugin/kubernetes/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"time"

Expand Down Expand Up @@ -171,7 +172,7 @@ func (v *pvc) VolumeList(ctx context.Context) (volume.DirMap, error) {
}

func (v *pvc) VolumeOpen(ctx context.Context, path string) (plugin.SizedReader, error) {
// Create a container that mounts a pvc and waits. Use it to download a file.
// Create a container that mounts a pvc and output the file.
pid, err := v.createPod([]string{"cat", mountpoint + path})
activity.Record(ctx, "Reading from: %v", mountpoint+path)
if err != nil {
Expand Down Expand Up @@ -208,3 +209,47 @@ func (v *pvc) VolumeOpen(ctx context.Context, path string) (plugin.SizedReader,
}
return bytes.NewReader(bits), nil
}

func (v *pvc) VolumeStream(ctx context.Context, path string) (io.ReadCloser, error) {
// Create a container that mounts a pvc and tail the file.
pid, err := v.createPod([]string{"tail", "-f", mountpoint + path})
activity.Record(ctx, "Streaming from: %v", mountpoint+path)
if err != nil {
return nil, err
}

// Manually use this in case of errors. On success, the returned Closer will need to call instead.
delete := func() {
activity.Record(ctx, "Deleted temporary pod %v: %v", pid, v.podi.Delete(pid, &metav1.DeleteOptions{}))
}

activity.Record(ctx, "Waiting for pod %v", pid)
// Start watching for new events related to the pod we created.
if err = v.waitForPod(ctx, pid); err != nil && err != errPodTerminated {
delete()
return nil, err
}
podErr := err

activity.Record(ctx, "Gathering log for %v", pid)
output, err := v.podi.GetLogs(pid, &corev1.PodLogOptions{}).Stream()
if err != nil {
delete()
return nil, err
}

if podErr == errPodTerminated {
bits, err := ioutil.ReadAll(output)
activity.Record(ctx, "Closed log for %v: %v", pid, output.Close())
delete()
if err != nil {
return nil, err
}
activity.Record(ctx, "Read: %v", bits)

return nil, errors.New(string(bits))
}

// Wrap the log output in a ReadCloser that stops and kills the container on Close.
return plugin.CleanupReader{ReadCloser: output, Cleanup: delete}, nil
}
3 changes: 3 additions & 0 deletions plugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type ExecOutputChunk struct {
Err error
}

// StreamTypes provides the name of the corresponding StreamID.
var StreamTypes = []string{"Stdout", "Stderr"}

// ExecResult is a struct that contains the result of invoking Execable#exec.
// Any of these fields can be nil. The OutputCh will be closed when execution completes.
type ExecResult struct {
Expand Down
6 changes: 4 additions & 2 deletions volume/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package volume

import (
"context"
"io"
"sort"
"time"

Expand All @@ -25,7 +26,8 @@ type Interface interface {
VolumeList(context.Context) (DirMap, error)
// Accepts a path and returns the content associated with that path.
VolumeOpen(context.Context, string) (plugin.SizedReader, error)
// TODO: add VolumeStream
// Accepts a path and streams updates to the content associated with that path.
VolumeStream(context.Context, string) (io.ReadCloser, error)
}

// A Dir is a map of files in a directory to their attributes.
Expand All @@ -51,7 +53,7 @@ func List(ctx context.Context, impl Interface, path string) ([]plugin.Entry, err
if attr.Mode().IsDir() {
entries = append(entries, newDir(name, attr, impl, path+"/"+name))
} else {
entries = append(entries, newFile(name, attr, impl.VolumeOpen, path+"/"+name))
entries = append(entries, newFile(name, attr, impl, path+"/"+name))
}
}
// Sort entries so they have a deterministic order.
Expand Down
13 changes: 9 additions & 4 deletions volume/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package volume

import (
"context"
"io"
"os"
"strings"
"testing"
Expand All @@ -11,16 +12,20 @@ import (
"github.com/stretchr/testify/assert"
)

type mockEntry struct {
type mockDirEntry struct {
plugin.EntryBase
dmap DirMap
}

func (m *mockEntry) VolumeList(context.Context) (DirMap, error) {
func (m *mockDirEntry) VolumeList(context.Context) (DirMap, error) {
return m.dmap, nil
}

func (m *mockEntry) VolumeOpen(context.Context, string) (plugin.SizedReader, error) {
func (m *mockDirEntry) VolumeOpen(context.Context, string) (plugin.SizedReader, error) {
return nil, nil
}

func (m *mockDirEntry) VolumeStream(context.Context, string) (io.ReadCloser, error) {
return nil, nil
}

Expand All @@ -29,7 +34,7 @@ func TestVolumeDir(t *testing.T) {
assert.Nil(t, err)

plugin.SetTestCache(datastore.NewMemCache())
entry := mockEntry{EntryBase: plugin.NewEntry("mine"), dmap: dmap}
entry := mockDirEntry{EntryBase: plugin.NewEntry("mine"), dmap: dmap}
entry.SetTestID("/mine")

assert.NotNil(t, dmap[""]["path"])
Expand Down
18 changes: 10 additions & 8 deletions volume/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@ package volume

import (
"context"
"io"
"time"

"github.com/puppetlabs/wash/plugin"
)

// contentCB accepts a path and returns the content associated with that path.
type contentCB = func(context.Context, string) (plugin.SizedReader, error)

// file represents a file in a volume that has content we can access.
type file struct {
plugin.EntryBase
contentcb contentCB
path string
impl Interface
path string
}

// newFile creates a VolumeFile.
func newFile(name string, attr plugin.EntryAttributes, cb contentCB, path string) *file {
func newFile(name string, attr plugin.EntryAttributes, impl Interface, path string) *file {
vf := &file{
EntryBase: plugin.NewEntry(name),
contentcb: cb,
impl: impl,
path: path,
}
vf.SetAttributes(attr)
Expand All @@ -32,5 +30,9 @@ func newFile(name string, attr plugin.EntryAttributes, cb contentCB, path string

// Open returns the content of the file as a SizedReader.
func (v *file) Open(ctx context.Context) (plugin.SizedReader, error) {
return v.contentcb(ctx, v.path)
return v.impl.VolumeOpen(ctx, v.path)
}

func (v *file) Stream(ctx context.Context) (io.ReadCloser, error) {
return v.impl.VolumeStream(ctx, v.path)
}
52 changes: 45 additions & 7 deletions volume/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package volume
import (
"context"
"errors"
"io"
"io/ioutil"
"strings"
"testing"
"time"
Expand All @@ -11,19 +13,43 @@ import (
"github.com/stretchr/testify/assert"
)

type mockFileEntry struct {
plugin.EntryBase
content string
err error
}

func (m *mockFileEntry) VolumeList(context.Context) (DirMap, error) {
return nil, nil
}

func (m *mockFileEntry) VolumeOpen(context.Context, string) (plugin.SizedReader, error) {
if m.err != nil {
return nil, m.err
}
return strings.NewReader(m.content), nil
}

func (m *mockFileEntry) VolumeStream(context.Context, string) (io.ReadCloser, error) {
if m.err != nil {
return nil, m.err
}
return ioutil.NopCloser(strings.NewReader(m.content)), nil
}

func TestVolumeFile(t *testing.T) {
now := time.Now()
initialAttr := plugin.EntryAttributes{}
initialAttr.SetCtime(now)
vf := newFile("mine", initialAttr, func(ctx context.Context, path string) (plugin.SizedReader, error) {
assert.Equal(t, "my path", path)
return strings.NewReader("hello"), nil
}, "my path")

impl := &mockFileEntry{EntryBase: plugin.NewEntry("parent"), content: "hello"}
vf := newFile("mine", initialAttr, impl, "my path")

attr := plugin.Attributes(vf)
expectedAttr := plugin.EntryAttributes{}
expectedAttr.SetCtime(now)
assert.Equal(t, expectedAttr, attr)

rdr, err := vf.Open(context.Background())
assert.Nil(t, err)
if assert.NotNil(t, rdr) {
Expand All @@ -33,14 +59,26 @@ func TestVolumeFile(t *testing.T) {
assert.Equal(t, int64(n), rdr.Size())
assert.Equal(t, "hello", string(buf))
}

rdr2, err := vf.Stream(context.Background())
assert.Nil(t, err)
if assert.NotNil(t, rdr2) {
buf, err := ioutil.ReadAll(rdr2)
if assert.NoError(t, err) {
assert.Equal(t, "hello", string(buf))
}
}
}

func TestVolumeFileErr(t *testing.T) {
vf := newFile("mine", plugin.EntryAttributes{}, func(ctx context.Context, path string) (plugin.SizedReader, error) {
return nil, errors.New("fail")
}, "my path")
impl := &mockFileEntry{EntryBase: plugin.NewEntry("parent"), err: errors.New("fail")}
vf := newFile("mine", plugin.EntryAttributes{}, impl, "my path")

rdr, err := vf.Open(context.Background())
assert.Nil(t, rdr)
assert.Equal(t, errors.New("fail"), err)

rdr2, err := vf.Stream(context.Background())
assert.Nil(t, rdr2)
assert.Equal(t, errors.New("fail"), err)
}
Loading

0 comments on commit bf53b94

Please sign in to comment.