Skip to content

Commit

Permalink
Support volume and transport features for external plugins
Browse files Browse the repository at this point in the history
Add features that external plugins can request Wash provide them from
its core capabilities. Implement `volume.FS` and `transport`.

Signed-off-by: Michael Smith <[email protected]>
  • Loading branch information
MikaelSmith committed Jan 24, 2020
1 parent 9caefba commit b6396ae
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 14 deletions.
4 changes: 2 additions & 2 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *domainSocketClient) doRequestAndParseJSONBody(method, endpoint string,
}

if err := json.Unmarshal(respBodyBytes, result); err != nil {
return fmt.Errorf("Non-JSON body at %v: %v", endpoint, string(respBodyBytes))
return fmt.Errorf("Non-JSON body at %v: %v\n%v", endpoint, err, string(respBodyBytes))
}

return nil
Expand Down Expand Up @@ -270,7 +270,7 @@ func (c *domainSocketClient) Clear(path string) ([]string, error) {

var result []string
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("Non-JSON body at %v: %v", "/cache", string(body))
return nil, fmt.Errorf("Non-JSON body at %v: %v\n%v", "/cache", err, string(body))
}

return result, nil
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/hpcloud/tail v1.0.0
github.com/imdario/mergo v0.3.7 // indirect
github.com/jedib0t/go-pretty v4.2.1+incompatible
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a
github.com/json-iterator/go v1.1.7 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kevinburke/ssh_config v0.0.0-20190724205821-6cfae18c12b8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jedib0t/go-pretty v4.2.1+incompatible h1:YgpgCwastS5GrAK9z7OEv6rsgolGjU6h1WT3MmM5Hp8=
github.com/jedib0t/go-pretty v4.2.1+incompatible/go.mod h1:XemHduiw8R651AF9Pt4FwCTKeG3oo7hrHJAoznj9nag=
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a h1:zPPuIq2jAWWPTrGt70eK/BSch+gFAGrNzecsoENgu2o=
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a/go.mod h1:yL958EeXv8Ylng6IfnvG4oflryUi3vgA3xPs9hmII1s=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
Expand Down
36 changes: 36 additions & 0 deletions plugin/external/coreEntries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package external

import (
"encoding/json"
"fmt"

"github.com/puppetlabs/wash/plugin"
"github.com/puppetlabs/wash/volume"
)

type coreEntry interface {
createInstance(parent *pluginEntry, decodedEntry decodedExternalPluginEntry) (plugin.Entry, error)
schema() *plugin.EntrySchema
}

var coreEntries = map[string]coreEntry{
"__volume::fs__": volumeFS{},
}

type volumeFS struct{}

func (volumeFS) createInstance(parent *pluginEntry, e decodedExternalPluginEntry) (plugin.Entry, error) {
var opts struct{ Maxdepth uint }
// Use a default of 3 if unspecified.
opts.Maxdepth = 3

if err := json.Unmarshal([]byte(e.State), &opts); err != nil {
return nil, fmt.Errorf("volume filesystem options invalid: %v", err)
}

return volume.NewFS(e.Name, parent, int(opts.Maxdepth)), nil
}

func (volumeFS) schema() *plugin.EntrySchema {
return (&volume.FS{}).Schema()
}
62 changes: 57 additions & 5 deletions plugin/external/pluginEntry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"time"

"github.com/getlantern/deepcopy"
"github.com/jinzhu/copier"

"github.com/emirpasic/gods/maps/linkedhashmap"
"github.com/puppetlabs/wash/activity"
"github.com/puppetlabs/wash/plugin"
"github.com/puppetlabs/wash/transport"
)

