From 399776c2f6bb5dffc968fed6590818aff0d15582 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Wed, 4 Mar 2015 15:10:16 -0800 Subject: [PATCH 01/15] Metric types lastAdvertisedTimestamp(int64) to lastAdvertisedTime(time.Time). Easier to do operations on the time struct. --- control/control_test.go | 4 +-- control/metrics.go | 21 ++++++------- control/metrics_test.go | 30 +++++++++---------- control/plugin/collector_test.go | 2 +- control/plugin/plugin.go | 14 ++++----- control/plugin/plugin_test.go | 4 +-- core/metric.go | 6 +++- .../pulse-collector-dummy/dummy/dummy.go | 2 +- 8 files changed, 44 insertions(+), 39 deletions(-) diff --git a/control/control_test.go b/control/control_test.go index c3dcc56d0..cb92a844d 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -360,7 +360,7 @@ func TestResolvePlugin(t *testing.T) { Convey(".resolvePlugin()", t, func() { c := New() lp := &loadedPlugin{} - mt := newMetricType([]string{"foo", "bar"}, time.Now().Unix(), lp) + mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) c.metricCatalog.Add(mt) Convey("it resolves the plugin", func() { p, err := c.resolvePlugin([]string{"foo", "bar"}, -1) @@ -379,7 +379,7 @@ func TestExportedMetricCatalog(t *testing.T) { Convey(".MetricCatalog()", t, func() { c := New() lp := &loadedPlugin{} - mt := newMetricType([]string{"foo", "bar"}, time.Now().Unix(), lp) + mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) c.metricCatalog.Add(mt) Convey("it returns a collection of core.MetricTypes", func() { t := c.MetricCatalog() diff --git a/control/metrics.go b/control/metrics.go index fa5dc215d..b97a24f82 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -4,6 +4,7 @@ import ( "errors" "strings" "sync" + "time" "github.com/intelsdilabs/pulse/core/cpolicy" "github.com/intelsdilabs/pulse/core/ctypes" @@ -17,23 +18,23 @@ var ( type metricType struct { Plugin *loadedPlugin - namespace []string - lastAdvertisedTimestamp int64 - subscriptions int - policy processesConfigData + namespace []string + lastAdvertisedTime time.Time + subscriptions int + policy processesConfigData } type processesConfigData interface { Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors) } -func newMetricType(ns []string, last int64, plugin *loadedPlugin) *metricType { +func newMetricType(ns []string, last time.Time, plugin *loadedPlugin) *metricType { return &metricType{ Plugin: plugin, - namespace: ns, - lastAdvertisedTimestamp: last, - policy: cpolicy.NewPolicyNode(), + namespace: ns, + lastAdvertisedTime: last, + policy: cpolicy.NewPolicyNode(), } } @@ -41,8 +42,8 @@ func (m *metricType) Namespace() []string { return m.namespace } -func (m *metricType) LastAdvertisedTimestamp() int64 { - return m.lastAdvertisedTimestamp +func (m *metricType) LastAdvertisedTime() time.Time { + return m.lastAdvertisedTime } func (m *metricType) Subscribe() { diff --git a/control/metrics_test.go b/control/metrics_test.go index e5e7d3a4f..5b80709cf 100644 --- a/control/metrics_test.go +++ b/control/metrics_test.go @@ -13,14 +13,14 @@ import ( func TestMetricType(t *testing.T) { Convey("newMetricType()", t, func() { Convey("returns a metricType", func() { - mt := newMetricType([]string{"test"}, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType([]string{"test"}, time.Now(), new(loadedPlugin)) So(mt, ShouldHaveSameTypeAs, new(metricType)) }) }) Convey("metricType.Namespace()", t, func() { Convey("returns the namespace of a metricType", func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) So(mt.Namespace(), ShouldHaveSameTypeAs, ns) So(mt.Namespace(), ShouldResemble, ns) }) @@ -29,16 +29,16 @@ func TestMetricType(t *testing.T) { Convey("returns the namespace of a metricType", func() { ns := []string{"test"} lp := &loadedPlugin{Meta: plugin.PluginMeta{Version: 1}} - mt := newMetricType(ns, time.Now().Unix(), lp) + mt := newMetricType(ns, time.Now(), lp) So(mt.Version(), ShouldEqual, 1) }) }) Convey("metricType.LastAdvertisedTimestamp()", t, func() { Convey("returns the LastAdvertisedTimestamp for the metricType", func() { - ts := time.Now().Unix() + ts := time.Now() mt := newMetricType([]string{"test"}, ts, new(loadedPlugin)) - So(mt.LastAdvertisedTimestamp(), ShouldHaveSameTypeAs, ts) - So(mt.LastAdvertisedTimestamp(), ShouldResemble, ts) + So(mt.LastAdvertisedTime(), ShouldHaveSameTypeAs, ts) + So(mt.LastAdvertisedTime(), ShouldResemble, ts) }) }) } @@ -53,7 +53,7 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Add()", t, func() { Convey("adds a metricType to the metricCatalog", func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() mc.Add(mt) _mt, err := mc.Get(ns, -1) @@ -63,7 +63,7 @@ func TestMetricCatalog(t *testing.T) { }) Convey("metricCatalog.Get()", t, func() { mc := newMetricCatalog() - ts := time.Now().Unix() + ts := time.Now() Convey("add multiple metricTypes and get them back", func() { ns := [][]string{ []string{"test1"}, @@ -128,7 +128,7 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Table()", t, func() { Convey("returns a copy of the table", func() { mc := newMetricCatalog() - mt := newMetricType([]string{"foo", "bar"}, time.Now().Unix(), &loadedPlugin{}) + mt := newMetricType([]string{"foo", "bar"}, time.Now(), &loadedPlugin{}) mc.Add(mt) So(mc.Table(), ShouldHaveSameTypeAs, map[string][]*metricType{}) So(mc.Table()["foo.bar"], ShouldResemble, []*metricType{mt}) @@ -137,7 +137,7 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Remove()", t, func() { Convey("removes a metricType from the catalog", func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() mc.Add(mt) mc.Remove(ns) @@ -148,7 +148,7 @@ func TestMetricCatalog(t *testing.T) { }) Convey("metricCatalog.Next()", t, func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() Convey("returns false on empty table", func() { ok := mc.Next() @@ -167,7 +167,7 @@ func TestMetricCatalog(t *testing.T) { []string{"test3"}, } lp := new(loadedPlugin) - t := time.Now().Unix() + t := time.Now() mt := []*metricType{ newMetricType(ns[0], t, lp), newMetricType(ns[1], t, lp), @@ -208,7 +208,7 @@ func TestSubscribe(t *testing.T) { []string{"test3"}, } lp := new(loadedPlugin) - ts := time.Now().Unix() + ts := time.Now() mt := []*metricType{ newMetricType(ns[0], ts, lp), newMetricType(ns[1], ts, lp), @@ -242,7 +242,7 @@ func TestUnsubscribe(t *testing.T) { []string{"test3"}, } lp := new(loadedPlugin) - ts := time.Now().Unix() + ts := time.Now() mt := []*metricType{ newMetricType(ns[0], ts, lp), newMetricType(ns[1], ts, lp), @@ -278,7 +278,7 @@ func TestUnsubscribe(t *testing.T) { } func TestSubscriptionCount(t *testing.T) { - m := newMetricType([]string{"test"}, time.Now().Unix(), &loadedPlugin{}) + m := newMetricType([]string{"test"}, time.Now(), &loadedPlugin{}) Convey("it returns the subscription count", t, func() { m.Subscribe() So(m.SubscriptionCount(), ShouldEqual, 1) diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index 2392cced0..6063aadb1 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -76,7 +76,7 @@ func (f *MockPlugin) Collect(args CollectorArgs, reply *CollectorReply) error { func (c *MockPlugin) GetMetricTypes(args GetMetricTypesArgs, reply *GetMetricTypesReply) error { reply.MetricTypes = []*MetricType{ - NewMetricType([]string{"org", "some_metric"}, time.Now().Unix()), + NewMetricType([]string{"org", "some_metric"}, time.Now()), } return nil } diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index 24d47854d..af3db90e7 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -47,22 +47,22 @@ var ( ) type MetricType struct { - namespace []string - lastAdvertisedTimestamp int64 + namespace []string + lastAdvertisedTime time.Time } func (m *MetricType) Namespace() []string { return m.namespace } -func (m *MetricType) LastAdvertisedTimestamp() int64 { - return m.lastAdvertisedTimestamp +func (m *MetricType) LastAdvertisedTime() time.Time { + return m.lastAdvertisedTime } -func NewMetricType(ns []string, last int64) *MetricType { +func NewMetricType(ns []string, last time.Time) *MetricType { return &MetricType{ - namespace: ns, - lastAdvertisedTimestamp: last, + namespace: ns, + lastAdvertisedTime: last, } } diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 84d2a18f2..75d52aecd 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -21,7 +21,7 @@ func TestPluginType(t *testing.T) { func TestMetricType(t *testing.T) { Convey("MetricType", t, func() { - now := time.Now().Unix() + now := time.Now() m := NewMetricType([]string{"foo", "bar"}, now) Convey("New", func() { So(m, ShouldHaveSameTypeAs, &MetricType{}) @@ -30,7 +30,7 @@ func TestMetricType(t *testing.T) { So(m.Namespace(), ShouldResemble, []string{"foo", "bar"}) }) Convey("Get LastAdvertisedTimestamp", func() { - So(m.LastAdvertisedTimestamp(), ShouldEqual, now) + So(m.LastAdvertisedTime().Unix(), ShouldEqual, now.Unix()) }) }) } diff --git a/core/metric.go b/core/metric.go index bf0770e9e..0ff42afa4 100644 --- a/core/metric.go +++ b/core/metric.go @@ -1,9 +1,13 @@ package core +import ( + "time" +) + type MetricType interface { Version() int Namespace() []string - LastAdvertisedTimestamp() int64 + LastAdvertisedTime() time.Time } type Metric interface { diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index a1ff09b8c..276a5a265 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -22,7 +22,7 @@ func (f *Dummy) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) func (f *Dummy) GetMetricTypes(_ plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { reply.MetricTypes = []*plugin.MetricType{ - plugin.NewMetricType([]string{"foo", "bar"}, time.Now().Unix()), + plugin.NewMetricType([]string{"foo", "bar"}, time.Now()), } return nil } From 861defb070111c3f4f1fdaf608ba44bb7fa9d906 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Wed, 4 Mar 2015 17:26:59 -0800 Subject: [PATCH 02/15] Staging --- control/router.go | 55 ++++++++++++++++++++++++++++++++++++++++++ control/router_test.go | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 control/router.go create mode 100644 control/router_test.go diff --git a/control/router.go b/control/router.go new file mode 100644 index 000000000..fc13610e3 --- /dev/null +++ b/control/router.go @@ -0,0 +1,55 @@ +// Router is the entry point for execution commands and routing to plugins +package control + +import ( + "log" + "time" + + "github.com/intelsdilabs/pulse/core" + "github.com/intelsdilabs/pulse/core/cdata" +) + +type Router interface { + Collect([]core.MetricType, *cdata.ConfigDataNode, time.Time) *collectionResponse +} + +type RouterResponse interface { +} + +type pluginRouter struct { + // Pointer to control metric catalog + metricCatalog *metricCatalog +} + +func newRouter(mc *metricCatalog) Router { + return &pluginRouter{metricCatalog: mc} +} + +// Calls collector plugins for the metric types and returns collection response containing metrics. Blocking method. +func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.ConfigDataNode, deadline time.Time) (response *collectionResponse) { + // For each MT sort into plugin types we need to call + log.Println(metricTypes) + + // For each plugin type select a matching available plugin to call + for _, m := range metricTypes { + log.Println(m.Namespace()) + lp, err := p.metricCatalog.resolvePlugin(m.Namespace(), m.Version()) + + // TODO handle error here. Single error fails entire operation. + log.Println(lp, err) + } + + // For each available plugin call available plugin using RPC client and wait for response (goroutines) + + // Wait for all responses(happy) or timeout(unhappy) + + // (happy)reduce responses into single collection response and return + + // (unhappy)return response with timeout state + + return &collectionResponse{} +} + +type collectionResponse struct { + Errors []error +} diff --git a/control/router_test.go b/control/router_test.go new file mode 100644 index 000000000..c0d30079b --- /dev/null +++ b/control/router_test.go @@ -0,0 +1,46 @@ +package control + +import ( + "fmt" + "testing" + "time" + + "github.com/intelsdilabs/pulse/core" + "github.com/intelsdilabs/pulse/core/cdata" + . "github.com/smartystreets/goconvey/convey" +) + +type MockMetricType struct { + namespace []string +} + +func (m MockMetricType) Namespace() []string { + return m.namespace +} + +func (m MockMetricType) LastAdvertisedTime() time.Time { + return time.Now() +} + +func (m MockMetricType) Version() int { + return 1 +} + +func TestRouter(t *testing.T) { + Convey("given a new router", t, func() { + c := New() + c.Start() + e := c.Load(PluginPath) + fmt.Println(c.PluginCatalog()) + fmt.Println(c.MetricCatalog(), e) + + mc := newMetricCatalog() + r := newRouter(mc) + + m := MockMetricType{namespace: []string{"foo", "bar"}} + cd := cdata.NewNode() + + r.Collect([]core.MetricType{m}, cd, time.Now().Add(time.Second*60)) + + }) +} From cc56722ac09838d46463d388f75331cc9d929222 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Wed, 4 Mar 2015 22:11:25 -0800 Subject: [PATCH 03/15] MOved resolving plugins to the catalog itself --- control/metrics.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/control/metrics.go b/control/metrics.go index b97a24f82..dff0e98b2 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -184,6 +184,14 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error { return m.Unsubscribe() } +func (mc *metricCatalog) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { + m, err := mc.Get(mns, ver) + if err != nil { + return nil, err + } + return m.Plugin, nil +} + func (mc *metricCatalog) get(key string, ver int) (*metricType, error) { var ( ok bool From c86e5ea1a0b2ba42424b026966cece20cde9d5c2 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Wed, 4 Mar 2015 22:11:38 -0800 Subject: [PATCH 04/15] Better looking test --- control/monitor_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/control/monitor_test.go b/control/monitor_test.go index aa01a25ac..cae02b3f4 100644 --- a/control/monitor_test.go +++ b/control/monitor_test.go @@ -13,8 +13,13 @@ import ( type mockPluginClient struct{} -func (mp *mockPluginClient) Ping() error { return nil } -func (mp *mockPluginClient) Kill(r string) error { return nil } +func (mp *mockPluginClient) Ping() error { + return nil +} + +func (mp *mockPluginClient) Kill(r string) error { + return nil +} func TestMonitor(t *testing.T) { Convey("monitor", t, func() { From 589f05b3c5e7e5498f1019cbc90f55257d4a8036 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Wed, 4 Mar 2015 22:14:27 -0800 Subject: [PATCH 05/15] Fixes and refactoring * Fixes to plugin * Fixes to mismapped RPC calls and broken reply structs * Renamed Collect to CollectMetrics to match GetMettricTypes * Added a proxy object to make RPC wrapping cleaner * broke up control/plugin package to be easier to find objects --- control/plugin/client/client.go | 23 ++- control/plugin/collector.go | 34 ++-- control/plugin/collector_proxy.go | 50 +++++ control/plugin/collector_test.go | 13 +- control/plugin/metric.go | 33 +++ control/plugin/plugin.go | 189 ------------------ control/plugin/plugin_test.go | 5 +- control/plugin/session.go | 178 +++++++++++++++++ control/plugin_manager.go | 8 +- .../pulse-collector-dummy/dummy/dummy.go | 16 +- .../pulse-collector-facter/facter/facter.go | 12 +- 11 files changed, 314 insertions(+), 247 deletions(-) create mode 100644 control/plugin/collector_proxy.go create mode 100644 control/plugin/metric.go create mode 100644 control/plugin/session.go diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index 05296ed45..326a995a1 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -17,8 +17,8 @@ type PluginClient interface { // A client providing collector specific plugin method calls. type PluginCollectorClient interface { PluginClient - Collect(plugin.CollectorArgs, *plugin.CollectorReply) error - GetMetricTypes(plugin.GetMetricTypesArgs, *plugin.GetMetricTypesReply) error + CollectMetrics([]plugin.MetricType) ([]plugin.Metric, error) + GetMetricTypes() ([]plugin.MetricType, error) } // A client providing processor specific plugin method calls. @@ -46,14 +46,21 @@ func (p *PluginNativeClient) Kill(reason string) error { return err } -func (p *PluginNativeClient) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) error { - err := p.connection.Call("Collector.Collect", args, reply) - return err +func (p *PluginNativeClient) CollectMetrics(mts []plugin.MetricType) ([]plugin.Metric, error) { + // TODO return err if mts is empty + args := plugin.CollectMetricsArgs{MetricTypes: mts} + reply := plugin.CollectMetricsReply{} + + err := p.connection.Call("Collector.CollectMetrics", args, &reply) + return reply.Metrics, err } -func (p *PluginNativeClient) GetMetricTypes(args plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { - err := p.connection.Call("Collector.GetMetricTypes", args, reply) - return err +func (p *PluginNativeClient) GetMetricTypes() ([]plugin.MetricType, error) { + args := plugin.GetMetricTypesArgs{} + reply := plugin.GetMetricTypesReply{} + + err := p.connection.Call("Collector.GetMetricTypes", args, &reply) + return reply.MetricTypes, err } func NewCollectorClient(address string, timeout time.Duration) (PluginCollectorClient, error) { diff --git a/control/plugin/collector.go b/control/plugin/collector.go index b5d0a1cf3..d4a16e99a 100644 --- a/control/plugin/collector.go +++ b/control/plugin/collector.go @@ -7,28 +7,14 @@ import ( "net/rpc" ) +// Acts as a proxy for RPC calls to a CollectorPlugin. This helps keep the function signature simple +// within plugins vs. having to match required RPC patterns. + // Collector plugin type CollectorPlugin interface { Plugin - Collect(CollectorArgs, *CollectorReply) error - GetMetricTypes(GetMetricTypesArgs, *GetMetricTypesReply) error -} - -// Arguments passed to Collect() for a Collector implementation -type CollectorArgs struct { -} - -// Reply assigned by a Collector implementation using Collect() -type CollectorReply struct { -} - -// GetMetricTypesArgs args passed to GetMetricTypes -type GetMetricTypesArgs struct { -} - -// GetMetricTypesReply assigned by GetMetricTypes() implementation -type GetMetricTypesReply struct { - MetricTypes []*MetricType + CollectMetrics([]MetricType) ([]Metric, error) + GetMetricTypes() ([]MetricType, error) } // Execution method for a Collector plugin. Error and exit code (int) returned. @@ -44,9 +30,13 @@ func StartCollector(c CollectorPlugin, s Session, r *Response) (error, int) { s.Logger().Printf("Listening %s\n", l.Addr()) s.Logger().Printf("Session token %s\n", s.Token()) - // We register RPC even in non-daemon mode to ensure it would be successful. - // Register the collector RPC methods from plugin implementation - rpc.Register(c) + // Create our proxy + proxy := &collectorPluginProxy{ + Plugin: c, + Session: s, + } + // Register the proxy under the "Collector" namespace + rpc.RegisterName("Collector", proxy) // Register common plugin methods used for utility reasons e := rpc.Register(s) if e != nil { diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go new file mode 100644 index 000000000..075ee5497 --- /dev/null +++ b/control/plugin/collector_proxy.go @@ -0,0 +1,50 @@ +package plugin + +import ( + "errors" + "fmt" +) + +// Arguments passed to CollectMetrics() for a Collector implementation +type CollectMetricsArgs struct { + MetricTypes []MetricType +} + +// Reply assigned by a Collector implementation using CollectMetrics() +type CollectMetricsReply struct { + Metrics []Metric +} + +// GetMetricTypesArgs args passed to GetMetricTypes +type GetMetricTypesArgs struct { +} + +// GetMetricTypesReply assigned by GetMetricTypes() implementation +type GetMetricTypesReply struct { + MetricTypes []MetricType +} + +type collectorPluginProxy struct { + Plugin CollectorPlugin + Session Session +} + +func (c *collectorPluginProxy) GetMetricTypes(args GetMetricTypesArgs, reply *GetMetricTypesReply) error { + c.Session.Logger().Println("GetMetricTypes called") + mts, err := c.Plugin.GetMetricTypes() + if err != nil { + return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) + } + reply.MetricTypes = mts + return nil +} + +func (c *collectorPluginProxy) CollectMetrics(args CollectMetricsArgs, reply *CollectMetricsReply) error { + c.Session.Logger().Println("CollectMetrics called") + ms, err := c.Plugin.CollectMetrics(args.MetricTypes) + if err != nil { + return errors.New(fmt.Sprintf("CollectMetrics call error : %s", err.Error())) + } + reply.Metrics = ms + return nil +} diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index 6063aadb1..ab5efa56b 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -70,15 +70,14 @@ type MockPlugin struct { Policy ConfigPolicy } -func (f *MockPlugin) Collect(args CollectorArgs, reply *CollectorReply) error { - return nil +func (f *MockPlugin) CollectMetrics(_ []MetricType) ([]Metric, error) { + return []Metric{}, nil } -func (c *MockPlugin) GetMetricTypes(args GetMetricTypesArgs, reply *GetMetricTypesReply) error { - reply.MetricTypes = []*MetricType{ - NewMetricType([]string{"org", "some_metric"}, time.Now()), - } - return nil +func (c *MockPlugin) GetMetricTypes() ([]MetricType, error) { + return []MetricType{ + *NewMetricType([]string{"org", "some_metric"}), + }, nil } func TestStartCollector(t *testing.T) { diff --git a/control/plugin/metric.go b/control/plugin/metric.go new file mode 100644 index 000000000..3c45c70e2 --- /dev/null +++ b/control/plugin/metric.go @@ -0,0 +1,33 @@ +package plugin + +import ( + "time" +) + +// The metric used by plugin will probably have to be typed by metric data to +// allow passing over RPC. This can on the agent side meet core.Metric interface +// by functions supporting conversion. +type Metric struct { + Namespace []string + Data interface{} +} + +type MetricType struct { + Namespace_ []string + LastAdvertisedTime_ time.Time +} + +func (m *MetricType) Namespace() []string { + return m.Namespace_ +} + +func (m *MetricType) LastAdvertisedTime() time.Time { + return m.LastAdvertisedTime_ +} + +func NewMetricType(ns []string) *MetricType { + return &MetricType{ + Namespace_: ns, + LastAdvertisedTime_: time.Now(), + } +} diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index af3db90e7..a94de7aee 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -4,14 +4,7 @@ package plugin // task > control > default import ( - "crypto/rand" "crypto/rsa" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "log" - "os" "time" ) @@ -46,26 +39,6 @@ var ( } ) -type MetricType struct { - namespace []string - lastAdvertisedTime time.Time -} - -func (m *MetricType) Namespace() []string { - return m.namespace -} - -func (m *MetricType) LastAdvertisedTime() time.Time { - return m.lastAdvertisedTime -} - -func NewMetricType(ns []string, last time.Time) *MetricType { - return &MetricType{ - namespace: ns, - lastAdvertisedTime: last, - } -} - type PluginResponseState int type PluginType int @@ -79,33 +52,6 @@ func (p PluginType) String() string { return types[p] } -// Session interface -type Session interface { - Ping(arg PingArgs, b *bool) error - Kill(arg KillArgs, b *bool) error - Logger() *log.Logger - ListenAddress() string - SetListenAddress(string) - ListenPort() string - Token() string - KillChan() chan int - - generateResponse(r *Response) []byte - heartbeatWatch(killChan chan int) - isDaemon() bool -} - -// Started plugin session state -type SessionState struct { - *Arg - LastPing time.Time - - token string - listenAddress string - killChan chan int - logger *log.Logger -} - // Arguments passed to startup of Plugin type Arg struct { // Plugin file path to binary @@ -125,13 +71,6 @@ func NewArg(pubkey *rsa.PublicKey, logpath string) Arg { } } -// Arguments passed to ping -type PingArgs struct{} - -type KillArgs struct { - Reason string -} - // Response from started plugin type Response struct { Meta PluginMeta @@ -164,134 +103,6 @@ func NewPluginMeta(name string, version int, pluginType PluginType) *PluginMeta } } -// Ping returns nothing in normal operation -func (s *SessionState) Ping(arg PingArgs, b *bool) error { - // For now we return nil. We can return an error if we are shutting - // down or otherwise in a state we should signal poor health. - // Reply should contain any context. - s.LastPing = time.Now() - s.logger.Println("Ping received") - return nil -} - -// Kill will stop a running plugin -func (s *SessionState) Kill(arg KillArgs, b *bool) error { - // Right now we have no coordination needed. In the future we should - // add control to wait on a lock before halting. - s.logger.Printf("Kill called by agent, reason: %s\n", arg.Reason) - go func() { - time.Sleep(time.Second * 2) - s.killChan <- 0 - }() - return nil -} - -// Logger gets the SessionState logger -func (s *SessionState) Logger() *log.Logger { - return s.logger -} - -// ListenAddress gets the SessionState listen address -func (s *SessionState) ListenAddress() string { - return s.listenAddress -} - -//ListenPort gets the SessionState listen port -func (s *SessionState) ListenPort() string { - return s.listenPort -} - -// SetListenAddress sets SessionState listen address -func (s *SessionState) SetListenAddress(a string) { - s.listenAddress = a -} - -// Token gets the SessionState token -func (s *SessionState) Token() string { - return s.token -} - -// KillChan gets the SessionState killchan -func (s *SessionState) KillChan() chan int { - return s.killChan -} - -func (s *SessionState) isDaemon() bool { - return !s.NoDaemon -} - -func (s *SessionState) generateResponse(r *Response) []byte { - // Add common plugin response properties - r.ListenAddress = s.listenAddress - r.Token = s.token - rs, _ := json.Marshal(r) - return rs -} - -func (s *SessionState) heartbeatWatch(killChan chan int) { - s.logger.Println("Heartbeat started") - count := 0 - for { - if time.Now().Sub(s.LastPing) >= PingTimeoutDuration { - count++ - if count >= PingTimeoutLimit { - s.logger.Println("Heartbeat timeout expired") - defer close(killChan) - return - } - } else { - s.logger.Println("Heartbeat timeout reset") - // Reset count - count = 0 - } - time.Sleep(PingTimeoutDuration) - } -} - -// NewSessionState takes the plugin args and returns a SessionState -func NewSessionState(pluginArgsMsg string) (*SessionState, error, int) { - pluginArg := &Arg{} - err := json.Unmarshal([]byte(pluginArgsMsg), pluginArg) - if err != nil { - return nil, err, 2 - } - - // If no port was provided we let the OS select a port for us. - // This is safe as address is returned in the Response and keep - // alive prevents unattended plugins. - if pluginArg.listenPort == "" { - pluginArg.listenPort = "0" - } - - // Generate random token for this session - rb := make([]byte, 32) - rand.Read(rb) - rs := base64.URLEncoding.EncodeToString(rb) - - var logger *log.Logger - switch lp := pluginArg.PluginLogPath; lp { - case "", "/tmp": - // Empty means use default tmp log (needs to be removed post-alpha) - lf, err := os.OpenFile("/tmp/pulse_plugin.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 - } - logger = log.New(lf, ">>>", log.Ldate|log.Ltime) - default: - lf, err := os.OpenFile(lp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 - } - logger = log.New(lf, ">>>", log.Ldate|log.Ltime) - } - - return &SessionState{ - Arg: pluginArg, - token: rs, - killChan: make(chan int), - logger: logger}, nil, 0 -} - // Start starts a plugin func Start(m *PluginMeta, c Plugin, p *ConfigPolicy, requestString string) (error, int) { diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 75d52aecd..58a54d6e7 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -22,7 +22,7 @@ func TestPluginType(t *testing.T) { func TestMetricType(t *testing.T) { Convey("MetricType", t, func() { now := time.Now() - m := NewMetricType([]string{"foo", "bar"}, now) + m := NewMetricType([]string{"foo", "bar"}) Convey("New", func() { So(m, ShouldHaveSameTypeAs, &MetricType{}) }) @@ -30,7 +30,8 @@ func TestMetricType(t *testing.T) { So(m.Namespace(), ShouldResemble, []string{"foo", "bar"}) }) Convey("Get LastAdvertisedTimestamp", func() { - So(m.LastAdvertisedTime().Unix(), ShouldEqual, now.Unix()) + So(m.LastAdvertisedTime().Unix(), ShouldBeGreaterThan, now.Unix()-2) + So(m.LastAdvertisedTime().Unix(), ShouldBeLessThan, now.Unix()+2) }) }) } diff --git a/control/plugin/session.go b/control/plugin/session.go new file mode 100644 index 000000000..1e1f7eea2 --- /dev/null +++ b/control/plugin/session.go @@ -0,0 +1,178 @@ +package plugin + +import ( + "crypto/rand" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "time" +) + +// Session interface +type Session interface { + Ping(arg PingArgs, b *bool) error + Kill(arg KillArgs, b *bool) error + Logger() *log.Logger + ListenAddress() string + SetListenAddress(string) + ListenPort() string + Token() string + KillChan() chan int + + generateResponse(r *Response) []byte + heartbeatWatch(killChan chan int) + isDaemon() bool +} + +// Arguments passed to ping +type PingArgs struct{} + +type KillArgs struct { + Reason string +} + +// Started plugin session state +type SessionState struct { + *Arg + LastPing time.Time + + token string + listenAddress string + killChan chan int + logger *log.Logger +} + +// Ping returns nothing in normal operation +func (s *SessionState) Ping(arg PingArgs, b *bool) error { + // For now we return nil. We can return an error if we are shutting + // down or otherwise in a state we should signal poor health. + // Reply should contain any context. + s.ResetHeartbeat() + s.logger.Println("Ping received") + return nil +} + +// Kill will stop a running plugin +func (s *SessionState) Kill(arg KillArgs, b *bool) error { + // Right now we have no coordination needed. In the future we should + // add control to wait on a lock before halting. + s.logger.Printf("Kill called by agent, reason: %s\n", arg.Reason) + go func() { + time.Sleep(time.Second * 2) + s.killChan <- 0 + }() + return nil +} + +// Logger gets the SessionState logger +func (s *SessionState) Logger() *log.Logger { + return s.logger +} + +// ListenAddress gets the SessionState listen address +func (s *SessionState) ListenAddress() string { + return s.listenAddress +} + +//ListenPort gets the SessionState listen port +func (s *SessionState) ListenPort() string { + return s.listenPort +} + +// SetListenAddress sets SessionState listen address +func (s *SessionState) SetListenAddress(a string) { + s.listenAddress = a +} + +func (s *SessionState) ResetHeartbeat() { + s.LastPing = time.Now() +} + +// Token gets the SessionState token +func (s *SessionState) Token() string { + return s.token +} + +// KillChan gets the SessionState killchan +func (s *SessionState) KillChan() chan int { + return s.killChan +} + +func (s *SessionState) isDaemon() bool { + return !s.NoDaemon +} + +func (s *SessionState) generateResponse(r *Response) []byte { + // Add common plugin response properties + r.ListenAddress = s.listenAddress + r.Token = s.token + rs, _ := json.Marshal(r) + return rs +} + +func (s *SessionState) heartbeatWatch(killChan chan int) { + s.logger.Println("Heartbeat started") + count := 0 + for { + if time.Now().Sub(s.LastPing) >= PingTimeoutDuration { + count++ + if count >= PingTimeoutLimit { + s.logger.Println("Heartbeat timeout expired") + defer close(killChan) + return + } + } else { + s.logger.Println("Heartbeat timeout reset") + // Reset count + count = 0 + } + time.Sleep(PingTimeoutDuration) + } +} + +// NewSessionState takes the plugin args and returns a SessionState +func NewSessionState(pluginArgsMsg string) (*SessionState, error, int) { + pluginArg := &Arg{} + err := json.Unmarshal([]byte(pluginArgsMsg), pluginArg) + if err != nil { + return nil, err, 2 + } + + // If no port was provided we let the OS select a port for us. + // This is safe as address is returned in the Response and keep + // alive prevents unattended plugins. + if pluginArg.listenPort == "" { + pluginArg.listenPort = "0" + } + + // Generate random token for this session + rb := make([]byte, 32) + rand.Read(rb) + rs := base64.URLEncoding.EncodeToString(rb) + + var logger *log.Logger + switch lp := pluginArg.PluginLogPath; lp { + case "", "/tmp": + // Empty means use default tmp log (needs to be removed post-alpha) + lf, err := os.OpenFile("/tmp/pulse_plugin.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 + } + logger = log.New(lf, ">>>", log.Ldate|log.Ltime) + default: + lf, err := os.OpenFile(lp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + if err != nil { + return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 + } + logger = log.New(lf, ">>>", log.Ldate|log.Ltime) + } + + return &SessionState{ + Arg: pluginArg, + token: rs, + killChan: make(chan int), + logger: logger}, nil, 0 +} diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 7eb5af61d..8a564bf61 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -12,6 +12,7 @@ import ( "time" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/client" ) const ( @@ -223,11 +224,8 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { log.Println(err.Error()) return nil, err } - col := ap.Client.(plugin.CollectorPlugin) - args := new(plugin.GetMetricTypesArgs) - var reply *plugin.GetMetricTypesReply - //TODO update metric catalog with metric types below - _ = col.GetMetricTypes(*args, reply) + col := ap.Client.(client.PluginCollectorClient) + col.GetMetricTypes() } err = ePlugin.Kill() diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index 276a5a265..fdf75e10a 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -1,8 +1,6 @@ package dummy import ( - "time" - "github.com/intelsdilabs/pulse/control/plugin" ) @@ -16,15 +14,15 @@ const ( type Dummy struct { } -func (f *Dummy) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) error { - return nil +func (f *Dummy) CollectMetrics([]plugin.MetricType) ([]plugin.Metric, error) { + m := plugin.Metric{Namespace: []string{"intel", "dummy", "foo"}, Data: 1} + ms := []plugin.Metric{m} + return ms, nil } -func (f *Dummy) GetMetricTypes(_ plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { - reply.MetricTypes = []*plugin.MetricType{ - plugin.NewMetricType([]string{"foo", "bar"}, time.Now()), - } - return nil +func (f *Dummy) GetMetricTypes() ([]plugin.MetricType, error) { + m := plugin.NewMetricType([]string{"intel", "dummy", "foo"}) + return []plugin.MetricType{*m}, nil } func Meta() *plugin.PluginMeta { diff --git a/plugin/collector/pulse-collector-facter/facter/facter.go b/plugin/collector/pulse-collector-facter/facter/facter.go index fe5f85123..43c15f718 100644 --- a/plugin/collector/pulse-collector-facter/facter/facter.go +++ b/plugin/collector/pulse-collector-facter/facter/facter.go @@ -13,13 +13,15 @@ const ( type Facter struct { } -func (f *Facter) GetMetricTypes(_ plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { - //reply *[]*plugin.MetricType - return nil +func (f *Facter) CollectMetrics([]plugin.MetricType) ([]plugin.Metric, error) { + m := plugin.Metric{Namespace: []string{"intel", "facter", "foo"}, Data: 1} + ms := []plugin.Metric{m} + return ms, nil } -func (f *Facter) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) error { - return nil +func (f *Facter) GetMetricTypes() ([]plugin.MetricType, error) { + m := plugin.NewMetricType([]string{"intel", "facter", "foo"}) + return []plugin.MetricType{*m}, nil } func Meta() *plugin.PluginMeta { From e21c03c3f15d8e8b513e82ce449dc7f6fbad10e6 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Thu, 5 Mar 2015 08:27:13 -0800 Subject: [PATCH 06/15] Breakup of client package --- control/available_plugin.go | 2 +- control/plugin/client/client.go | 52 ------------------------------ control/plugin/client/native.go | 57 +++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 53 deletions(-) create mode 100644 control/plugin/client/native.go diff --git a/control/available_plugin.go b/control/available_plugin.go index a084aa4ce..c8050c3f2 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -50,7 +50,7 @@ func newAvailablePlugin(resp *plugin.Response) (*availablePlugin, error) { // Create RPC Client switch resp.Type { case plugin.CollectorPluginType: - c, e := client.NewCollectorClient(resp.ListenAddress, DefaultClientTimeout) + c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout) ap.Client = c if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index 326a995a1..f8eee63f8 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -1,10 +1,6 @@ package client import ( - "net" - "net/rpc" - "time" - "github.com/intelsdilabs/pulse/control/plugin" ) @@ -26,51 +22,3 @@ type PluginProcessorClient interface { PluginClient ProcessMetricData() } - -// Native clients use golang net/rpc for communication to a native rpc server. -type PluginNativeClient struct { - connection *rpc.Client -} - -func (p *PluginNativeClient) Ping() error { - a := plugin.PingArgs{} - b := true - err := p.connection.Call("SessionState.Ping", a, &b) - return err -} - -func (p *PluginNativeClient) Kill(reason string) error { - a := plugin.KillArgs{Reason: reason} - b := true - err := p.connection.Call("SessionState.Kill", a, &b) - return err -} - -func (p *PluginNativeClient) CollectMetrics(mts []plugin.MetricType) ([]plugin.Metric, error) { - // TODO return err if mts is empty - args := plugin.CollectMetricsArgs{MetricTypes: mts} - reply := plugin.CollectMetricsReply{} - - err := p.connection.Call("Collector.CollectMetrics", args, &reply) - return reply.Metrics, err -} - -func (p *PluginNativeClient) GetMetricTypes() ([]plugin.MetricType, error) { - args := plugin.GetMetricTypesArgs{} - reply := plugin.GetMetricTypesReply{} - - err := p.connection.Call("Collector.GetMetricTypes", args, &reply) - return reply.MetricTypes, err -} - -func NewCollectorClient(address string, timeout time.Duration) (PluginCollectorClient, error) { - // Attempt to dial address error on timeout or problem - conn, err := net.DialTimeout("tcp", address, timeout) - // Return nil RPCClient and err if encoutered - if err != nil { - return nil, err - } - r := rpc.NewClient(conn) - p := &PluginNativeClient{connection: r} - return p, nil -} diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go new file mode 100644 index 000000000..b8b7d50c0 --- /dev/null +++ b/control/plugin/client/native.go @@ -0,0 +1,57 @@ +package client + +import ( + "net" + "net/rpc" + "time" + + "github.com/intelsdilabs/pulse/control/plugin" +) + +// Native clients use golang net/rpc for communication to a native rpc server. +type PluginNativeClient struct { + connection *rpc.Client +} + +func NewCollectorNativeClient(address string, timeout time.Duration) (PluginCollectorClient, error) { + // Attempt to dial address error on timeout or problem + conn, err := net.DialTimeout("tcp", address, timeout) + // Return nil RPCClient and err if encoutered + if err != nil { + return nil, err + } + r := rpc.NewClient(conn) + p := &PluginNativeClient{connection: r} + return p, nil +} + +func (p *PluginNativeClient) Ping() error { + a := plugin.PingArgs{} + b := true + err := p.connection.Call("SessionState.Ping", a, &b) + return err +} + +func (p *PluginNativeClient) Kill(reason string) error { + a := plugin.KillArgs{Reason: reason} + var b bool + err := p.connection.Call("SessionState.Kill", a, &b) + return err +} + +func (p *PluginNativeClient) CollectMetrics(mts []plugin.MetricType) ([]plugin.Metric, error) { + // TODO return err if mts is empty + args := plugin.CollectMetricsArgs{MetricTypes: mts} + reply := plugin.CollectMetricsReply{} + + err := p.connection.Call("Collector.CollectMetrics", args, &reply) + return reply.Metrics, err +} + +func (p *PluginNativeClient) GetMetricTypes() ([]plugin.MetricType, error) { + args := plugin.GetMetricTypesArgs{} + reply := plugin.GetMetricTypesReply{} + + err := p.connection.Call("Collector.GetMetricTypes", args, &reply) + return reply.MetricTypes, err +} From 157f96e0f1e315e7c7d39816ea8c806386bf2f98 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Thu, 5 Mar 2015 09:20:28 -0800 Subject: [PATCH 07/15] Partial redo of plugin metric and metric type passing behavior --- control/plugin/client/client.go | 7 +- control/plugin/client/native.go | 28 ++++++-- control/plugin/collector.go | 4 +- control/plugin/collector_proxy.go | 16 +++-- control/plugin/metric.go | 72 +++++++++++++++---- control/plugin/session.go | 1 + control/plugin_manager.go | 9 ++- control/router_test.go | 8 +++ .../pulse-collector-dummy/dummy/dummy.go | 12 ++-- .../pulse-collector-facter/facter/facter.go | 12 ++-- 10 files changed, 126 insertions(+), 43 deletions(-) diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index f8eee63f8..7eb61d414 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -1,7 +1,8 @@ package client import ( - "github.com/intelsdilabs/pulse/control/plugin" + // "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core" ) // A client providing common plugin method calls. @@ -13,8 +14,8 @@ type PluginClient interface { // A client providing collector specific plugin method calls. type PluginCollectorClient interface { PluginClient - CollectMetrics([]plugin.MetricType) ([]plugin.Metric, error) - GetMetricTypes() ([]plugin.MetricType, error) + CollectMetrics([]core.MetricType) ([]core.Metric, error) + GetMetricTypes() ([]core.MetricType, error) } // A client providing processor specific plugin method calls. diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index b8b7d50c0..e80cce9aa 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -6,6 +6,7 @@ import ( "time" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core" ) // Native clients use golang net/rpc for communication to a native rpc server. @@ -39,19 +40,36 @@ func (p *PluginNativeClient) Kill(reason string) error { return err } -func (p *PluginNativeClient) CollectMetrics(mts []plugin.MetricType) ([]plugin.Metric, error) { +func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.MetricType) ([]core.Metric, error) { + // Convert core.MetricType slice into plugin.PluginMetricType slice as we have + // to send structs over RPC + pluginMetricTypes := make([]plugin.PluginMetricType, len(coreMetricTypes)) + for i, _ := range coreMetricTypes { + pluginMetricTypes[i] = *plugin.NewPluginMetricType(coreMetricTypes[i].Namespace()) + } + // TODO return err if mts is empty - args := plugin.CollectMetricsArgs{MetricTypes: mts} + args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes} reply := plugin.CollectMetricsReply{} err := p.connection.Call("Collector.CollectMetrics", args, &reply) - return reply.Metrics, err + + retMetrics := make([]core.Metric, len(reply.PluginMetrics)) + for i, _ := range reply.PluginMetrics { + retMetrics[i] = reply.PluginMetrics[i] + } + return retMetrics, err } -func (p *PluginNativeClient) GetMetricTypes() ([]plugin.MetricType, error) { +func (p *PluginNativeClient) GetMetricTypes() ([]core.MetricType, error) { args := plugin.GetMetricTypesArgs{} reply := plugin.GetMetricTypesReply{} err := p.connection.Call("Collector.GetMetricTypes", args, &reply) - return reply.MetricTypes, err + + retMetricTypes := make([]core.MetricType, len(reply.PluginMetricTypes)) + for i, _ := range reply.PluginMetricTypes { + retMetricTypes[i] = reply.PluginMetricTypes[i] + } + return retMetricTypes, err } diff --git a/control/plugin/collector.go b/control/plugin/collector.go index d4a16e99a..e161f0565 100644 --- a/control/plugin/collector.go +++ b/control/plugin/collector.go @@ -13,8 +13,8 @@ import ( // Collector plugin type CollectorPlugin interface { Plugin - CollectMetrics([]MetricType) ([]Metric, error) - GetMetricTypes() ([]MetricType, error) + CollectMetrics([]PluginMetricType) ([]PluginMetric, error) + GetMetricTypes() ([]PluginMetricType, error) } // Execution method for a Collector plugin. Error and exit code (int) returned. diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go index 075ee5497..ddc7be501 100644 --- a/control/plugin/collector_proxy.go +++ b/control/plugin/collector_proxy.go @@ -7,12 +7,12 @@ import ( // Arguments passed to CollectMetrics() for a Collector implementation type CollectMetricsArgs struct { - MetricTypes []MetricType + PluginMetricTypes []PluginMetricType } // Reply assigned by a Collector implementation using CollectMetrics() type CollectMetricsReply struct { - Metrics []Metric + PluginMetrics []PluginMetric } // GetMetricTypesArgs args passed to GetMetricTypes @@ -21,7 +21,7 @@ type GetMetricTypesArgs struct { // GetMetricTypesReply assigned by GetMetricTypes() implementation type GetMetricTypesReply struct { - MetricTypes []MetricType + PluginMetricTypes []PluginMetricType } type collectorPluginProxy struct { @@ -31,20 +31,24 @@ type collectorPluginProxy struct { func (c *collectorPluginProxy) GetMetricTypes(args GetMetricTypesArgs, reply *GetMetricTypesReply) error { c.Session.Logger().Println("GetMetricTypes called") + // Reset heartbeat + c.Session.ResetHeartbeat() mts, err := c.Plugin.GetMetricTypes() if err != nil { return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) } - reply.MetricTypes = mts + reply.PluginMetricTypes = mts return nil } func (c *collectorPluginProxy) CollectMetrics(args CollectMetricsArgs, reply *CollectMetricsReply) error { c.Session.Logger().Println("CollectMetrics called") - ms, err := c.Plugin.CollectMetrics(args.MetricTypes) + // Reset heartbeat + c.Session.ResetHeartbeat() + ms, err := c.Plugin.CollectMetrics(args.PluginMetricTypes) if err != nil { return errors.New(fmt.Sprintf("CollectMetrics call error : %s", err.Error())) } - reply.Metrics = ms + reply.PluginMetrics = ms return nil } diff --git a/control/plugin/metric.go b/control/plugin/metric.go index 3c45c70e2..3babd8b0f 100644 --- a/control/plugin/metric.go +++ b/control/plugin/metric.go @@ -4,30 +4,76 @@ import ( "time" ) -// The metric used by plugin will probably have to be typed by metric data to -// allow passing over RPC. This can on the agent side meet core.Metric interface -// by functions supporting conversion. -type Metric struct { - Namespace []string - Data interface{} +// Represents a collected metric. Only used within plugins and across plugin calls. +// Converted to core.Metric before being used within modules. +type PluginMetric struct { + Namespace_ []string + Data_ interface{} } -type MetricType struct { +func (p PluginMetric) Namespace() []string { + return p.Namespace_ +} + +func (p PluginMetric) Data() interface{} { + return nil +} + +// Represents a metric type. Only used within plugins and across plugin calls. +// Converted to core.MetricType before being used within modules. +type PluginMetricType struct { Namespace_ []string LastAdvertisedTime_ time.Time + Version_ int } -func (m *MetricType) Namespace() []string { - return m.Namespace_ +// Returns the namespace. +func (p PluginMetricType) Namespace() []string { + return p.Namespace_ } -func (m *MetricType) LastAdvertisedTime() time.Time { - return m.LastAdvertisedTime_ +// Returns the last time this metric type was received from the plugin. +func (p PluginMetricType) LastAdvertisedTime() time.Time { + return p.LastAdvertisedTime_ } -func NewMetricType(ns []string) *MetricType { - return &MetricType{ +// Returns the namespace. +func (p PluginMetricType) Version() int { + return p.Version_ +} + +func NewPluginMetricType(ns []string) *PluginMetricType { + return &PluginMetricType{ Namespace_: ns, LastAdvertisedTime_: time.Now(), } } + +/* + +core.Metric(i) (used by pulse modules) +core.MetricType(i) (used by pulse modules) + +PluginMetric (created by plugins and returned, goes over RPC) +PLuginMetricType (created by plugins and returned, goes over RPC) + +Get + +agent - call Get +client - call Get +plugin - builds array of PluginMetricTypes +plugin - return array of PluginMetricTypes +client - returns array of PluginMetricTypes through MetricType interface +agent - receives MetricTypes + +Collect + +agent - call Collect pass MetricTypes +client - call Collect, convert MetricTypes into new (plugin) PluginMetricTypes struct +plugin - build array of PluginMetric based on (plugin) MetricTypes +plugin - return array of PluginMetrics +client - return array of PluginMetrics through core.Metrics interface +agent - receive array of core.Metrics + + +*/ diff --git a/control/plugin/session.go b/control/plugin/session.go index 1e1f7eea2..85324f197 100644 --- a/control/plugin/session.go +++ b/control/plugin/session.go @@ -21,6 +21,7 @@ type Session interface { ListenPort() string Token() string KillChan() chan int + ResetHeartbeat() generateResponse(r *Response) []byte heartbeatWatch(killChan chan int) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 8a564bf61..10d03164c 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -224,8 +224,13 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { log.Println(err.Error()) return nil, err } - col := ap.Client.(client.PluginCollectorClient) - col.GetMetricTypes() + colClient := ap.Client.(client.PluginCollectorClient) + metricTypes, err := colClient.GetMetricTypes() + if err != nil { + log.Println(err) + return nil, err + } + fmt.Println(metricTypes) } err = ePlugin.Kill() diff --git a/control/router_test.go b/control/router_test.go index c0d30079b..bfc581ede 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -2,6 +2,8 @@ package control import ( "fmt" + "os" + "path" "testing" "time" @@ -10,6 +12,12 @@ import ( . "github.com/smartystreets/goconvey/convey" ) +var ( + PluginName = "pulse-collector-dummy" + PulsePath = os.Getenv("PULSE_PATH") + PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) +) + type MockMetricType struct { namespace []string } diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index fdf75e10a..f65c31131 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -14,15 +14,15 @@ const ( type Dummy struct { } -func (f *Dummy) CollectMetrics([]plugin.MetricType) ([]plugin.Metric, error) { - m := plugin.Metric{Namespace: []string{"intel", "dummy", "foo"}, Data: 1} - ms := []plugin.Metric{m} +func (f *Dummy) CollectMetrics([]plugin.PluginMetricType) ([]plugin.PluginMetric, error) { + m := plugin.PluginMetric{Namespace_: []string{"intel", "dummy", "foo"}, Data_: 1} + ms := []plugin.PluginMetric{m} return ms, nil } -func (f *Dummy) GetMetricTypes() ([]plugin.MetricType, error) { - m := plugin.NewMetricType([]string{"intel", "dummy", "foo"}) - return []plugin.MetricType{*m}, nil +func (f *Dummy) GetMetricTypes() ([]plugin.PluginMetricType, error) { + m := plugin.NewPluginMetricType([]string{"intel", "dummy", "foo"}) + return []plugin.PluginMetricType{*m}, nil } func Meta() *plugin.PluginMeta { diff --git a/plugin/collector/pulse-collector-facter/facter/facter.go b/plugin/collector/pulse-collector-facter/facter/facter.go index 43c15f718..d2fa40bca 100644 --- a/plugin/collector/pulse-collector-facter/facter/facter.go +++ b/plugin/collector/pulse-collector-facter/facter/facter.go @@ -13,15 +13,15 @@ const ( type Facter struct { } -func (f *Facter) CollectMetrics([]plugin.MetricType) ([]plugin.Metric, error) { - m := plugin.Metric{Namespace: []string{"intel", "facter", "foo"}, Data: 1} - ms := []plugin.Metric{m} +func (f *Facter) CollectMetrics([]plugin.PluginMetricType) ([]plugin.PluginMetric, error) { + m := plugin.PluginMetric{Namespace_: []string{"intel", "facter", "foo"}, Data_: 1} + ms := []plugin.PluginMetric{m} return ms, nil } -func (f *Facter) GetMetricTypes() ([]plugin.MetricType, error) { - m := plugin.NewMetricType([]string{"intel", "facter", "foo"}) - return []plugin.MetricType{*m}, nil +func (f *Facter) GetMetricTypes() ([]plugin.PluginMetricType, error) { + m := plugin.NewPluginMetricType([]string{"intel", "facter", "foo"}) + return []plugin.PluginMetricType{*m}, nil } func Meta() *plugin.PluginMeta { From 59ff825b6ae47eb1106d6b7bdf40fe4614e634b0 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Thu, 5 Mar 2015 14:55:37 -0800 Subject: [PATCH 08/15] On plugin loading metric types are now added to metric catalog --- control/control.go | 30 ++++++++++++--------- control/control_test.go | 46 +++++++++++++++++++------------- control/metrics.go | 11 ++++++++ control/plugin/collector_test.go | 14 ++++++---- control/plugin/plugin_test.go | 4 +-- control/plugin_manager.go | 18 ++++++++++--- control/plugin_manager_test.go | 4 +++ control/router_test.go | 20 ++++++++------ 8 files changed, 97 insertions(+), 50 deletions(-) diff --git a/control/control.go b/control/control.go index 876b1a14e..a757ae727 100644 --- a/control/control.go +++ b/control/control.go @@ -37,11 +37,13 @@ type managesPlugins interface { LoadPlugin(string) (*loadedPlugin, error) UnloadPlugin(CatalogedPlugin) error LoadedPlugins() *loadedPlugins + SetMetricCatalog(catalogsMetrics) } type catalogsMetrics interface { Get([]string, int) (*metricType, error) Add(*metricType) + AddLoadedMetricType(*loadedPlugin, core.MetricType) Item() (string, []*metricType) Next() bool Subscribe([]string, int) error @@ -65,11 +67,21 @@ func (s *subGetError) Errors() []error { // Returns a new pluginControl instance func New() *pluginControl { - c := &pluginControl{ - eventManager: gomit.NewEventController(), - pluginManager: newPluginManager(), - metricCatalog: newMetricCatalog(), - } + c := &pluginControl{} + // Initialize components + // + // 1. Metric Catalog + c.metricCatalog = newMetricCatalog() + + // 2. Plugin Manager + c.pluginManager = newPluginManager() + // Plugin Manager needs a reference to the metric catalog + c.pluginManager.SetMetricCatalog(c.metricCatalog) + + // Event Manager + c.eventManager = gomit.NewEventController() + + // Wire event manager // c.loadRequestsChan = make(chan LoadedPlugin) // privatekey, err := rsa.GenerateKey(rand.Reader, 4096) @@ -201,14 +213,6 @@ func (p *pluginControl) UnsubscribeMetric(metric []string, ver int) { p.eventManager.Emit(e) } -func (p *pluginControl) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { - m, err := p.metricCatalog.Get(mns, ver) - if err != nil { - return nil, err - } - return m.Plugin, nil -} - // the public interface for a plugin // this should be the contract for // how mgmt modules know a plugin diff --git a/control/control_test.go b/control/control_test.go index cb92a844d..2758364c8 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" "github.com/intelsdilabs/pulse/core/cpolicy" "github.com/intelsdilabs/pulse/core/ctypes" @@ -42,6 +43,10 @@ func (m *MockPluginManagerBadSwap) LoadedPlugins() *loadedPlugins { return nil } +func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) { + +} + func TestControlNew(t *testing.T) { } @@ -300,6 +305,10 @@ func (m *mc) Next() bool { return false } +func (m *mc) AddLoadedMetricType(*loadedPlugin, core.MetricType) { + +} + type mockCDProc struct { } @@ -356,24 +365,25 @@ func TestUnsubscribeMetric(t *testing.T) { }) } -func TestResolvePlugin(t *testing.T) { - Convey(".resolvePlugin()", t, func() { - c := New() - lp := &loadedPlugin{} - mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) - c.metricCatalog.Add(mt) - Convey("it resolves the plugin", func() { - p, err := c.resolvePlugin([]string{"foo", "bar"}, -1) - So(err, ShouldBeNil) - So(p, ShouldEqual, lp) - }) - Convey("it returns an error if the metricType cannot be found", func() { - p, err := c.resolvePlugin([]string{"baz", "qux"}, -1) - So(p, ShouldBeNil) - So(err, ShouldResemble, errors.New("metric not found")) - }) - }) -} +// TODO move to metricCatalog +// func TestResolvePlugin(t *testing.T) { +// Convey(".resolvePlugin()", t, func() { +// c := New() +// lp := &loadedPlugin{} +// mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) +// c.metricCatalog.Add(mt) +// Convey("it resolves the plugin", func() { +// p, err := c.resolvePlugin([]string{"foo", "bar"}, -1) +// So(err, ShouldBeNil) +// So(p, ShouldEqual, lp) +// }) +// Convey("it returns an error if the metricType cannot be found", func() { +// p, err := c.resolvePlugin([]string{"baz", "qux"}, -1) +// So(p, ShouldBeNil) +// So(err, ShouldResemble, errors.New("metric not found")) +// }) +// }) +// } func TestExportedMetricCatalog(t *testing.T) { Convey(".MetricCatalog()", t, func() { diff --git a/control/metrics.go b/control/metrics.go index dff0e98b2..c5e06cc1e 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cpolicy" "github.com/intelsdilabs/pulse/core/ctypes" ) @@ -84,6 +85,16 @@ func newMetricCatalog() *metricCatalog { } } +func (m *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.MetricType) { + newMt := metricType{ + Plugin: lp, + namespace: mt.Namespace(), + lastAdvertisedTime: mt.LastAdvertisedTime(), + // policy processesConfigData, TODO + } + m.Add(&newMt) +} + // adds a metricType pointer to the loadedPlugins table func (mc *metricCatalog) Add(m *metricType) { diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index ab5efa56b..93951e4b7 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -48,6 +48,10 @@ func (s *MockSessionState) Token() string { return s.token } +func (m *MockSessionState) ResetHeartbeat() { + +} + func (s *MockSessionState) KillChan() chan int { return s.killChan } @@ -70,13 +74,13 @@ type MockPlugin struct { Policy ConfigPolicy } -func (f *MockPlugin) CollectMetrics(_ []MetricType) ([]Metric, error) { - return []Metric{}, nil +func (f *MockPlugin) CollectMetrics(_ []PluginMetricType) ([]PluginMetric, error) { + return []PluginMetric{}, nil } -func (c *MockPlugin) GetMetricTypes() ([]MetricType, error) { - return []MetricType{ - *NewMetricType([]string{"org", "some_metric"}), +func (c *MockPlugin) GetMetricTypes() ([]PluginMetricType, error) { + return []PluginMetricType{ + *NewPluginMetricType([]string{"org", "some_metric"}), }, nil } diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 58a54d6e7..e9dc7bbf0 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -22,9 +22,9 @@ func TestPluginType(t *testing.T) { func TestMetricType(t *testing.T) { Convey("MetricType", t, func() { now := time.Now() - m := NewMetricType([]string{"foo", "bar"}) + m := NewPluginMetricType([]string{"foo", "bar"}) Convey("New", func() { - So(m, ShouldHaveSameTypeAs, &MetricType{}) + So(m, ShouldHaveSameTypeAs, &PluginMetricType{}) }) Convey("Get Namespace", func() { So(m.Namespace(), ShouldResemble, []string{"foo", "bar"}) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 10d03164c..feb30e586 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -173,10 +173,10 @@ func (lp *loadedPlugin) LoadedTimestamp() int64 { // the struct representing the object responsible for // loading and unloading plugins type pluginManager struct { + metricCatalog catalogsMetrics loadedPlugins *loadedPlugins - - privKey *rsa.PrivateKey - pubKey *rsa.PublicKey + privKey *rsa.PrivateKey + pubKey *rsa.PublicKey } func newPluginManager() *pluginManager { @@ -186,6 +186,10 @@ func newPluginManager() *pluginManager { return p } +func (p *pluginManager) SetMetricCatalog(mc catalogsMetrics) { + p.metricCatalog = mc +} + // Returns loaded plugins func (p *pluginManager) LoadedPlugins() *loadedPlugins { return p.loadedPlugins @@ -218,6 +222,7 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { log.Println(err) return nil, err } + if resp.Type == plugin.CollectorPluginType { ap, err := newAvailablePlugin(resp) if err != nil { @@ -230,7 +235,12 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { log.Println(err) return nil, err } - fmt.Println(metricTypes) + // Add metric types to metric catalog + // + // For each MT returned we pair the loaded plugin and call AddLoadedMetricType to add + for _, mt := range metricTypes { + p.metricCatalog.AddLoadedMetricType(lPlugin, mt) + } } err = ePlugin.Kill() diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 78dbac420..68a8e8178 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -63,6 +63,7 @@ func TestLoadPlugin(t *testing.T) { Convey("loads plugin successfully", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath) So(lp, ShouldHaveSameTypeAs, new(loadedPlugin)) @@ -91,6 +92,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a loaded plugin is unloaded", func() { Convey("then it is removed from the loadedPlugins", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath) num_plugins_loaded := len(p.LoadedPlugins().Table()) @@ -105,6 +107,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a loaded plugin is not in a PluginLoaded state", func() { Convey("then an error is thrown", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath) lp, _ := p.LoadedPlugins().Get(0) lp.State = DetectedState @@ -116,6 +119,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a plugin is already unloaded", func() { Convey("then an error is thrown", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath) plugin, _ := p.LoadedPlugins().Get(0) diff --git a/control/router_test.go b/control/router_test.go index bfc581ede..547799095 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -2,8 +2,8 @@ package control import ( "fmt" - "os" - "path" + // "os" + // "path" "testing" "time" @@ -12,11 +12,11 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -var ( - PluginName = "pulse-collector-dummy" - PulsePath = os.Getenv("PULSE_PATH") - PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) -) +// var ( +// PluginName = "pulse-collector-dummy" +// PulsePath = os.Getenv("PULSE_PATH") +// PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) +// ) type MockMetricType struct { namespace []string @@ -39,7 +39,11 @@ func TestRouter(t *testing.T) { c := New() c.Start() e := c.Load(PluginPath) - fmt.Println(c.PluginCatalog()) + fmt.Println("\nPlugin Catalog\n*****") + for _, p := range c.PluginCatalog() { + fmt.Printf("%s %d\n", p.Name(), p.Version()) + } + fmt.Println("\nMetric Catalog\n*****") fmt.Println(c.MetricCatalog(), e) mc := newMetricCatalog() From 95e997e93631c8c2a8fa2709cf4f9d0082a012b0 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Thu, 5 Mar 2015 18:58:09 -0800 Subject: [PATCH 09/15] Wiring up to AP check in router --- control/control.go | 30 +++++++- control/plugin_manager.go | 4 ++ control/router.go | 71 ++++++++++++++----- control/router_test.go | 48 ++++++++----- control/runner.go | 4 ++ .../pulse-collector-dummy/dummy/dummy.go | 5 +- 6 files changed, 123 insertions(+), 39 deletions(-) diff --git a/control/control.go b/control/control.go index a757ae727..9c0ffb7cb 100644 --- a/control/control.go +++ b/control/control.go @@ -3,6 +3,7 @@ package control import ( "crypto/rsa" "errors" + "time" "github.com/intelsdilabs/gomit" @@ -29,8 +30,21 @@ type pluginControl struct { controlPrivKey *rsa.PrivateKey controlPubKey *rsa.PublicKey eventManager *gomit.EventController - pluginManager managesPlugins - metricCatalog catalogsMetrics + + pluginManager managesPlugins + metricCatalog catalogsMetrics + pluginRunner runsPlugins + pluginRouter routesToPlugins +} + +type routesToPlugins interface { + Collect([]core.MetricType, *cdata.ConfigDataNode, time.Time) (*collectionResponse, error) + SetRunner(runsPlugins) + SetMetricCatalog(catalogsMetrics) +} + +type runsPlugins interface { + AvailablePlugins() *availablePlugins } type managesPlugins interface { @@ -48,6 +62,8 @@ type catalogsMetrics interface { Next() bool Subscribe([]string, int) error Unsubscribe([]string, int) error + Table() map[string][]*metricType + resolvePlugin([]string, int) (*loadedPlugin, error) } // an interface used to polymorph the return from @@ -73,7 +89,15 @@ func New() *pluginControl { // 1. Metric Catalog c.metricCatalog = newMetricCatalog() - // 2. Plugin Manager + // 2. Plugin Runner + c.pluginRunner = newRunner() + + // 3. Plugin Router + c.pluginRouter = newPluginRouter() + c.pluginRouter.SetRunner(c.pluginRunner) + c.pluginRouter.SetMetricCatalog(c.metricCatalog) + + // 3. Plugin Manager c.pluginManager = newPluginManager() // Plugin Manager needs a reference to the metric catalog c.pluginManager.SetMetricCatalog(c.metricCatalog) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index feb30e586..3b1ac6781 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -146,6 +146,10 @@ func (lp *loadedPlugin) Name() string { return lp.Meta.Name } +func (l *loadedPlugin) Key() string { + return fmt.Sprintf("%s-%d", l.Name(), l.Version()) +} + // returns plugin version // implements the CatalogedPlugin interface func (lp *loadedPlugin) Version() int { diff --git a/control/router.go b/control/router.go index fc13610e3..fbd168e92 100644 --- a/control/router.go +++ b/control/router.go @@ -2,44 +2,75 @@ package control import ( - "log" + "errors" + "fmt" + "strings" "time" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" ) -type Router interface { - Collect([]core.MetricType, *cdata.ConfigDataNode, time.Time) *collectionResponse -} - type RouterResponse interface { } type pluginRouter struct { - // Pointer to control metric catalog - metricCatalog *metricCatalog + metricCatalog catalogsMetrics + pluginRunner runsPlugins +} + +func newPluginRouter() *pluginRouter { + return &pluginRouter{} } -func newRouter(mc *metricCatalog) Router { - return &pluginRouter{metricCatalog: mc} +type pluginCallSelection struct { + Plugin *loadedPlugin + MetricTypes []core.MetricType } // Calls collector plugins for the metric types and returns collection response containing metrics. Blocking method. -func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.ConfigDataNode, deadline time.Time) (response *collectionResponse) { +func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.ConfigDataNode, deadline time.Time) (response *collectionResponse, err error) { // For each MT sort into plugin types we need to call - log.Println(metricTypes) + fmt.Println(metricTypes) + fmt.Println("\nMetric Catalog\n*****") + fmt.Println(p.metricCatalog) + for k, m := range p.metricCatalog.Table() { + fmt.Println(k, m) + } + fmt.Println("\n") + + pluginCallSelectionMap := make(map[string]pluginCallSelection) // For each plugin type select a matching available plugin to call for _, m := range metricTypes { - log.Println(m.Namespace()) - lp, err := p.metricCatalog.resolvePlugin(m.Namespace(), m.Version()) + + // This is set to choose the newest and not pin version. TODO, be sure version is set to -1 if not provided by user on Task creation. + lp, err := p.metricCatalog.resolvePlugin(m.Namespace(), -1) + + // fmt.Println("\nMetric Catalog\n*****") + // for k, m := range p.metricCatalog.Table() { + // fmt.Println(k, m) + // } // TODO handle error here. Single error fails entire operation. - log.Println(lp, err) - } + if err != nil { + // can't find a matching plugin, fail - TODO + } + + if lp == nil { + return nil, errors.New(fmt.Sprintf("Metric missing: %s", strings.Join(m.Namespace(), "/"))) + } + fmt.Printf("Found plugin (%s v%d) for metric (%s)\n", lp.Name(), lp.Version(), strings.Join(m.Namespace(), "/")) + x, _ := pluginCallSelectionMap[lp.Key()] + x.Plugin = lp + x.MetricTypes = append(x.MetricTypes, m) + pluginCallSelectionMap[lp.Key()] = x + + } // For each available plugin call available plugin using RPC client and wait for response (goroutines) + fmt.Println(pluginCallSelectionMap) + fmt.Println(p.pluginRunner.AvailablePlugins().Collectors.Table()) // Wait for all responses(happy) or timeout(unhappy) @@ -47,9 +78,17 @@ func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.Conf // (unhappy)return response with timeout state - return &collectionResponse{} + return &collectionResponse{}, nil } type collectionResponse struct { Errors []error } + +func (p *pluginRouter) SetRunner(r runsPlugins) { + p.pluginRunner = r +} + +func (p *pluginRouter) SetMetricCatalog(m catalogsMetrics) { + p.metricCatalog = m +} diff --git a/control/router_test.go b/control/router_test.go index 547799095..c6b7db85a 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -2,8 +2,8 @@ package control import ( "fmt" - // "os" - // "path" + "os" + "path" "testing" "time" @@ -12,11 +12,11 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -// var ( -// PluginName = "pulse-collector-dummy" -// PulsePath = os.Getenv("PULSE_PATH") -// PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) -// ) +var ( + PluginName = "pulse-collector-dummy" + PulsePath = os.Getenv("PULSE_PATH") + PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) +) type MockMetricType struct { namespace []string @@ -36,23 +36,35 @@ func (m MockMetricType) Version() int { func TestRouter(t *testing.T) { Convey("given a new router", t, func() { + // Create controller c := New() c.Start() - e := c.Load(PluginPath) - fmt.Println("\nPlugin Catalog\n*****") - for _, p := range c.PluginCatalog() { - fmt.Printf("%s %d\n", p.Name(), p.Version()) - } - fmt.Println("\nMetric Catalog\n*****") - fmt.Println(c.MetricCatalog(), e) + // Load plugin + c.Load(PluginPath) + // fmt.Println("\nPlugin Catalog\n*****") + // for _, p := range c.PluginCatalog() { + // fmt.Printf("%s %d\n", p.Name(), p.Version()) + // } - mc := newMetricCatalog() - r := newRouter(mc) + // Create router + // r := newPluginRouter() + // r.metricCatalog = c.metricCatalog - m := MockMetricType{namespace: []string{"foo", "bar"}} + m := []core.MetricType{} + m1 := MockMetricType{namespace: []string{"intel", "dummy", "foo"}} + m2 := MockMetricType{namespace: []string{"intel", "dummy", "bar"}} + // m3 := MockMetricType{namespace: []string{"intel", "dummy", "baz"}} + m = append(m, m1) + m = append(m, m2) + // m = append(m, m3) cd := cdata.NewNode() - r.Collect([]core.MetricType{m}, cd, time.Now().Add(time.Second*60)) + // Call collect on router + cr, err := c.pluginRouter.Collect(m, cd, time.Now().Add(time.Second*60)) + if err != nil { + panic(err) + } + fmt.Println(cr) }) } diff --git a/control/runner.go b/control/runner.go index 975fadba0..7471cb555 100644 --- a/control/runner.go +++ b/control/runner.go @@ -40,6 +40,10 @@ func newRunner() *runner { return r } +func (r *runner) AvailablePlugins() *availablePlugins { + return r.availablePlugins +} + // Adds Delegates (gomit.Delegator) for adding Runner handlers to on Start and // unregistration on Stop. func (r *runner) AddDelegates(delegates ...gomit.Delegator) { diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index f65c31131..d55e07382 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -21,8 +21,9 @@ func (f *Dummy) CollectMetrics([]plugin.PluginMetricType) ([]plugin.PluginMetric } func (f *Dummy) GetMetricTypes() ([]plugin.PluginMetricType, error) { - m := plugin.NewPluginMetricType([]string{"intel", "dummy", "foo"}) - return []plugin.PluginMetricType{*m}, nil + m1 := plugin.NewPluginMetricType([]string{"intel", "dummy", "foo"}) + m2 := plugin.NewPluginMetricType([]string{"intel", "dummy", "bar"}) + return []plugin.PluginMetricType{*m1, *m2}, nil } func Meta() *plugin.PluginMeta { From 52881f83fd3fcd4c374c646e9666836195f4af2f Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Mon, 9 Mar 2015 13:05:51 -0700 Subject: [PATCH 10/15] Commit before merge --- control/control.go | 19 +++++++++----- control/metrics.go | 6 +---- control/plugin/collector_test.go | 3 +-- control/plugin/plugin.go | 26 +++++++++++-------- control/plugin/plugin_test.go | 10 ++++--- control/plugin_manager.go | 2 +- control/router.go | 15 +++++++++-- control/router_test.go | 8 +++++- .../pulse-collector-dummy/dummy/dummy.go | 5 ++-- .../collector/pulse-collector-dummy/main.go | 6 ++--- .../pulse-collector-facter/facter/facter.go | 5 ++-- .../collector/pulse-collector-facter/main.go | 4 +-- 12 files changed, 68 insertions(+), 41 deletions(-) diff --git a/control/control.go b/control/control.go index 9c0ffb7cb..3715c16f4 100644 --- a/control/control.go +++ b/control/control.go @@ -5,6 +5,8 @@ import ( "errors" "time" + // "fmt" + "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" @@ -207,12 +209,17 @@ func (p *pluginControl) SubscribeMetric(metric []string, ver int, cd *cdata.Conf return nil, &subGetError{errs: []error{err}} } - ncdTable, errs := m.policy.Process(cd.Table()) - if errs != nil && errs.HasErrors() { - return nil, errs - } + // fmt.Println(m.Plugin.) + // if m.policy == nil { + // panic("NIL!") + // } + // ncdTable, errs := m.policy.Process(cd.Table()) + // if errs != nil && errs.HasErrors() { + // return nil, errs + // } - ncd := cdata.FromTable(*ncdTable) + // panic(1) + // ncd := cdata.FromTable(*ncdTable) m.Subscribe() @@ -221,7 +228,7 @@ func (p *pluginControl) SubscribeMetric(metric []string, ver int, cd *cdata.Conf } defer p.eventManager.Emit(e) - return ncd, nil + return nil, nil } // unsubscribes a metric diff --git a/control/metrics.go b/control/metrics.go index c5e06cc1e..ed45299a6 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -7,7 +7,6 @@ import ( "time" "github.com/intelsdilabs/pulse/core" - "github.com/intelsdilabs/pulse/core/cpolicy" "github.com/intelsdilabs/pulse/core/ctypes" ) @@ -22,11 +21,10 @@ type metricType struct { namespace []string lastAdvertisedTime time.Time subscriptions int - policy processesConfigData } type processesConfigData interface { - Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors) + Process(map[string]ctypes.ConfigValue) *map[string]ctypes.ConfigValue } func newMetricType(ns []string, last time.Time, plugin *loadedPlugin) *metricType { @@ -35,7 +33,6 @@ func newMetricType(ns []string, last time.Time, plugin *loadedPlugin) *metricTyp namespace: ns, lastAdvertisedTime: last, - policy: cpolicy.NewPolicyNode(), } } @@ -90,7 +87,6 @@ func (m *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.MetricType Plugin: lp, namespace: mt.Namespace(), lastAdvertisedTime: mt.LastAdvertisedTime(), - // policy processesConfigData, TODO } m.Add(&newMt) } diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index 93951e4b7..396ceeaa8 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -70,8 +70,7 @@ func (s *MockSessionState) heartbeatWatch(killChan chan int) { } type MockPlugin struct { - Meta PluginMeta - Policy ConfigPolicy + Meta PluginMeta } func (f *MockPlugin) CollectMetrics(_ []PluginMetricType) ([]PluginMetric, error) { diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index a94de7aee..f426f361b 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -6,6 +6,8 @@ package plugin import ( "crypto/rsa" "time" + + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) var ( @@ -73,19 +75,20 @@ func NewArg(pubkey *rsa.PublicKey, logpath string) Arg { // Response from started plugin type Response struct { - Meta PluginMeta - ListenAddress string - Token string - Type PluginType + Meta PluginMeta + ListenAddress string + Token string + Type PluginType + ConfigPolicyTree cpolicy.ConfigPolicyTree // State is a signal from plugin to control that it passed // its own loading requirements State PluginResponseState ErrorMessage string } -// ConfigPolicy for plugin -type ConfigPolicy struct { -} +// // ConfigPolicy for plugin +// type ConfigPolicy struct { +// } // PluginMeta for plugin type PluginMeta struct { @@ -104,7 +107,7 @@ func NewPluginMeta(name string, version int, pluginType PluginType) *PluginMeta } // Start starts a plugin -func Start(m *PluginMeta, c Plugin, p *ConfigPolicy, requestString string) (error, int) { +func Start(m *PluginMeta, c Plugin, t *cpolicy.ConfigPolicyTree, requestString string) (error, int) { sessionState, sErr, retCode := NewSessionState(requestString) if sErr != nil { @@ -117,9 +120,10 @@ func Start(m *PluginMeta, c Plugin, p *ConfigPolicy, requestString string) (erro switch m.Type { case CollectorPluginType: r := &Response{ - Type: CollectorPluginType, - State: PluginSuccess, - Meta: *m, + Type: CollectorPluginType, + State: PluginSuccess, + Meta: *m, + ConfigPolicyTree: *t, } err, retCode := StartCollector(c.(CollectorPlugin), sessionState, r) if err != nil { diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index e9dc7bbf0..7795c8904 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" + . "github.com/smartystreets/goconvey/convey" ) @@ -113,17 +115,17 @@ func TestArg(t *testing.T) { func TestPlugin(t *testing.T) { Convey("Start", t, func() { mockPluginMeta := NewPluginMeta("test", 1, CollectorPluginType) - mockConfigPolicy := new(ConfigPolicy) + mockConfigPolicyTree := new(cpolicy.ConfigPolicyTree) var mockPluginArgs string = "{\"PluginLogPath\": \"/var/tmp/pulse_plugin.log\"}" - err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicy, mockPluginArgs) + err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicyTree, mockPluginArgs) So(err, ShouldBeNil) So(rc, ShouldEqual, 0) }) Convey("Start with invalid args", t, func() { mockPluginMeta := NewPluginMeta("test", 1, CollectorPluginType) - mockConfigPolicy := new(ConfigPolicy) + mockConfigPolicyTree := new(cpolicy.ConfigPolicyTree) var mockPluginArgs string = "" - err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicy, mockPluginArgs) + err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicyTree, mockPluginArgs) So(err, ShouldNotBeNil) So(rc, ShouldNotEqual, 0) }) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 3b1ac6781..dfad0fde3 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -147,7 +147,7 @@ func (lp *loadedPlugin) Name() string { } func (l *loadedPlugin) Key() string { - return fmt.Sprintf("%s-%d", l.Name(), l.Version()) + return fmt.Sprintf("%s:%d", l.Name(), l.Version()) } // returns plugin version diff --git a/control/router.go b/control/router.go index fbd168e92..3e339b9d9 100644 --- a/control/router.go +++ b/control/router.go @@ -28,6 +28,10 @@ type pluginCallSelection struct { MetricTypes []core.MetricType } +func (p *pluginCallSelection) Count() int { + return len(p.MetricTypes) +} + // Calls collector plugins for the metric types and returns collection response containing metrics. Blocking method. func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.ConfigDataNode, deadline time.Time) (response *collectionResponse, err error) { // For each MT sort into plugin types we need to call @@ -69,8 +73,15 @@ func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.Conf } // For each available plugin call available plugin using RPC client and wait for response (goroutines) - fmt.Println(pluginCallSelectionMap) - fmt.Println(p.pluginRunner.AvailablePlugins().Collectors.Table()) + + fmt.Println("") + for pluginKey, metrics := range pluginCallSelectionMap { + fmt.Printf("plugin: (%s) has (%d) metrics to gather\n", pluginKey, metrics.Count()) + + p.pluginRunner.AvailablePlugins().Collectors.GetPluginPool(pluginKey) + } + + // fmt.Println(p.pluginRunner.AvailablePlugins().Collectors.Table()["dummy:1"]) // Wait for all responses(happy) or timeout(unhappy) diff --git a/control/router_test.go b/control/router_test.go index c6b7db85a..d7edc4e73 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -58,13 +58,19 @@ func TestRouter(t *testing.T) { m = append(m, m2) // m = append(m, m3) cd := cdata.NewNode() + fmt.Println(cd.Table()) + + fmt.Println(m1.Namespace(), m1.Version(), cd) + // Subscribe + c.SubscribeMetric(m1.Namespace(), m1.Version(), cd) + // fmt.Println(a, e) // Call collect on router cr, err := c.pluginRouter.Collect(m, cd, time.Now().Add(time.Second*60)) if err != nil { panic(err) } - fmt.Println(cr) + fmt.Printf("\nresponse: %+v\n", cr) }) } diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index d55e07382..74cdd5764 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -2,6 +2,7 @@ package dummy import ( "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) const ( @@ -30,7 +31,7 @@ func Meta() *plugin.PluginMeta { return plugin.NewPluginMeta(Name, Version, Type) } -func ConfigPolicy() *plugin.ConfigPolicy { - c := new(plugin.ConfigPolicy) +func ConfigPolicyTree() *cpolicy.ConfigPolicyTree { + c := cpolicy.NewTree() return c } diff --git a/plugin/collector/pulse-collector-dummy/main.go b/plugin/collector/pulse-collector-dummy/main.go index 3ddfcec95..8307764cb 100644 --- a/plugin/collector/pulse-collector-dummy/main.go +++ b/plugin/collector/pulse-collector-dummy/main.go @@ -15,12 +15,12 @@ func main() { // the implementation satfiying plugin.CollectorPlugin // the collector configuration policy satifying plugin.ConfigRules - // Define default policy - policy := dummy.ConfigPolicy() + // Define default policy tree + policyTree := dummy.ConfigPolicyTree() // Define metadata about Plugin meta := dummy.Meta() // Start a collector - plugin.Start(meta, new(dummy.Dummy), policy, os.Args[1]) + plugin.Start(meta, new(dummy.Dummy), policyTree, os.Args[1]) } diff --git a/plugin/collector/pulse-collector-facter/facter/facter.go b/plugin/collector/pulse-collector-facter/facter/facter.go index d2fa40bca..77ee8fd25 100644 --- a/plugin/collector/pulse-collector-facter/facter/facter.go +++ b/plugin/collector/pulse-collector-facter/facter/facter.go @@ -2,6 +2,7 @@ package facter import ( "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) const ( @@ -28,7 +29,7 @@ func Meta() *plugin.PluginMeta { return plugin.NewPluginMeta(Name, Version, Type) } -func ConfigPolicy() *plugin.ConfigPolicy { - c := new(plugin.ConfigPolicy) +func ConfigPolicyTree() *cpolicy.ConfigPolicyTree { + c := cpolicy.NewTree() return c } diff --git a/plugin/collector/pulse-collector-facter/main.go b/plugin/collector/pulse-collector-facter/main.go index ec65701a4..d0804a040 100644 --- a/plugin/collector/pulse-collector-facter/main.go +++ b/plugin/collector/pulse-collector-facter/main.go @@ -16,12 +16,12 @@ func main() { // the collector configuration policy satifying plugin.ConfigRules // Define default policy - policy := facter.ConfigPolicy() + policyTree := facter.ConfigPolicyTree() // Define metadata about Plugin meta := facter.Meta() // Start a collector //plugin.StartCollector(meta, new(facter.Facter), policy, os.Args[0], os.Args[1]) - plugin.Start(meta, new(facter.Facter), policy, os.Args[1]) + plugin.Start(meta, new(facter.Facter), policyTree, os.Args[1]) } From 1a3b267d6be373867574eb32bd221546c56db54a Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Mon, 9 Mar 2015 14:15:35 -0700 Subject: [PATCH 11/15] Removed type for Sub error, changed to simple err slice, added err for nil metrictype from catalog --- control/control.go | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/control/control.go b/control/control.go index f6b46dcfd..db900cc6a 100644 --- a/control/control.go +++ b/control/control.go @@ -3,10 +3,9 @@ package control import ( "crypto/rsa" "errors" + "fmt" "time" - // "fmt" - "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" @@ -68,21 +67,6 @@ type catalogsMetrics interface { resolvePlugin([]string, int) (*loadedPlugin, error) } -// an interface used to polymorph the return from -// SubscribeMetric. Processing config data returns -// a type which holds a collection of errors. -type SubscriptionError interface { - Errors() []error -} - -type subGetError struct { - errs []error -} - -func (s *subGetError) Errors() []error { - return s.errs -} - // Returns a new pluginControl instance func New() *pluginControl { c := &pluginControl{} @@ -202,17 +186,23 @@ func (p *pluginControl) generateArgs() plugin.Arg { // SubscribeMetricType validates the given config data, and if valid // returns a MetricType with a config. On error a collection of errors is returned // either from config data processing, or the inability to find the metric. -func (p *pluginControl) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, SubscriptionError) { +func (p *pluginControl) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, []error) { + subErrs := make([]error, 0) m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version()) if err != nil { - return nil, &subGetError{errs: []error{err}} + subErrs = append(subErrs, err) + return nil, subErrs } + // No metric found return error. if m == nil { - return nil, nil + subErrs = append(subErrs, errors.New(fmt.Sprintf("no metric found cannot subscribe: (%s) version(%d)", mt.Namespace(), mt.Version()))) + return nil, subErrs } + fmt.Println(m) + // fmt.Println(m.Plugin.) // if m.policy == nil { // panic("NIL!") @@ -234,9 +224,7 @@ func (p *pluginControl) SubscribeMetricType(mt core.MetricType, cd *cdata.Config } defer p.eventManager.Emit(e) - return nil, nil - - // return m, nil + return m, nil } // UnsubscribeMetricType unsubscribes a MetricType From 87ca39e8d7429ca9929e05f70e05faf7993e2002 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Mon, 9 Mar 2015 18:57:28 -0700 Subject: [PATCH 12/15] Refactors available plugins collection to use an available plugin pool and some workflow for router to available plugin pool --- control/available_plugin.go | 122 +++++++++++++++++++++++++----------- control/monitor.go | 6 +- control/router.go | 34 +++++++++- control/router_test.go | 20 +++--- 4 files changed, 130 insertions(+), 52 deletions(-) diff --git a/control/available_plugin.go b/control/available_plugin.go index a3d840a6b..b9d902c0b 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -7,8 +7,6 @@ import ( "sync" "time" - "fmt" - "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" "github.com/intelsdilabs/pulse/control/plugin/client" @@ -118,7 +116,7 @@ func (ap *availablePlugin) makeKey() { // apCollection is a collection of availablePlugin type apCollection struct { - table *map[string]*[]*availablePlugin + table *map[string]*availablePluginPool mutex *sync.Mutex keys *[]string currentIter int @@ -126,7 +124,7 @@ type apCollection struct { // newAPCollection returns an apCollection capable of storing availblePlugin func newAPCollection() *apCollection { - m := make(map[string]*[]*availablePlugin) + m := make(map[string]*availablePluginPool) var k []string return &apCollection{ table: &m, @@ -136,15 +134,14 @@ func newAPCollection() *apCollection { } } -func (c *apCollection) GetPluginPool(key string) int { +func (c *apCollection) GetPluginPool(key string) *availablePluginPool { c.Lock() defer c.Unlock() - for k, v := range *c.table { - fmt.Println(k, v) + if ap, ok := (*c.table)[key]; ok { + return ap } - // panic(1) - return 1 + return nil } // Table returns a copy of the apCollection table @@ -154,7 +151,7 @@ func (c *apCollection) Table() map[string][]*availablePlugin { m := make(map[string][]*availablePlugin) for k, v := range *c.table { - m[k] = *v + m[k] = *v.Plugins } return m } @@ -170,23 +167,14 @@ func (c *apCollection) Add(ap *availablePlugin) error { if (*c.table)[ap.Key] != nil { // make sure we don't already have a pointer to this plugin in the table - if exist, i := c.exists(ap); exist { + if exist, i := c.Exists(ap); exist { return errors.New("plugin instance already available at index " + strconv.Itoa(i)) } } else { - var col []*availablePlugin - (*c.table)[ap.Key] = &col + (*c.table)[ap.Key] = newAvailablePluginPool() } - // tell ap its index in the table - ap.Index = len(*(*c.table)[ap.Key]) - - // append - newCollection := append(*(*c.table)[ap.Key], ap) - - // overwrite - (*c.table)[ap.Key] = &newCollection - + (*c.table)[ap.Key].Add(ap) return nil } @@ -194,15 +182,12 @@ func (c *apCollection) Add(ap *availablePlugin) error { func (c *apCollection) Remove(ap *availablePlugin) error { c.Lock() defer c.Unlock() - if exists, _ := c.exists(ap); !exists { + + if exists, _ := c.Exists(ap); !exists { return errors.New("Warning: plugin does not exist in table") } - splicedColl := append((*(*c.table)[ap.Key])[:ap.Index], (*(*c.table)[ap.Key])[ap.Index+1:]...) - (*c.table)[ap.Key] = &splicedColl - //reset indexes - for i, ap := range *(*c.table)[ap.Key] { - ap.Index = i - } + + (*c.table)[ap.Key].Remove(ap) return nil } @@ -217,7 +202,7 @@ func (c *apCollection) Unlock() { } // Item returns the item at current position in the apCollection table -func (c *apCollection) Item() (string, *[]*availablePlugin) { +func (c *apCollection) Item() (string, *availablePluginPool) { key := (*c.keys)[c.currentIter-1] return key, (*c.table)[key] } @@ -234,13 +219,8 @@ func (c *apCollection) Next() bool { // exists checks the table to see if a pointer for the availablePlugin specified // already exists -func (c *apCollection) exists(ap *availablePlugin) (bool, int) { - for i, _ap := range *(*c.table)[ap.Key] { - if ap == _ap { - return true, i - } - } - return false, -1 +func (c *apCollection) Exists(ap *availablePlugin) (bool, int) { + return (*c.table)[ap.Key].Exists(ap) } // availablePlugins is a collection of availablePlugins by type @@ -290,3 +270,71 @@ func (a *availablePlugins) Remove(ap *availablePlugin) error { return errors.New("cannot remove from available plugins, unknown plugin type") } } + +type availablePluginPool struct { + Plugins *[]*availablePlugin + + mutex *sync.Mutex +} + +func newAvailablePluginPool() *availablePluginPool { + app := make([]*availablePlugin, 0) + return &availablePluginPool{ + Plugins: &app, + mutex: &sync.Mutex{}, + } +} + +func (a *availablePluginPool) Lock() { + a.mutex.Lock() +} + +func (a *availablePluginPool) Unlock() { + a.mutex.Unlock() +} + +func (a *availablePluginPool) Count() int { + return len((*a.Plugins)) +} + +func (a *availablePluginPool) Add(ap *availablePlugin) { + a.mutex.Lock() + defer a.mutex.Unlock() + // tell ap its index in the table + ap.Index = len((*a.Plugins)) + // append + newCollection := append((*a.Plugins), ap) + // overwrite + a.Plugins = &newCollection +} + +func (a *availablePluginPool) Remove(ap *availablePlugin) { + a.Lock() + defer a.Unlock() + // Place nil here to allow GC per : https://github.com/golang/go/wiki/SliceTricks + (*a.Plugins)[ap.Index] = nil + splicedColl := append((*a.Plugins)[:ap.Index], (*a.Plugins)[ap.Index+1:]...) + a.Plugins = &splicedColl + //reset indexes + a.resetIndexes() +} + +func (a *availablePluginPool) Exists(ap *availablePlugin) (bool, int) { + for i, _ap := range *a.Plugins { + if ap == _ap { + return true, i + } + } + return false, -1 +} + +func (a *availablePluginPool) resetIndexes() { + for i, ap := range *a.Plugins { + ap.Index = i + } +} + +func (a *availablePluginPool) SelectUsingStrategy(strat RoutingStrategy, timeout time.Duration) (*availablePlugin, error) { + + return nil, nil +} diff --git a/control/monitor.go b/control/monitor.go index 0969ea87b..8ff13d657 100644 --- a/control/monitor.go +++ b/control/monitor.go @@ -60,7 +60,7 @@ func (m *monitor) Start(availablePlugins *availablePlugins) { availablePlugins.Collectors.Lock() for availablePlugins.Collectors.Next() { _, apc := availablePlugins.Collectors.Item() - for _, ap := range *apc { + for _, ap := range *apc.Plugins { go ap.CheckHealth() } } @@ -70,7 +70,7 @@ func (m *monitor) Start(availablePlugins *availablePlugins) { availablePlugins.Publishers.Lock() for availablePlugins.Publishers.Next() { _, apc := availablePlugins.Publishers.Item() - for _, ap := range *apc { + for _, ap := range *apc.Plugins { go ap.CheckHealth() } } @@ -80,7 +80,7 @@ func (m *monitor) Start(availablePlugins *availablePlugins) { availablePlugins.Processors.Lock() for availablePlugins.Processors.Next() { _, apc := availablePlugins.Processors.Item() - for _, ap := range *apc { + for _, ap := range *apc.Plugins { go ap.CheckHealth() } } diff --git a/control/router.go b/control/router.go index 3e339b9d9..59f7f1341 100644 --- a/control/router.go +++ b/control/router.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/intelsdilabs/pulse/control/routing" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" ) @@ -14,13 +15,22 @@ import ( type RouterResponse interface { } +type RoutingStrategy interface { +} + type pluginRouter struct { + Strategy RoutingStrategy + SelectionTimeout time.Duration + metricCatalog catalogsMetrics pluginRunner runsPlugins } func newPluginRouter() *pluginRouter { - return &pluginRouter{} + return &pluginRouter{ + Strategy: &routing.RoundRobinStrategy{}, + SelectionTimeout: time.Second * 30, + } } type pluginCallSelection struct { @@ -78,7 +88,27 @@ func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.Conf for pluginKey, metrics := range pluginCallSelectionMap { fmt.Printf("plugin: (%s) has (%d) metrics to gather\n", pluginKey, metrics.Count()) - p.pluginRunner.AvailablePlugins().Collectors.GetPluginPool(pluginKey) + apPluginPool := p.pluginRunner.AvailablePlugins().Collectors.GetPluginPool(pluginKey) + + if apPluginPool == nil { + // return error because this plugin has no pool + return nil, errors.New(fmt.Sprintf("no available plugins for plugin type (%s)", pluginKey)) + } + + // Lock this apPool so we are the only one operating on it. + if apPluginPool.Count() == 0 { + // return error indicating we have no available plugins to call for Collect + } + // Use a router strategy to select an available plugin from the pool + // This blocks on selection of a non-busy node or timeout expiring + ap, err := apPluginPool.SelectUsingStrategy(p.Strategy, p.SelectionTimeout) + // ap, err := p.Strategy.Select(apPluginPool, p.SelectionTimeout) + // Call CollectMetrics on the client + // metrics, err := ap.Client.CollectMetrics(metricTypes, config) + fmt.Println(ap, err) + if err != nil { + // We had an error on collection return + } } // fmt.Println(p.pluginRunner.AvailablePlugins().Collectors.Table()["dummy:1"]) diff --git a/control/router_test.go b/control/router_test.go index 8ce9ca8a8..167ef1804 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -2,8 +2,8 @@ package control import ( "fmt" - // "os" - // "path" + "os" + "path" "testing" "time" @@ -12,11 +12,11 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -// var ( -// PluginName = "pulse-collector-dummy" -// PulsePath = os.Getenv("PULSE_PATH") -// PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) -// ) +var ( + PluginName = "pulse-collector-dummy" + PulsePath = os.Getenv("PULSE_PATH") + PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) +) type MockMetricType struct { namespace []string @@ -66,13 +66,13 @@ func TestRouter(t *testing.T) { fmt.Println(m1.Namespace(), m1.Version(), cd) // Subscribe - c.SubscribeMetricType(m1, cd) - // fmt.Println(a, e) + a, b := c.SubscribeMetricType(m1, cd) + fmt.Println(a, b) // Call collect on router cr, err := c.pluginRouter.Collect(m, cd, time.Now().Add(time.Second*60)) if err != nil { - panic(err) + fmt.Println(err) } fmt.Printf("\nresponse: %+v\n", cr) From 8baf279b5aaded4d6ea8fc52edc0fcf1cb7312a7 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Thu, 12 Mar 2015 17:41:42 -0700 Subject: [PATCH 13/15] New routing behavior. Staged commit awaiting refactoring --- control/available_plugin.go | 38 +++++++- control/control.go | 65 ++++++++------ control/metrics.go | 13 ++- control/monitor.go | 7 +- control/plugin/plugin.go | 4 +- control/plugin/plugin_test.go | 4 +- control/plugin_manager.go | 23 +++-- control/router.go | 88 +++++++++---------- control/router_test.go | 31 ++++--- control/routing/round_robin.go | 35 ++++++++ control/routing/routing.go | 13 +++ control/runner.go | 61 ++++++++++++- core/control_event/control_event.go | 21 ++--- .../pulse-collector-dummy/dummy/dummy.go | 4 + schedule/schedule.go | 5 +- schedule/schedule_test.go | 13 +-- 16 files changed, 293 insertions(+), 132 deletions(-) create mode 100644 control/routing/round_robin.go create mode 100644 control/routing/routing.go diff --git a/control/available_plugin.go b/control/available_plugin.go index b9d902c0b..02a5fb627 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -10,6 +10,7 @@ import ( "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" "github.com/intelsdilabs/pulse/control/plugin/client" + "github.com/intelsdilabs/pulse/control/routing" "github.com/intelsdilabs/pulse/core/control_event" ) @@ -30,6 +31,8 @@ type availablePlugin struct { Client client.PluginClient Index int + hitCount int + lastHitTime time.Time eventManager *gomit.EventController failedHealthChecks int healthChan chan error @@ -45,6 +48,7 @@ func newAvailablePlugin(resp *plugin.Response) (*availablePlugin, error) { eventManager: new(gomit.EventController), healthChan: make(chan error, 1), + lastHitTime: time.Now(), } // Create RPC Client @@ -94,7 +98,7 @@ func (ap *availablePlugin) healthCheckFailed() { pde := &control_event.DisabledPluginEvent{ Name: ap.Name, Version: ap.Version, - Type: ap.Type, + Type: int(ap.Type), Key: ap.Key, Index: ap.Index, } @@ -103,11 +107,19 @@ func (ap *availablePlugin) healthCheckFailed() { hcfe := &control_event.HealthCheckFailedEvent{ Name: ap.Name, Version: ap.Version, - Type: ap.Type, + Type: int(ap.Type), } defer ap.eventManager.Emit(hcfe) } +func (a *availablePlugin) HitCount() int { + return a.hitCount +} + +func (a *availablePlugin) LastHit() time.Time { + return a.lastHitTime +} + // makeKey creates the ap.Key from the ap.Name and ap.Version func (ap *availablePlugin) makeKey() { s := []string{ap.Name, strconv.Itoa(ap.Version)} @@ -144,6 +156,14 @@ func (c *apCollection) GetPluginPool(key string) *availablePluginPool { return nil } +func (c *apCollection) PluginPoolHasAP(key string) bool { + a := c.GetPluginPool(key) + if a != nil && a.Count() > 0 { + return true + } + return false +} + // Table returns a copy of the apCollection table func (c *apCollection) Table() map[string][]*availablePlugin { c.Lock() @@ -334,7 +354,17 @@ func (a *availablePluginPool) resetIndexes() { } } -func (a *availablePluginPool) SelectUsingStrategy(strat RoutingStrategy, timeout time.Duration) (*availablePlugin, error) { +func (a *availablePluginPool) SelectUsingStrategy(strat RoutingStrategy) (*availablePlugin, error) { + a.Lock() + defer a.Unlock() - return nil, nil + sp := make([]routing.SelectablePlugin, len(*a.Plugins)) + for i, _ := range *a.Plugins { + sp[i] = (*a.Plugins)[i] + } + sap, err := strat.Select(a, sp) + if err != nil || sap == nil { + return nil, err + } + return sap.(*availablePlugin), err } diff --git a/control/control.go b/control/control.go index db900cc6a..ff67041cb 100644 --- a/control/control.go +++ b/control/control.go @@ -9,6 +9,7 @@ import ( "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" "github.com/intelsdilabs/pulse/core/control_event" @@ -45,7 +46,12 @@ type routesToPlugins interface { } type runsPlugins interface { + Start() error + Stop() []error AvailablePlugins() *availablePlugins + AddDelegates(delegates ...gomit.Delegator) + SetMetricCatalog(c catalogsMetrics) + SetPluginManager(m managesPlugins) } type managesPlugins interface { @@ -53,6 +59,7 @@ type managesPlugins interface { UnloadPlugin(CatalogedPlugin) error LoadedPlugins() *loadedPlugins SetMetricCatalog(catalogsMetrics) + GenerateArgs() plugin.Arg } type catalogsMetrics interface { @@ -64,7 +71,7 @@ type catalogsMetrics interface { Subscribe([]string, int) error Unsubscribe([]string, int) error Table() map[string][]*metricType - resolvePlugin([]string, int) (*loadedPlugin, error) + GetPlugin([]string, int) (*loadedPlugin, error) } // Returns a new pluginControl instance @@ -72,27 +79,36 @@ func New() *pluginControl { c := &pluginControl{} // Initialize components // - // 1. Metric Catalog + // Event Manager + c.eventManager = gomit.NewEventController() + + // Metric Catalog c.metricCatalog = newMetricCatalog() - // 2. Plugin Runner + // Plugin Manager + c.pluginManager = newPluginManager() + // Plugin Manager needs a reference to the metric catalog + c.pluginManager.SetMetricCatalog(c.metricCatalog) + + // Plugin Runner c.pluginRunner = newRunner() + c.pluginRunner.AddDelegates(c.eventManager) + c.pluginRunner.SetMetricCatalog(c.metricCatalog) + c.pluginRunner.SetPluginManager(c.pluginManager) - // 3. Plugin Router + // Plugin Router c.pluginRouter = newPluginRouter() c.pluginRouter.SetRunner(c.pluginRunner) c.pluginRouter.SetMetricCatalog(c.metricCatalog) - // 3. Plugin Manager - c.pluginManager = newPluginManager() - // Plugin Manager needs a reference to the metric catalog - c.pluginManager.SetMetricCatalog(c.metricCatalog) - - // Event Manager - c.eventManager = gomit.NewEventController() - // Wire event manager + // Start stuff + err := c.pluginRunner.Start() + if err != nil { + panic(err) + } + // c.loadRequestsChan = make(chan LoadedPlugin) // privatekey, err := rsa.GenerateKey(rand.Reader, 4096) @@ -201,26 +217,19 @@ func (p *pluginControl) SubscribeMetricType(mt core.MetricType, cd *cdata.Config return nil, subErrs } - fmt.Println(m) - - // fmt.Println(m.Plugin.) - // if m.policy == nil { - // panic("NIL!") - // } - // ncdTable, errs := m.policy.Process(cd.Table()) - // if errs != nil && errs.HasErrors() { - // return nil, errs - // } - - // panic(1) - // ncd := cdata.FromTable(*ncdTable) - - // m.config = cdata.FromTable(*ncdTable) + if m.policy == nil { + m.policy = cpolicy.NewPolicyNode() + } + ncdTable, errs := m.policy.Process(cd.Table()) + if errs != nil && errs.HasErrors() { + return nil, errs.Errors() + } + m.config = cdata.FromTable(*ncdTable) m.Subscribe() - e := &control_event.MetricSubscriptionEvent{ MetricNamespace: m.Namespace(), + Version: m.Version(), } defer p.eventManager.Emit(e) diff --git a/control/metrics.go b/control/metrics.go index 6b61190c2..65abb1ce1 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -90,16 +90,27 @@ func newMetricCatalog() *metricCatalog { } func (m *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.MetricType) { + if lp.ConfigPolicyTree == nil { + panic("NO") + } + newMt := metricType{ Plugin: lp, namespace: mt.Namespace(), lastAdvertisedTime: mt.LastAdvertisedTime(), + // This caches the config policy node within the metric type + // Disabled until ctree + cpolicy is RPC compatible + // policy: lp.ConfigPolicyTree.Get(mt.Namespace()), } m.Add(&newMt) } // adds a metricType pointer to the loadedPlugins table func (mc *metricCatalog) Add(m *metricType) { + // if m.policy == nil { + // // Set an empty config policy tree if not provided + // m.policy = cpolicy.NewTree() + // } mc.mutex.Lock() defer mc.mutex.Unlock() @@ -198,7 +209,7 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error { return m.Unsubscribe() } -func (mc *metricCatalog) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { +func (mc *metricCatalog) GetPlugin(mns []string, ver int) (*loadedPlugin, error) { m, err := mc.Get(mns, ver) if err != nil { return nil, err diff --git a/control/monitor.go b/control/monitor.go index 8ff13d657..3f9a8d177 100644 --- a/control/monitor.go +++ b/control/monitor.go @@ -1,12 +1,15 @@ package control -import "time" +import ( + "time" +) const ( MonitorStopped monitorState = iota - 1 // default is stopped MonitorStarted - DefaultMonitorDuration = time.Second * 60 + // Changed to one second until we get proper control of duration runtime into this. + DefaultMonitorDuration = time.Second * 1 ) type monitorState int diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index f426f361b..21b8d01ed 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -1,8 +1,6 @@ package plugin -// Config Policy -// task > control > default - +// WARNING! Do not import "fmt" and print from a plugin to stdout! import ( "crypto/rsa" "time" diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 7795c8904..6220a701f 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -115,7 +115,7 @@ func TestArg(t *testing.T) { func TestPlugin(t *testing.T) { Convey("Start", t, func() { mockPluginMeta := NewPluginMeta("test", 1, CollectorPluginType) - mockConfigPolicyTree := new(cpolicy.ConfigPolicyTree) + mockConfigPolicyTree := cpolicy.NewTree() var mockPluginArgs string = "{\"PluginLogPath\": \"/var/tmp/pulse_plugin.log\"}" err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicyTree, mockPluginArgs) So(err, ShouldBeNil) @@ -123,7 +123,7 @@ func TestPlugin(t *testing.T) { }) Convey("Start with invalid args", t, func() { mockPluginMeta := NewPluginMeta("test", 1, CollectorPluginType) - mockConfigPolicyTree := new(cpolicy.ConfigPolicyTree) + mockConfigPolicyTree := cpolicy.NewTree() var mockPluginArgs string = "" err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicyTree, mockPluginArgs) So(err, ShouldNotBeNil) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index dfad0fde3..725670669 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -13,6 +13,7 @@ import ( "github.com/intelsdilabs/pulse/control/plugin" "github.com/intelsdilabs/pulse/control/plugin/client" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) const ( @@ -132,12 +133,13 @@ func (l *loadedPlugins) Next() bool { // the struct representing a plugin that is loaded into Pulse type loadedPlugin struct { - Meta plugin.PluginMeta - Path string - Type plugin.PluginType - State pluginState - Token string - LoadedTime time.Time + Meta plugin.PluginMeta + Path string + Type plugin.PluginType + State pluginState + Token string + LoadedTime time.Time + ConfigPolicyTree *cpolicy.ConfigPolicyTree } // returns plugin name @@ -207,7 +209,7 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { lPlugin.Path = path lPlugin.State = DetectedState - ePlugin, err := plugin.NewExecutablePlugin(p.generateArgs(), lPlugin.Path) + ePlugin, err := plugin.NewExecutablePlugin(p.GenerateArgs(), lPlugin.Path) if err != nil { log.Println(err) @@ -227,6 +229,9 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { return nil, err } + // Add config policy tree to loaded plugin + lPlugin.ConfigPolicyTree = &resp.ConfigPolicyTree + if resp.Type == plugin.CollectorPluginType { ap, err := newAvailablePlugin(resp) if err != nil { @@ -318,6 +323,6 @@ func (p *pluginManager) UnloadPlugin(pl CatalogedPlugin) error { return nil } -func (p *pluginManager) generateArgs() plugin.Arg { - return plugin.NewArg(p.pubKey, "/tmp/pulse-plugin.log") +func (p *pluginManager) GenerateArgs() plugin.Arg { + return plugin.NewArg(p.pubKey, "/tmp/pulse-plugin-foo.log") } diff --git a/control/router.go b/control/router.go index 59f7f1341..7284ee44c 100644 --- a/control/router.go +++ b/control/router.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/intelsdilabs/pulse/control/plugin/client" "github.com/intelsdilabs/pulse/control/routing" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" @@ -16,11 +17,11 @@ type RouterResponse interface { } type RoutingStrategy interface { + Select(routing.SelectablePluginPool, []routing.SelectablePlugin) (routing.SelectablePlugin, error) } type pluginRouter struct { - Strategy RoutingStrategy - SelectionTimeout time.Duration + Strategy RoutingStrategy metricCatalog catalogsMetrics pluginRunner runsPlugins @@ -28,8 +29,7 @@ type pluginRouter struct { func newPluginRouter() *pluginRouter { return &pluginRouter{ - Strategy: &routing.RoundRobinStrategy{}, - SelectionTimeout: time.Second * 30, + Strategy: &routing.RoundRobinStrategy{}, } } @@ -44,33 +44,19 @@ func (p *pluginCallSelection) Count() int { // Calls collector plugins for the metric types and returns collection response containing metrics. Blocking method. func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.ConfigDataNode, deadline time.Time) (response *collectionResponse, err error) { - // For each MT sort into plugin types we need to call - fmt.Println(metricTypes) - - fmt.Println("\nMetric Catalog\n*****") - fmt.Println(p.metricCatalog) - for k, m := range p.metricCatalog.Table() { - fmt.Println(k, m) - } - fmt.Println("\n") - pluginCallSelectionMap := make(map[string]pluginCallSelection) // For each plugin type select a matching available plugin to call for _, m := range metricTypes { // This is set to choose the newest and not pin version. TODO, be sure version is set to -1 if not provided by user on Task creation. - lp, err := p.metricCatalog.resolvePlugin(m.Namespace(), -1) - - // fmt.Println("\nMetric Catalog\n*****") - // for k, m := range p.metricCatalog.Table() { - // fmt.Println(k, m) - // } + lp, err := p.metricCatalog.GetPlugin(m.Namespace(), -1) - // TODO handle error here. Single error fails entire operation. + // Single error fails entire operation. if err != nil { - // can't find a matching plugin, fail - TODO + return nil, err } + // Single error fails entire operation. if lp == nil { return nil, errors.New(fmt.Sprintf("Metric missing: %s", strings.Join(m.Namespace(), "/"))) } @@ -82,9 +68,9 @@ func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.Conf pluginCallSelectionMap[lp.Key()] = x } - // For each available plugin call available plugin using RPC client and wait for response (goroutines) - fmt.Println("") + // For each available plugin call available plugin using RPC client and wait for response (goroutines) + var selectedAP *availablePlugin for pluginKey, metrics := range pluginCallSelectionMap { fmt.Printf("plugin: (%s) has (%d) metrics to gather\n", pluginKey, metrics.Count()) @@ -98,32 +84,34 @@ func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.Conf // Lock this apPool so we are the only one operating on it. if apPluginPool.Count() == 0 { // return error indicating we have no available plugins to call for Collect + return nil, errors.New(fmt.Sprintf("no available plugins for plugin type (%s)", pluginKey)) } + // Use a router strategy to select an available plugin from the pool - // This blocks on selection of a non-busy node or timeout expiring - ap, err := apPluginPool.SelectUsingStrategy(p.Strategy, p.SelectionTimeout) - // ap, err := p.Strategy.Select(apPluginPool, p.SelectionTimeout) - // Call CollectMetrics on the client - // metrics, err := ap.Client.CollectMetrics(metricTypes, config) - fmt.Println(ap, err) + fmt.Printf("%d available plugin in pool for (%s)\n", apPluginPool.Count(), pluginKey) + ap, err := apPluginPool.SelectUsingStrategy(p.Strategy) + if err != nil { - // We had an error on collection return + return nil, err } - } - - // fmt.Println(p.pluginRunner.AvailablePlugins().Collectors.Table()["dummy:1"]) - - // Wait for all responses(happy) or timeout(unhappy) - - // (happy)reduce responses into single collection response and return - // (unhappy)return response with timeout state - - return &collectionResponse{}, nil -} + if ap == nil { + return nil, errors.New(fmt.Sprintf("no available plugin selected (%s)", pluginKey)) + } + selectedAP = ap + } -type collectionResponse struct { - Errors []error + resp := newCollectionResponse() + // Attempt collection on selected available plugin + selectedAP.hitCount++ + selectedAP.lastHitTime = time.Now() + metrics, err := selectedAP.Client.(client.PluginCollectorClient).CollectMetrics(metricTypes) + if err != nil { + resp.Errors = append(resp.Errors, err) + return resp, nil + } + resp.Metrics = metrics + return resp, nil } func (p *pluginRouter) SetRunner(r runsPlugins) { @@ -133,3 +121,15 @@ func (p *pluginRouter) SetRunner(r runsPlugins) { func (p *pluginRouter) SetMetricCatalog(m catalogsMetrics) { p.metricCatalog = m } + +type collectionResponse struct { + Metrics []core.Metric + Errors []error +} + +func newCollectionResponse() *collectionResponse { + return &collectionResponse{ + Metrics: make([]core.Metric, 0), + Errors: make([]error, 0), + } +} diff --git a/control/router_test.go b/control/router_test.go index 167ef1804..4882ed718 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/intelsdilabs/pulse/control/plugin" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" . "github.com/smartystreets/goconvey/convey" @@ -40,19 +41,16 @@ func (m MockMetricType) Config() *cdata.ConfigDataNode { func TestRouter(t *testing.T) { Convey("given a new router", t, func() { + // adjust HB timeouts for test + plugin.PingTimeoutLimit = 1 + plugin.PingTimeoutDuration = time.Second * 1 + // Create controller c := New() + c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() // Load plugin c.Load(PluginPath) - // fmt.Println("\nPlugin Catalog\n*****") - // for _, p := range c.PluginCatalog() { - // fmt.Printf("%s %d\n", p.Name(), p.Version()) - // } - - // Create router - // r := newPluginRouter() - // r.metricCatalog = c.metricCatalog m := []core.MetricType{} m1 := MockMetricType{namespace: []string{"intel", "dummy", "foo"}} @@ -68,13 +66,20 @@ func TestRouter(t *testing.T) { // Subscribe a, b := c.SubscribeMetricType(m1, cd) fmt.Println(a, b) + time.Sleep(time.Millisecond * 100) + c.SubscribeMetricType(m2, cd) + time.Sleep(time.Millisecond * 200) // Call collect on router - cr, err := c.pluginRouter.Collect(m, cd, time.Now().Add(time.Second*60)) - if err != nil { - fmt.Println(err) - } - fmt.Printf("\nresponse: %+v\n", cr) + for x := 0; x < 5; x++ { + fmt.Println("\n * Calling Collect") + cr, err := c.pluginRouter.Collect(m, cd, time.Now().Add(time.Second*60)) + if err != nil { + fmt.Println(err) + } + fmt.Printf(" * Collect Response: %+v\n", cr) + } + time.Sleep(time.Millisecond * 200) }) } diff --git a/control/routing/round_robin.go b/control/routing/round_robin.go new file mode 100644 index 000000000..afab934dd --- /dev/null +++ b/control/routing/round_robin.go @@ -0,0 +1,35 @@ +package routing + +import ( + "errors" + "fmt" + "math/rand" +) + +type RoundRobinStrategy struct { +} + +func (r *RoundRobinStrategy) Select(spp SelectablePluginPool, spa []SelectablePlugin) (SelectablePlugin, error) { + var h int = -1 + var index int = -1 + fmt.Printf("Using round robin selection on pool of %d plugins\n", len(spa)) + for i, sp := range spa { + // look for the lowest hit count + if sp.HitCount() < h || h == -1 { + index = i + h = sp.HitCount() + } + // on a hitcount tie we randomly choose one + if sp.HitCount() == h { + if rand.Intn(1) == 1 { + index = i + h = sp.HitCount() + } + } + } + if index > -1 { + fmt.Printf("Selecting plugin at index (%d) with hitcount of (%d)\n", index, spa[index].HitCount()) + return spa[index], nil + } + return nil, errors.New("could not select a plugin (round robin strategy)") +} diff --git a/control/routing/routing.go b/control/routing/routing.go new file mode 100644 index 000000000..cd42bc04c --- /dev/null +++ b/control/routing/routing.go @@ -0,0 +1,13 @@ +package routing + +import ( + "time" +) + +type SelectablePluginPool interface { +} + +type SelectablePlugin interface { + HitCount() int + LastHit() time.Time +} diff --git a/control/runner.go b/control/runner.go index 7471cb555..48a396801 100644 --- a/control/runner.go +++ b/control/runner.go @@ -2,11 +2,16 @@ package control import ( "errors" + "fmt" + "sync" "time" + "strings" + "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core/control_event" ) const ( @@ -16,6 +21,9 @@ const ( PluginRunning availablePluginState = iota - 1 // Default value (0) is Running PluginStopped PluginDisabled + + // Until more advanced decisioning on starting exists this is the max number to spawn. + MaximumRunningPlugins = 3 ) // TBD @@ -30,16 +38,28 @@ type runner struct { delegates []gomit.Delegator monitor *monitor availablePlugins *availablePlugins + metricCatalog catalogsMetrics + pluginManager managesPlugins + mutex *sync.Mutex } func newRunner() *runner { r := &runner{ monitor: newMonitor(), availablePlugins: newAvailablePlugins(), + mutex: &sync.Mutex{}, } return r } +func (r *runner) SetMetricCatalog(c catalogsMetrics) { + r.metricCatalog = c +} + +func (r *runner) SetPluginManager(m managesPlugins) { + r.pluginManager = m +} + func (r *runner) AvailablePlugins() *availablePlugins { return r.availablePlugins } @@ -144,5 +164,44 @@ func (r *runner) stopPlugin(reason string, ap *availablePlugin) error { // Empty handler acting as placeholder until implementation. This helps tests // pass to ensure registration works. func (r *runner) HandleGomitEvent(e gomit.Event) { - // to do + + switch v := e.Body.(type) { + case *control_event.MetricSubscriptionEvent: + fmt.Println("runner") + fmt.Println(v.Namespace()) + fmt.Printf("Metric subscription (%s v%d)\n", strings.Join(v.MetricNamespace, "/"), v.Version) + + // Our logic here is simple for alpha. We should replace with parameter managed logic. + // + // 1. Get the loaded plugin for the subscription. + // 2. Check that at least one available plugin of that type is running + // 3. If not start one + + mt, err := r.metricCatalog.Get(v.MetricNamespace, v.Version) + if err != nil { + // log this error # TODO with logging + fmt.Println(err) + } + fmt.Printf("Plugin is (%s)\n", mt.Plugin.Key()) + + pool := r.availablePlugins.Collectors.GetPluginPool(mt.Plugin.Key()) + if pool != nil && pool.Count() >= MaximumRunningPlugins { + // if r.availablePlugins.Collectors.PluginPoolHasAP(mt.Plugin.Key()) { + fmt.Println("We have at least one running!") + return + } + + fmt.Println("No APs running!") + ePlugin, err := plugin.NewExecutablePlugin(r.pluginManager.GenerateArgs(), mt.Plugin.Path) + if err != nil { + fmt.Println(err) + } + ap, err := r.startPlugin(ePlugin) + if err != nil { + fmt.Println(err) + panic(err) + } + fmt.Println("NEW AP") + fmt.Println(ap) + } } diff --git a/core/control_event/control_event.go b/core/control_event/control_event.go index 1bcd37058..5fa75ea94 100644 --- a/core/control_event/control_event.go +++ b/core/control_event/control_event.go @@ -1,9 +1,5 @@ package control_event -import ( - "github.com/intelsdilabs/pulse/control/plugin" -) - const ( PluginLoaded = "Control.PluginLoaded" PluginDisabled = "Control.PluginDisabled" @@ -16,21 +12,21 @@ const ( type LoadPluginEvent struct{} -func (e *LoadPluginEvent) Namespace() string { +func (e LoadPluginEvent) Namespace() string { return PluginLoaded } type UnloadPluginEvent struct { } -func (e *UnloadPluginEvent) Namespace() string { +func (e UnloadPluginEvent) Namespace() string { return PluginUnloaded } type DisabledPluginEvent struct { Name string Version int - Type plugin.PluginType + Type int Key string Index int } @@ -41,15 +37,16 @@ func (e *DisabledPluginEvent) Namespace() string { type SwapPluginsEvent struct{} -func (s *SwapPluginsEvent) Namespace() string { +func (s SwapPluginsEvent) Namespace() string { return PluginsSwapped } type MetricSubscriptionEvent struct { MetricNamespace []string + Version int } -func (se *MetricSubscriptionEvent) Namespace() string { +func (se MetricSubscriptionEvent) Namespace() string { return MetricSubscribed } @@ -57,16 +54,16 @@ type MetricUnsubscriptionEvent struct { MetricNamespace []string } -func (ue *MetricUnsubscriptionEvent) Namespace() string { +func (ue MetricUnsubscriptionEvent) Namespace() string { return MetricUnsubscribed } type HealthCheckFailedEvent struct { Name string Version int - Type plugin.PluginType + Type int } -func (hfe *HealthCheckFailedEvent) Namespace() string { +func (hfe HealthCheckFailedEvent) Namespace() string { return HealthCheckFailed } diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index 74cdd5764..15519560e 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -33,5 +33,9 @@ func Meta() *plugin.PluginMeta { func ConfigPolicyTree() *cpolicy.ConfigPolicyTree { c := cpolicy.NewTree() + rule, _ := cpolicy.NewStringRule("name", false, "bob") + p := cpolicy.NewPolicyNode() + p.Add(rule) + c.Add([]string{"intel", "dummy"}, p) return c } diff --git a/schedule/schedule.go b/schedule/schedule.go index 1f9aa143b..9d73af2b7 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -3,7 +3,6 @@ package schedule import ( "time" - "github.com/intelsdilabs/pulse/control" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" ) @@ -34,7 +33,7 @@ type ScheduleResponse interface { // ManagesMetric is implemented by control // On startup a scheduler will be created and passed a reference to control type ManagesMetric interface { - SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, control.SubscriptionError) + SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, []error) UnsubscribeMetricType(mt core.MetricType) } @@ -75,7 +74,7 @@ func (scheduler *scheduler) CreateTask(mts []core.MetricType, s Schedule, cdt *c //mtc = append(mtc, mt) subscriptions = append(subscriptions, mt) } else { - te.errs = append(te.errs, err.Errors()...) + te.errs = append(te.errs, err...) } } diff --git a/schedule/schedule_test.go b/schedule/schedule_test.go index f7b184ddf..88d9187b2 100644 --- a/schedule/schedule_test.go +++ b/schedule/schedule_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/intelsdilabs/pulse/control" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" "github.com/intelsdilabs/pulse/core/ctypes" @@ -19,16 +18,14 @@ type MockMetricManager struct { failuredSoFar int } -func (m *MockMetricManager) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, control.SubscriptionError) { +func (m *MockMetricManager) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, []error) { if m.failValidatingMetrics { if m.failValidatingMetricsAfter > m.failuredSoFar { m.failuredSoFar++ return nil, nil } - return nil, &MockMetricManagerError{ - errs: []error{ - errors.New("metric validation error"), - }, + return nil, []error{ + errors.New("metric validation error"), } } return nil, nil @@ -42,10 +39,6 @@ type MockMetricManagerError struct { errs []error } -func (m *MockMetricManagerError) Errors() []error { - return m.errs -} - type MockMetricType struct { version int namespace []string From 4f408423548ffb42f503f8a02913dc23cc878939 Mon Sep 17 00:00:00 2001 From: Nicholas Weaver Date: Thu, 12 Mar 2015 18:17:15 -0700 Subject: [PATCH 14/15] Test fixes --- control/control_test.go | 12 ++++++++++-- control/metrics.go | 3 +++ control/monitor_test.go | 4 ++-- control/router_test.go | 8 -------- control/runner.go | 1 + core/metric.go | 3 ++- 6 files changed, 18 insertions(+), 13 deletions(-) diff --git a/control/control_test.go b/control/control_test.go index 7128613ee..a617a3e95 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -47,6 +47,10 @@ func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) { } +func (m *MockPluginManagerBadSwap) GenerateArgs() plugin.Arg { + return plugin.Arg{} +} + func TestControlNew(t *testing.T) { } @@ -274,6 +278,10 @@ func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { return nil, nil } +func (m *mc) GetPlugin([]string, int) (*loadedPlugin, error) { + return nil, nil +} + func (m *mc) Get(ns []string, ver int) (*metricType, error) { if m.e == 1 { return &metricType{ @@ -342,8 +350,8 @@ func TestSubscribeMetric(t *testing.T) { mtrc.e = 0 mt := newMetricType([]string{"nf"}, time.Now(), lp) _, err := c.SubscribeMetricType(mt, cd) - So(len(err.Errors()), ShouldEqual, 1) - So(err.Errors()[0], ShouldResemble, errMetricNotFound) + So(len(err), ShouldEqual, 1) + So(err[0], ShouldResemble, errMetricNotFound) }) // Refactoring (nweaver) // Convey("returns errors when processing fails", t, func() { diff --git a/control/metrics.go b/control/metrics.go index 65abb1ce1..eca69f73f 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -64,6 +64,9 @@ func (m *metricType) SubscriptionCount() int { } func (m *metricType) Version() int { + if m.Plugin == nil { + return -1 + } return m.Plugin.Version() } diff --git a/control/monitor_test.go b/control/monitor_test.go index cae02b3f4..e8b33115f 100644 --- a/control/monitor_test.go +++ b/control/monitor_test.go @@ -74,7 +74,7 @@ func TestMonitor(t *testing.T) { for aps.Collectors.Next() { _, item := aps.Collectors.Item() So(item, ShouldNotBeNil) - So((*item)[0].failedHealthChecks, ShouldBeGreaterThan, 3) + So((*(*item).Plugins)[0].failedHealthChecks, ShouldBeGreaterThan, 3) } }) }) @@ -90,7 +90,7 @@ func TestMonitor(t *testing.T) { oldOpt := m.Option(MonitorDuration(time.Millisecond * 200)) So(m.duration, ShouldResemble, time.Millisecond*200) m.Option(oldOpt) - So(m.duration, ShouldResemble, time.Second*60) + So(m.duration, ShouldResemble, time.Second*1) }) }) } diff --git a/control/router_test.go b/control/router_test.go index 4882ed718..202c62b53 100644 --- a/control/router_test.go +++ b/control/router_test.go @@ -2,8 +2,6 @@ package control import ( "fmt" - "os" - "path" "testing" "time" @@ -13,12 +11,6 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -var ( - PluginName = "pulse-collector-dummy" - PulsePath = os.Getenv("PULSE_PATH") - PluginPath = path.Join(PulsePath, "plugin", "collector", PluginName) -) - type MockMetricType struct { namespace []string } diff --git a/control/runner.go b/control/runner.go index 48a396801..332f5b085 100644 --- a/control/runner.go +++ b/control/runner.go @@ -181,6 +181,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { if err != nil { // log this error # TODO with logging fmt.Println(err) + return } fmt.Printf("Plugin is (%s)\n", mt.Plugin.Key()) diff --git a/core/metric.go b/core/metric.go index d3275a4cc..a48ced1ad 100644 --- a/core/metric.go +++ b/core/metric.go @@ -1,8 +1,9 @@ package core import ( - "github.com/intelsdilabs/pulse/core/cdata" "time" + + "github.com/intelsdilabs/pulse/core/cdata" ) type MetricType interface { From fe41e57ff2fe9232362e827a7e78c9cf77fb6208 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Mon, 16 Mar 2015 09:25:35 -0700 Subject: [PATCH 15/15] Added gob encoding for config and policy trees --- control/plugin/cpolicy/node.go | 17 +++++++++ control/plugin/cpolicy/string.go | 40 +++++++++++++++++++++ control/plugin/cpolicy/tree.go | 19 ++++++++++ control/plugin/cpolicy/tree_test.go | 21 +++++++++++ core/cdata/node.go | 18 ++++++++++ core/cdata/tree.go | 18 ++++++++++ core/cdata/tree_test.go | 31 ++++++++++++++++ pkg/ctree/tree.go | 56 +++++++++++++++++++++++++++++ pkg/ctree/tree_test.go | 38 ++++++++++++++++++++ 9 files changed, 258 insertions(+) diff --git a/control/plugin/cpolicy/node.go b/control/plugin/cpolicy/node.go index 27eb1913a..fa714dffb 100644 --- a/control/plugin/cpolicy/node.go +++ b/control/plugin/cpolicy/node.go @@ -1,6 +1,8 @@ package cpolicy import ( + "bytes" + "encoding/gob" "errors" "fmt" "sync" @@ -49,6 +51,21 @@ func NewPolicyNode() *ConfigPolicyNode { } } +func (c *ConfigPolicyNode) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(&c.rules); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigPolicyNode) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.rules) +} + // Adds a rule to this policy node func (p *ConfigPolicyNode) Add(rules ...Rule) { p.mutex.Lock() diff --git a/control/plugin/cpolicy/string.go b/control/plugin/cpolicy/string.go index 5dee799e8..824100d18 100644 --- a/control/plugin/cpolicy/string.go +++ b/control/plugin/cpolicy/string.go @@ -1,6 +1,9 @@ package cpolicy import ( + "bytes" + "encoding/gob" + "github.com/intelsdilabs/pulse/core/ctypes" ) @@ -30,6 +33,43 @@ func NewStringRule(key string, req bool, opts ...string) (*stringRule, error) { }, nil } +func (s *stringRule) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(s.key); err != nil { + return nil, err + } + if err := encoder.Encode(s.required); err != nil { + return nil, err + } + if s.default_ == nil { + encoder.Encode(false) + } else { + encoder.Encode(true) + if err := encoder.Encode(&s.default_); err != nil { + return nil, err + } + } + return w.Bytes(), nil +} + +func (s *stringRule) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + if err := decoder.Decode(&s.key); err != nil { + return err + } + if err := decoder.Decode(&s.required); err != nil { + return err + } + var is_default_set bool + decoder.Decode(&is_default_set) + if is_default_set { + return decoder.Decode(&s.default_) + } + return nil +} + // Returns the key func (s *stringRule) Key() string { return s.key diff --git a/control/plugin/cpolicy/tree.go b/control/plugin/cpolicy/tree.go index d57543b1b..b8ffd1716 100644 --- a/control/plugin/cpolicy/tree.go +++ b/control/plugin/cpolicy/tree.go @@ -1,6 +1,9 @@ package cpolicy import ( + "bytes" + "encoding/gob" + "github.com/intelsdilabs/pulse/pkg/ctree" ) @@ -17,6 +20,22 @@ func NewTree() *ConfigPolicyTree { } } +func (c *ConfigPolicyTree) GobEncode() ([]byte, error) { + //todo throw an error if not frozen + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(c.cTree); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigPolicyTree) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.cTree) +} + // Adds a ConfigDataNode at the provided namespace. func (c *ConfigPolicyTree) Add(ns []string, cpn *ConfigPolicyNode) { c.cTree.Add(ns, cpn) diff --git a/control/plugin/cpolicy/tree_test.go b/control/plugin/cpolicy/tree_test.go index 6492a93f4..e86edeec6 100644 --- a/control/plugin/cpolicy/tree_test.go +++ b/control/plugin/cpolicy/tree_test.go @@ -1,6 +1,7 @@ package cpolicy import ( + "encoding/gob" "testing" "github.com/intelsdilabs/pulse/core/ctypes" @@ -30,6 +31,26 @@ func TestConfigPolicyTree(t *testing.T) { So(gc.rules["username"].Default().(*ctypes.ConfigValueStr).Value, ShouldEqual, "root") So(gc.rules["password"].Required(), ShouldEqual, true) }) + Convey("encode & decode", func() { + gob.Register(NewPolicyNode()) + gob.Register(&stringRule{}) + buf, err := t.GobEncode() + So(err, ShouldBeNil) + So(buf, ShouldNotBeNil) + t2 := NewTree() + err = t2.GobDecode(buf) + So(err, ShouldBeNil) + So(t2.cTree, ShouldNotBeNil) + gc := t2.Get([]string{"one", "two", "potato"}) + So(gc, ShouldNotBeNil) + So(gc.rules["username"], ShouldNotBeNil) + So(gc.rules["username"].Required(), ShouldEqual, false) + So(gc.rules["password"].Required(), ShouldEqual, true) + So(gc.rules["username"].Default(), ShouldNotBeNil) + So(gc.rules["password"].Default(), ShouldBeNil) + So(gc.rules["username"].Default().(*ctypes.ConfigValueStr).Value, ShouldEqual, "root") + println(len(gc.rules)) + }) }) diff --git a/core/cdata/node.go b/core/cdata/node.go index 2d1d9aadb..b9d80bcb7 100644 --- a/core/cdata/node.go +++ b/core/cdata/node.go @@ -1,6 +1,8 @@ package cdata import ( + "bytes" + "encoding/gob" "sync" "github.com/intelsdilabs/pulse/core/ctypes" @@ -13,6 +15,22 @@ type ConfigDataNode struct { table map[string]ctypes.ConfigValue } +func (c *ConfigDataNode) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(&c.table); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigDataNode) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + c.mutex = new(sync.Mutex) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.table) +} + // Returns a new and empty node. func NewNode() *ConfigDataNode { return &ConfigDataNode{ diff --git a/core/cdata/tree.go b/core/cdata/tree.go index 71d6b357c..74278f897 100644 --- a/core/cdata/tree.go +++ b/core/cdata/tree.go @@ -1,6 +1,9 @@ package cdata import ( + "bytes" + "encoding/gob" + "github.com/intelsdilabs/pulse/pkg/ctree" ) @@ -17,6 +20,21 @@ func NewTree() *ConfigDataTree { } } +func (c *ConfigDataTree) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(c.cTree); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigDataTree) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.cTree) +} + // Adds a ConfigDataNode at the provided namespace. func (c *ConfigDataTree) Add(ns []string, cdn *ConfigDataNode) { c.cTree.Add(ns, cdn) diff --git a/core/cdata/tree_test.go b/core/cdata/tree_test.go index f80f9e4d0..57d1e63b4 100644 --- a/core/cdata/tree_test.go +++ b/core/cdata/tree_test.go @@ -1,6 +1,8 @@ package cdata import ( + "encoding/gob" + "fmt" "testing" "github.com/intelsdilabs/pulse/core/ctypes" @@ -70,6 +72,35 @@ func TestConfigDataTree(t *testing.T) { So(t["i"].(ctypes.ConfigValueInt).Value, ShouldEqual, 1) So(t["f"].Type(), ShouldEqual, "float") So(t["f"].(ctypes.ConfigValueFloat).Value, ShouldEqual, 2.3) + + Convey("encode & decode", func() { + gob.Register(&ConfigDataNode{}) + gob.Register(ctypes.ConfigValueStr{}) + gob.Register(ctypes.ConfigValueInt{}) + gob.Register(ctypes.ConfigValueFloat{}) + buf, err := cdt.GobEncode() + So(err, ShouldBeNil) + So(buf, ShouldNotBeNil) + cdt2 := NewTree() + err = cdt2.GobDecode(buf) + So(err, ShouldBeNil) + So(cdt2.cTree, ShouldNotBeNil) + + a2 := cdt2.Get([]string{"1", "2"}) + So(a2, ShouldNotBeNil) + + t2 := a2.Table() + So(t2["s"].Type(), ShouldEqual, "string") + println(fmt.Sprintf("!!! %T\n %T\n", t2["s"], t["s"])) + So(t2["s"].(ctypes.ConfigValueStr).Value, ShouldEqual, "bar") + So(t2["x"].Type(), ShouldEqual, "string") + So(t2["x"].(ctypes.ConfigValueStr).Value, ShouldEqual, "wat") + So(t2["i"].Type(), ShouldEqual, "integer") + So(t2["i"].(ctypes.ConfigValueInt).Value, ShouldEqual, 1) + So(t2["f"].Type(), ShouldEqual, "float") + So(t2["f"].(ctypes.ConfigValueFloat).Value, ShouldEqual, 2.3) + + }) }) }) diff --git a/pkg/ctree/tree.go b/pkg/ctree/tree.go index 595e1c3d7..536ccfa3c 100644 --- a/pkg/ctree/tree.go +++ b/pkg/ctree/tree.go @@ -2,6 +2,7 @@ package ctree import ( "bytes" + "encoding/gob" "fmt" "log" "sync" @@ -28,6 +29,28 @@ func (c *ConfigTree) log(s string) { } } +func (c *ConfigTree) GobEncode() ([]byte, error) { + //todo throw an error if not frozen + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(c.root); err != nil { + return nil, err + } + if err := encoder.Encode(c.freezeFlag); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigTree) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + if err := decoder.Decode(&c.root); err != nil { + return err + } + return decoder.Decode(&c.freezeFlag) +} + func (c *ConfigTree) Add(ns []string, inNode Node) { c.log(fmt.Sprintf("Adding %v at %s\n", inNode, ns)) c.mutex.Lock() @@ -142,6 +165,39 @@ type node struct { Node Node } +func (n *node) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(n.nodes); err != nil { + return nil, err + } + if err := encoder.Encode(n.keys); err != nil { + return nil, err + } + if err := encoder.Encode(n.keysBytes); err != nil { + return nil, err + } + if err := encoder.Encode(&n.Node); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (n *node) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + if err := decoder.Decode(&n.nodes); err != nil { + return err + } + if err := decoder.Decode(&n.keys); err != nil { + return err + } + if err := decoder.Decode(&n.keysBytes); err != nil { + return err + } + return decoder.Decode(&n.Node) +} + func (n *node) setKeys(k []string) { n.keys = k n.keysBytes = nsToByteArray(n.keys) diff --git a/pkg/ctree/tree_test.go b/pkg/ctree/tree_test.go index 6d00d471e..2229a13a7 100644 --- a/pkg/ctree/tree_test.go +++ b/pkg/ctree/tree_test.go @@ -1,6 +1,8 @@ package ctree import ( + "bytes" + "encoding/gob" "fmt" "testing" @@ -16,6 +18,27 @@ func (d dummyNode) Merge(dn Node) Node { return d } +func (n *dummyNode) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + + if err := encoder.Encode(n.data); err != nil { + return nil, err + } + + return w.Bytes(), nil +} + +func (n *dummyNode) GobDecode(buf []byte) error { + if len(buf) == 0 { + //there is nothing to do + return nil + } + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&n.data) +} + func newDummyNode() *dummyNode { return new(dummyNode) } @@ -55,6 +78,21 @@ func TestConfigTree(t *testing.T) { g := c.Get([]string{"intel", "foo", "sdilabs", "joel", "dan", "nick", "justin", "sarah"}) So(g, ShouldNotBeNil) So(g.(dummyNode).data, ShouldResemble, "b/a") + + Convey("GobEncode/GobDecode", func() { + gob.Register(&dummyNode{}) + buf, err := c.GobEncode() + So(err, ShouldBeNil) + So(buf, ShouldNotBeNil) + c2 := New() + err = c2.GobDecode(buf) + So(err, ShouldBeNil) + So(c2.root, ShouldNotBeNil) + So(c2.root.keys, ShouldNotBeEmpty) + g2 := c2.Get([]string{"intel", "foo", "sdilabs", "joel", "dan", "nick", "justin", "sarah"}) + So(g2, ShouldNotBeNil) + So(g2.(dummyNode).data, ShouldResemble, "b/a") + }) }) Convey("single item ns", func() {