type decodedCacheTTLs struct {
Expand Down Expand Up @@ -63,6 +65,11 @@ func (m *methodTuple) UnmarshalJSON(data []byte) error {
return nil
}

type execImpl struct {
Transport string `json:"transport"`
Options transport.Identity `json:"options"`
}

func (e decodedExternalPluginEntry) getMungedMethods() (map[string]methodInfo, error) {
methods := make(map[string]methodInfo)
for _, raw := range e.Methods {
Expand Down Expand Up @@ -115,6 +122,15 @@ func (e decodedExternalPluginEntry) getMungedMethods() (map[string]methodInfo, e
)
}
info.tupleValue = graph
case "exec":
// Check if we have ["exec", <exec_implementation>].
var impl execImpl
if err := json.Unmarshal(tuple.Value, &impl); err != nil {
return nil, fmt.Errorf("result for exec must specify an implementation transport and options")
} else if impl.Transport != "ssh" {
return nil, fmt.Errorf("unsupported transport %v requested, only ssh is supported", impl.Transport)
}
info.tupleValue = impl
}

methods[tuple.Method] = info
Expand Down Expand Up @@ -242,10 +258,6 @@ func (e *pluginEntry) implements(method string) bool {
return ok
}

func (e *pluginEntry) supportedMethods() map[string]methodInfo {
return e.methods
}

func (e *pluginEntry) MethodSignature(name string) plugin.MethodSignature {
if info, ok := e.methods[name]; ok {
return info.signature
Expand Down Expand Up @@ -323,7 +335,7 @@ func (e *pluginEntry) SchemaGraph() (*linkedhashmap.Map, error) {
es, _ := graph.Get(plugin.TypeID(e))
schemaMethods := es.(plugin.EntrySchema).Actions
instanceMethods := []string{}
for method := range e.supportedMethods() {
for method := range e.methods {
instanceMethods = append(instanceMethods, method)
}
sort.Strings(schemaMethods)
Expand Down Expand Up @@ -396,6 +408,15 @@ func (e *pluginEntry) List(ctx context.Context) ([]plugin.Entry, error) {

entries := make([]plugin.Entry, len(decodedEntries))
for i, decodedExternalPluginEntry := range decodedEntries {
if coreEnt, ok := coreEntries[decodedExternalPluginEntry.TypeID]; ok {
entry, err := coreEnt.createInstance(e, decodedExternalPluginEntry)
if err != nil {
return nil, err
}
entries[i] = entry
continue
}

entry, err := decodedExternalPluginEntry.toExternalPluginEntry(ctx, e.schemaKnown, false)
if err != nil {
return nil, err
Expand All @@ -405,6 +426,7 @@ func (e *pluginEntry) List(ctx context.Context) ([]plugin.Entry, error) {
entry.schemaGraphs = e.schemaGraphs
entries[i] = entry
}

return entries, nil
}

Expand Down Expand Up @@ -566,7 +588,20 @@ func (e *pluginEntry) Stream(ctx context.Context) (io.ReadCloser, error) {
}
}

// Used for mocking tests.
var execSSHFn = transport.ExecSSH

func (e *pluginEntry) Exec(ctx context.Context, cmd string, args []string, opts plugin.ExecOptions) (plugin.ExecCommand, error) {
if result := e.methods["exec"].tupleValue; result != nil {
impl := result.(execImpl)
if impl.Transport != "ssh" {
panic("transport must be ssh")
}

args = append([]string{cmd}, args...)
return execSSHFn(ctx, impl.Options, args, opts)
}

// Serialize opts to JSON
type serializedOptions struct {
plugin.ExecOptions
Expand Down Expand Up @@ -667,12 +702,29 @@ func unmarshalSchemaGraph(pluginName, rawTypeID string, stdout []byte) (*linkedh
requiredTypeIDs := map[string]bool{
rawTypeID: true,
}

type decodedEntrySchema struct {
plugin.EntrySchema
Methods []string `json:"methods"`
}

graph := linkedhashmap.New()
putNode := func(rawTypeID string, rawSchema interface{}) error {
if coreEnt, ok := coreEntries[rawTypeID]; ok {
pluginSchema := coreEnt.schema()

// Copy only the public fields so we serialize it as just data. Uses copier because it uses
// reflect to copy public fields, rather than Marshal/UnmarshalJSON which we've overridden.
var schema plugin.EntrySchema
err := copier.Copy(&schema, pluginSchema)
if err != nil {
panic(fmt.Sprintf("should always be able to copy from EntrySchema to EntrySchema: %v", err))
}
populatedTypeIDs[rawTypeID] = true
graph.Put(namespace(pluginName, rawTypeID), schema)
return nil
}

// Deep-copy the value into the decodedEntrySchema object
schema, ok := rawSchema.(map[string]interface{})
if !ok {
Expand Down
157 changes: 153 additions & 4 deletions plugin/external/pluginEntry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

"github.com/emirpasic/gods/maps/linkedhashmap"
"github.com/puppetlabs/wash/plugin"
"github.com/puppetlabs/wash/transport"
"github.com/puppetlabs/wash/volume"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -100,10 +102,9 @@ func (suite *ExternalPluginEntryTestSuite) TestDecodeExternalPluginEntryExtraFie
suite.Nil(entry.methods["stream"].tupleValue)
suite.False(plugin.IsPrefetched(entry))

methods := entry.supportedMethods()
suite.Equal(2, len(methods))
suite.Contains(methods, "list")
suite.Contains(methods, "stream")
suite.Equal(2, len(entry.methods))
suite.Contains(entry.methods, "list")
suite.Contains(entry.methods, "stream")
}
}

Expand Down Expand Up @@ -665,6 +666,121 @@ func (suite *ExternalPluginEntryTestSuite) TestListReadWithMethodResults() {
}
}

func (suite *ExternalPluginEntryTestSuite) TestListWithCoreEntry() {
mockScript := &mockPluginScript{path: "plugin_script"}
entry := &pluginEntry{
EntryBase: plugin.NewEntry("foo"),
script: mockScript,
}
entry.SetTestID("/foo")

ctx := context.Background()
stdout := []byte(`
[
{"name": "bar", "methods": ["list"]},
{"type_id": "__volume::fs__", "name": "fs1", "state": "{\"maxdepth\": 2}"}
]`)
mockScript.OnInvokeAndWait(ctx, "list", entry).Return(mockInvocation(stdout), nil).Once()

entries, err := entry.List(ctx)
if suite.NoError(err) {
suite.Equal(2, len(entries))

suite.Equal([]string{"list"}, plugin.SupportedActionsOf(entries[0]))
suite.Equal("bar", plugin.Name(entries[0]))

suite.Equal([]string{"list"}, plugin.SupportedActionsOf(entries[1]))
suite.Equal("fs1", plugin.Name(entries[1]))
suite.IsType(&volume.FS{}, entries[1])
}
}

func (suite *ExternalPluginEntryTestSuite) TestListWithUnknownCoreEntry() {
mockScript := &mockPluginScript{path: "plugin_script"}
entry := &pluginEntry{
EntryBase: plugin.NewEntry("foo"),
script: mockScript,
}
entry.SetTestID("/foo")

ctx := context.Background()
stdout := []byte(`[{"type_id": "__wash::other__", "name": "bar", "state": "{}"}]`)
mockScript.OnInvokeAndWait(ctx, "list", entry).Return(mockInvocation(stdout), nil).Once()

_, err := entry.List(ctx)
suite.EqualError(err, "the entry's methods must be provided")
}

func (suite *ExternalPluginEntryTestSuite) TestListWithExec_Transport() {
mockScript := &mockPluginScript{path: "plugin_script"}
entry := &pluginEntry{
EntryBase: plugin.NewEntry("foo"),
script: mockScript,
}
entry.SetTestID("/foo")

ctx := context.Background()
stdout := []byte(`[
{"name": "bar", "methods": [
["exec", {"transport": "ssh", "options": {"host": "example.com", "user": "ubuntu"}}]
]}
]`)
mockScript.OnInvokeAndWait(ctx, "list", entry).Return(mockInvocation(stdout), nil).Once()

entries, err := entry.List(ctx)
if suite.NoError(err) {
suite.Equal(1, len(entries))
suite.Equal([]string{"exec"}, plugin.SupportedActionsOf(entries[0]))

if suite.IsType(&pluginEntry{}, entries[0]) {
entry := entries[0].(*pluginEntry)
if suite.Contains(entry.methods, "exec") {
info := entry.methods["exec"]
if suite.IsType(execImpl{}, info.tupleValue) {
exec := info.tupleValue.(execImpl)
suite.Equal("ssh", exec.Transport)
suite.Equal("example.com", exec.Options.Host)
suite.Equal("ubuntu", exec.Options.User)
}
}
}
}
}

func (suite *ExternalPluginEntryTestSuite) TestListWithExec_TransportUnknown() {
mockScript := &mockPluginScript{path: "plugin_script"}
entry := &pluginEntry{
EntryBase: plugin.NewEntry("foo"),
script: mockScript,
}
entry.SetTestID("/foo")

ctx := context.Background()
stdout := []byte(`[
{"name": "bar", "methods": [["exec", {"transport": "foo", "options": {}}]]}
]`)
mockScript.OnInvokeAndWait(ctx, "list", entry).Return(mockInvocation(stdout), nil).Once()

_, err := entry.List(ctx)
suite.EqualError(err, "unsupported transport foo requested, only ssh is supported")
}

func (suite *ExternalPluginEntryTestSuite) TestListWithExec_Unknown() {
mockScript := &mockPluginScript{path: "plugin_script"}
entry := &pluginEntry{
EntryBase: plugin.NewEntry("foo"),
script: mockScript,
}
entry.SetTestID("/foo")

ctx := context.Background()
stdout := []byte(`[{"name": "bar", "methods": [["exec", true]]}]`)
mockScript.OnInvokeAndWait(ctx, "list", entry).Return(mockInvocation(stdout), nil).Once()

_, err := entry.List(ctx)
suite.EqualError(err, "result for exec must specify an implementation transport and options")
}

type mockedInvocation struct {
Command
mock.Mock
Expand Down Expand Up @@ -872,6 +988,39 @@ func (suite *ExternalPluginEntryTestSuite) TestExec() {
}
}

func (suite *ExternalPluginEntryTestSuite) TestExec_Transport() {
// Mock transport.ExecSSH
savedFn := execSSHFn
var execMock mock.Mock
execSSHFn = func(ctx context.Context, id transport.Identity, cmd []string, opts plugin.ExecOptions) (plugin.ExecCommand, error) {
args := execMock.Called(ctx, id, cmd, opts)
return args.Get(0).(plugin.ExecCommand), args.Error(1)
}
defer func() { execSSHFn = savedFn }()

execVals := execImpl{Transport: "ssh", Options: transport.Identity{Host: "example.com"}}
entry := &pluginEntry{
EntryBase: plugin.NewEntry("foo"),
methods: map[string]methodInfo{"exec": methodInfo{tupleValue: execVals}},
}
entry.SetTestID("/foo")

var opts plugin.ExecOptions
ctx, mockErr := context.Background(), fmt.Errorf("execution error")
args, result := []string{"echo", "hello"}, plugin.NewExecCommand(ctx)

// Test that if ExecSSH errors then Exec returns the error
execMock.On("func1", ctx, execVals.Options, args, opts).Return(result, mockErr).Once()
_, err := entry.Exec(ctx, "echo", []string{"hello"}, opts)
suite.EqualError(err, mockErr.Error())

// Test that if ExecSSH runs then Exec returns the result
execMock.On("func1", ctx, execVals.Options, args, opts).Return(result, nil).Once()
cmd, err := entry.Exec(ctx, "echo", []string{"hello"}, opts)
suite.NoError(err)
suite.Equal(cmd, result)
}

func (suite *ExternalPluginEntryTestSuite) TestSignal() {
mockScript := &mockPluginScript{path: "plugin_script"}
entry := &pluginEntry{
Expand Down
Loading

0 comments on commit b6396ae

Please sign in to comment.