diff --git a/control/available_plugin.go b/control/available_plugin.go index a084aa4ce..02a5fb627 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -10,6 +10,7 @@ import ( "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" "github.com/intelsdilabs/pulse/control/plugin/client" + "github.com/intelsdilabs/pulse/control/routing" "github.com/intelsdilabs/pulse/core/control_event" ) @@ -30,6 +31,8 @@ type availablePlugin struct { Client client.PluginClient Index int + hitCount int + lastHitTime time.Time eventManager *gomit.EventController failedHealthChecks int healthChan chan error @@ -45,12 +48,13 @@ func newAvailablePlugin(resp *plugin.Response) (*availablePlugin, error) { eventManager: new(gomit.EventController), healthChan: make(chan error, 1), + lastHitTime: time.Now(), } // Create RPC Client switch resp.Type { case plugin.CollectorPluginType: - c, e := client.NewCollectorClient(resp.ListenAddress, DefaultClientTimeout) + c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout) ap.Client = c if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) @@ -94,7 +98,7 @@ func (ap *availablePlugin) healthCheckFailed() { pde := &control_event.DisabledPluginEvent{ Name: ap.Name, Version: ap.Version, - Type: ap.Type, + Type: int(ap.Type), Key: ap.Key, Index: ap.Index, } @@ -103,11 +107,19 @@ func (ap *availablePlugin) healthCheckFailed() { hcfe := &control_event.HealthCheckFailedEvent{ Name: ap.Name, Version: ap.Version, - Type: ap.Type, + Type: int(ap.Type), } defer ap.eventManager.Emit(hcfe) } +func (a *availablePlugin) HitCount() int { + return a.hitCount +} + +func (a *availablePlugin) LastHit() time.Time { + return a.lastHitTime +} + // makeKey creates the ap.Key from the ap.Name and ap.Version func (ap *availablePlugin) makeKey() { s := []string{ap.Name, strconv.Itoa(ap.Version)} @@ -116,7 +128,7 @@ func (ap *availablePlugin) makeKey() { // apCollection is a collection of availablePlugin type apCollection struct { - table *map[string]*[]*availablePlugin + table *map[string]*availablePluginPool mutex *sync.Mutex keys *[]string currentIter int @@ -124,7 +136,7 @@ type apCollection struct { // newAPCollection returns an apCollection capable of storing availblePlugin func newAPCollection() *apCollection { - m := make(map[string]*[]*availablePlugin) + m := make(map[string]*availablePluginPool) var k []string return &apCollection{ table: &m, @@ -134,6 +146,24 @@ func newAPCollection() *apCollection { } } +func (c *apCollection) GetPluginPool(key string) *availablePluginPool { + c.Lock() + defer c.Unlock() + + if ap, ok := (*c.table)[key]; ok { + return ap + } + return nil +} + +func (c *apCollection) PluginPoolHasAP(key string) bool { + a := c.GetPluginPool(key) + if a != nil && a.Count() > 0 { + return true + } + return false +} + // Table returns a copy of the apCollection table func (c *apCollection) Table() map[string][]*availablePlugin { c.Lock() @@ -141,7 +171,7 @@ func (c *apCollection) Table() map[string][]*availablePlugin { m := make(map[string][]*availablePlugin) for k, v := range *c.table { - m[k] = *v + m[k] = *v.Plugins } return m } @@ -157,23 +187,14 @@ func (c *apCollection) Add(ap *availablePlugin) error { if (*c.table)[ap.Key] != nil { // make sure we don't already have a pointer to this plugin in the table - if exist, i := c.exists(ap); exist { + if exist, i := c.Exists(ap); exist { return errors.New("plugin instance already available at index " + strconv.Itoa(i)) } } else { - var col []*availablePlugin - (*c.table)[ap.Key] = &col + (*c.table)[ap.Key] = newAvailablePluginPool() } - // tell ap its index in the table - ap.Index = len(*(*c.table)[ap.Key]) - - // append - newCollection := append(*(*c.table)[ap.Key], ap) - - // overwrite - (*c.table)[ap.Key] = &newCollection - + (*c.table)[ap.Key].Add(ap) return nil } @@ -181,15 +202,12 @@ func (c *apCollection) Add(ap *availablePlugin) error { func (c *apCollection) Remove(ap *availablePlugin) error { c.Lock() defer c.Unlock() - if exists, _ := c.exists(ap); !exists { + + if exists, _ := c.Exists(ap); !exists { return errors.New("Warning: plugin does not exist in table") } - splicedColl := append((*(*c.table)[ap.Key])[:ap.Index], (*(*c.table)[ap.Key])[ap.Index+1:]...) - (*c.table)[ap.Key] = &splicedColl - //reset indexes - for i, ap := range *(*c.table)[ap.Key] { - ap.Index = i - } + + (*c.table)[ap.Key].Remove(ap) return nil } @@ -204,7 +222,7 @@ func (c *apCollection) Unlock() { } // Item returns the item at current position in the apCollection table -func (c *apCollection) Item() (string, *[]*availablePlugin) { +func (c *apCollection) Item() (string, *availablePluginPool) { key := (*c.keys)[c.currentIter-1] return key, (*c.table)[key] } @@ -221,13 +239,8 @@ func (c *apCollection) Next() bool { // exists checks the table to see if a pointer for the availablePlugin specified // already exists -func (c *apCollection) exists(ap *availablePlugin) (bool, int) { - for i, _ap := range *(*c.table)[ap.Key] { - if ap == _ap { - return true, i - } - } - return false, -1 +func (c *apCollection) Exists(ap *availablePlugin) (bool, int) { + return (*c.table)[ap.Key].Exists(ap) } // availablePlugins is a collection of availablePlugins by type @@ -277,3 +290,81 @@ func (a *availablePlugins) Remove(ap *availablePlugin) error { return errors.New("cannot remove from available plugins, unknown plugin type") } } + +type availablePluginPool struct { + Plugins *[]*availablePlugin + + mutex *sync.Mutex +} + +func newAvailablePluginPool() *availablePluginPool { + app := make([]*availablePlugin, 0) + return &availablePluginPool{ + Plugins: &app, + mutex: &sync.Mutex{}, + } +} + +func (a *availablePluginPool) Lock() { + a.mutex.Lock() +} + +func (a *availablePluginPool) Unlock() { + a.mutex.Unlock() +} + +func (a *availablePluginPool) Count() int { + return len((*a.Plugins)) +} + +func (a *availablePluginPool) Add(ap *availablePlugin) { + a.mutex.Lock() + defer a.mutex.Unlock() + // tell ap its index in the table + ap.Index = len((*a.Plugins)) + // append + newCollection := append((*a.Plugins), ap) + // overwrite + a.Plugins = &newCollection +} + +func (a *availablePluginPool) Remove(ap *availablePlugin) { + a.Lock() + defer a.Unlock() + // Place nil here to allow GC per : https://github.com/golang/go/wiki/SliceTricks + (*a.Plugins)[ap.Index] = nil + splicedColl := append((*a.Plugins)[:ap.Index], (*a.Plugins)[ap.Index+1:]...) + a.Plugins = &splicedColl + //reset indexes + a.resetIndexes() +} + +func (a *availablePluginPool) Exists(ap *availablePlugin) (bool, int) { + for i, _ap := range *a.Plugins { + if ap == _ap { + return true, i + } + } + return false, -1 +} + +func (a *availablePluginPool) resetIndexes() { + for i, ap := range *a.Plugins { + ap.Index = i + } +} + +func (a *availablePluginPool) SelectUsingStrategy(strat RoutingStrategy) (*availablePlugin, error) { + a.Lock() + defer a.Unlock() + + sp := make([]routing.SelectablePlugin, len(*a.Plugins)) + for i, _ := range *a.Plugins { + sp[i] = (*a.Plugins)[i] + } + sap, err := strat.Select(a, sp) + if err != nil || sap == nil { + return nil, err + } + return sap.(*availablePlugin), err +} diff --git a/control/control.go b/control/control.go index c30c05c1f..c7ddccbb1 100644 --- a/control/control.go +++ b/control/control.go @@ -3,10 +3,13 @@ package control import ( "crypto/rsa" "errors" + "fmt" + "time" "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" "github.com/intelsdilabs/pulse/core/control_event" @@ -29,46 +32,81 @@ type pluginControl struct { controlPrivKey *rsa.PrivateKey controlPubKey *rsa.PublicKey eventManager *gomit.EventController - pluginManager managesPlugins - metricCatalog catalogsMetrics + + pluginManager managesPlugins + metricCatalog catalogsMetrics + pluginRunner runsPlugins + pluginRouter routesToPlugins +} + +type routesToPlugins interface { + Collect([]core.MetricType, *cdata.ConfigDataNode, time.Time) (*collectionResponse, error) + SetRunner(runsPlugins) + SetMetricCatalog(catalogsMetrics) +} + +type runsPlugins interface { + Start() error + Stop() []error + AvailablePlugins() *availablePlugins + AddDelegates(delegates ...gomit.Delegator) + SetMetricCatalog(c catalogsMetrics) + SetPluginManager(m managesPlugins) } type managesPlugins interface { LoadPlugin(string) (*loadedPlugin, error) UnloadPlugin(CatalogedPlugin) error LoadedPlugins() *loadedPlugins + SetMetricCatalog(catalogsMetrics) + GenerateArgs() plugin.Arg } type catalogsMetrics interface { Get([]string, int) (*metricType, error) Add(*metricType) + AddLoadedMetricType(*loadedPlugin, core.MetricType) Item() (string, []*metricType) Next() bool Subscribe([]string, int) error Unsubscribe([]string, int) error -} - -// an interface used to polymorph the return from -// SubscribeMetric. Processing config data returns -// a type which holds a collection of errors. -type SubscriptionError interface { - Errors() []error -} - -type subGetError struct { - errs []error -} - -func (s *subGetError) Errors() []error { - return s.errs + Table() map[string][]*metricType + GetPlugin([]string, int) (*loadedPlugin, error) } // Returns a new pluginControl instance func New() *pluginControl { - c := &pluginControl{ - eventManager: gomit.NewEventController(), - pluginManager: newPluginManager(), - metricCatalog: newMetricCatalog(), + c := &pluginControl{} + // Initialize components + // + // Event Manager + c.eventManager = gomit.NewEventController() + + // Metric Catalog + c.metricCatalog = newMetricCatalog() + + // Plugin Manager + c.pluginManager = newPluginManager() + // Plugin Manager needs a reference to the metric catalog + c.pluginManager.SetMetricCatalog(c.metricCatalog) + + // Plugin Runner + c.pluginRunner = newRunner() + c.pluginRunner.AddDelegates(c.eventManager) + c.pluginRunner.SetMetricCatalog(c.metricCatalog) + c.pluginRunner.SetPluginManager(c.pluginManager) + + // Plugin Router + c.pluginRouter = newPluginRouter() + c.pluginRouter.SetRunner(c.pluginRunner) + c.pluginRouter.SetMetricCatalog(c.metricCatalog) + + // Wire event manager + + // Start stuff + err := c.pluginRunner.Start() + if err != nil { + panic(err) } // c.loadRequestsChan = make(chan LoadedPlugin) @@ -165,24 +203,34 @@ func (p *pluginControl) generateArgs() plugin.Arg { // SubscribeMetricType validates the given config data, and if valid // returns a MetricType with a config. On error a collection of errors is returned // either from config data processing, or the inability to find the metric. -func (p *pluginControl) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, SubscriptionError) { +func (p *pluginControl) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, []error) { + subErrs := make([]error, 0) m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version()) if err != nil { - return nil, &subGetError{errs: []error{err}} + subErrs = append(subErrs, err) + return nil, subErrs + } + + // No metric found return error. + if m == nil { + subErrs = append(subErrs, errors.New(fmt.Sprintf("no metric found cannot subscribe: (%s) version(%d)", mt.Namespace(), mt.Version()))) + return nil, subErrs } + if m.policy == nil { + m.policy = cpolicy.NewPolicyNode() + } ncdTable, errs := m.policy.Process(cd.Table()) if errs != nil && errs.HasErrors() { - return nil, errs + return nil, errs.Errors() } - m.config = cdata.FromTable(*ncdTable) m.Subscribe() - e := &control_event.MetricSubscriptionEvent{ MetricNamespace: m.Namespace(), + Version: m.Version(), } defer p.eventManager.Emit(e) @@ -203,14 +251,6 @@ func (p *pluginControl) UnsubscribeMetricType(mt core.MetricType) { p.eventManager.Emit(e) } -func (p *pluginControl) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { - m, err := p.metricCatalog.Get(mns, ver) - if err != nil { - return nil, err - } - return m.Plugin, nil -} - // the public interface for a plugin // this should be the contract for // how mgmt modules know a plugin diff --git a/control/control_test.go b/control/control_test.go index df943e3a4..f8d133dee 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -8,8 +8,9 @@ import ( "time" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" + "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" - "github.com/intelsdilabs/pulse/core/cpolicy" "github.com/intelsdilabs/pulse/core/ctypes" . "github.com/smartystreets/goconvey/convey" @@ -42,6 +43,14 @@ func (m *MockPluginManagerBadSwap) LoadedPlugins() *loadedPlugins { return nil } +func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) { + +} + +func (m *MockPluginManagerBadSwap) GenerateArgs() plugin.Arg { + return plugin.Arg{} +} + func TestControlNew(t *testing.T) { } @@ -266,6 +275,14 @@ type mc struct { e int } +func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { + return nil, nil +} + +func (m *mc) GetPlugin([]string, int) (*loadedPlugin, error) { + return nil, nil +} + func (m *mc) Get(ns []string, ver int) (*metricType, error) { if m.e == 1 { return &metricType{ @@ -301,6 +318,10 @@ func (m *mc) Next() bool { return false } +func (m *mc) AddLoadedMetricType(*loadedPlugin, core.MetricType) { + +} + type mockCDProc struct { } @@ -322,26 +343,27 @@ func TestSubscribeMetric(t *testing.T) { Convey("does not return errors when metricCatalog.Subscribe() does not return an error", t, func() { cd.AddItem("key", &ctypes.ConfigValueStr{Value: "value"}) mtrc.e = 1 - mt := newMetricType([]string{""}, -1, lp) + mt := newMetricType([]string{""}, time.Now(), lp) _, err := c.SubscribeMetricType(mt, cd) So(err, ShouldBeNil) }) Convey("returns errors when metricCatalog.Subscribe() returns an error", t, func() { mtrc.e = 0 - mt := newMetricType([]string{"nf"}, -1, lp) + mt := newMetricType([]string{"nf"}, time.Now(), lp) _, err := c.SubscribeMetricType(mt, cd) - So(len(err.Errors()), ShouldEqual, 1) - So(err.Errors()[0], ShouldResemble, errMetricNotFound) - }) - Convey("returns errors when processing fails", t, func() { - cd := cdata.NewNode() - cd.AddItem("fail", &ctypes.ConfigValueStr{Value: "value"}) - mtrc.e = 1 - mt := newMetricType([]string{""}, -1, lp) - _, errs := c.SubscribeMetricType(mt, cd) - So(len(errs.Errors()), ShouldEqual, 1) - So(errs.Errors()[0], ShouldResemble, errors.New("test fail")) + So(len(err), ShouldEqual, 1) + So(err[0], ShouldResemble, errMetricNotFound) }) + // Refactoring (nweaver) + // Convey("returns errors when processing fails", t, func() { + // cd := cdata.NewNode() + // cd.AddItem("fail", &ctypes.ConfigValueStr{Value: "value"}) + // mtrc.e = 1 + // mt := newMetricType([]string{""}, time.Now(), lp) + // _, errs := c.SubscribeMetricType(mt, cd) + // So(len(errs.Errors()), ShouldEqual, 1) + // So(errs.Errors()[0], ShouldResemble, errors.New("test fail")) + // }) } @@ -351,44 +373,45 @@ func TestUnsubscribeMetric(t *testing.T) { lp := new(loadedPlugin) Convey("When an error is returned", t, func() { Convey("it panics", func() { - mt := newMetricType([]string{"nf"}, -1, lp) + mt := newMetricType([]string{"nf"}, time.Now(), lp) So(func() { c.UnsubscribeMetricType(mt) }, ShouldPanic) - mt = newMetricType([]string{"nf"}, -1, lp) + mt = newMetricType([]string{"nf"}, time.Now(), lp) So(func() { c.UnsubscribeMetricType(mt) }, ShouldPanic) }) }) Convey("When no error is returned", t, func() { Convey("it doesn't panic", func() { - mt := newMetricType([]string{"hello"}, -1, lp) + mt := newMetricType([]string{"hello"}, time.Now(), lp) So(func() { c.UnsubscribeMetricType(mt) }, ShouldNotPanic) }) }) } -func TestResolvePlugin(t *testing.T) { - Convey(".resolvePlugin()", t, func() { - c := New() - lp := &loadedPlugin{} - mt := newMetricType([]string{"foo", "bar"}, time.Now().Unix(), lp) - c.metricCatalog.Add(mt) - Convey("it resolves the plugin", func() { - p, err := c.resolvePlugin([]string{"foo", "bar"}, -1) - So(err, ShouldBeNil) - So(p, ShouldEqual, lp) - }) - Convey("it returns an error if the metricType cannot be found", func() { - p, err := c.resolvePlugin([]string{"baz", "qux"}, -1) - So(p, ShouldBeNil) - So(err, ShouldResemble, errors.New("metric not found")) - }) - }) -} +// TODO move to metricCatalog +// func TestResolvePlugin(t *testing.T) { +// Convey(".resolvePlugin()", t, func() { +// c := New() +// lp := &loadedPlugin{} +// mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) +// c.metricCatalog.Add(mt) +// Convey("it resolves the plugin", func() { +// p, err := c.resolvePlugin([]string{"foo", "bar"}, -1) +// So(err, ShouldBeNil) +// So(p, ShouldEqual, lp) +// }) +// Convey("it returns an error if the metricType cannot be found", func() { +// p, err := c.resolvePlugin([]string{"baz", "qux"}, -1) +// So(p, ShouldBeNil) +// So(err, ShouldResemble, errors.New("metric not found")) +// }) +// }) +// } func TestExportedMetricCatalog(t *testing.T) { Convey(".MetricCatalog()", t, func() { c := New() lp := &loadedPlugin{} - mt := newMetricType([]string{"foo", "bar"}, time.Now().Unix(), lp) + mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) c.metricCatalog.Add(mt) Convey("it returns a collection of core.MetricTypes", func() { t := c.MetricCatalog() diff --git a/control/metrics.go b/control/metrics.go index 686f5d366..eca69f73f 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -4,9 +4,11 @@ import ( "errors" "strings" "sync" + "time" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" + "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" - "github.com/intelsdilabs/pulse/core/cpolicy" "github.com/intelsdilabs/pulse/core/ctypes" ) @@ -16,26 +18,24 @@ var ( ) type metricType struct { - Plugin *loadedPlugin - - namespace []string - lastAdvertisedTimestamp int64 - subscriptions int - policy processesConfigData - config *cdata.ConfigDataNode + Plugin *loadedPlugin + namespace []string + lastAdvertisedTime time.Time + subscriptions int + policy processesConfigData + config *cdata.ConfigDataNode } type processesConfigData interface { Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors) } -func newMetricType(ns []string, last int64, plugin *loadedPlugin) *metricType { +func newMetricType(ns []string, last time.Time, plugin *loadedPlugin) *metricType { return &metricType{ Plugin: plugin, - namespace: ns, - lastAdvertisedTimestamp: last, - policy: cpolicy.NewPolicyNode(), + namespace: ns, + lastAdvertisedTime: last, } } @@ -43,8 +43,8 @@ func (m *metricType) Namespace() []string { return m.namespace } -func (m *metricType) LastAdvertisedTimestamp() int64 { - return m.lastAdvertisedTimestamp +func (m *metricType) LastAdvertisedTime() time.Time { + return m.lastAdvertisedTime } func (m *metricType) Subscribe() { @@ -64,6 +64,9 @@ func (m *metricType) SubscriptionCount() int { } func (m *metricType) Version() int { + if m.Plugin == nil { + return -1 + } return m.Plugin.Version() } @@ -89,8 +92,28 @@ func newMetricCatalog() *metricCatalog { } } +func (m *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.MetricType) { + if lp.ConfigPolicyTree == nil { + panic("NO") + } + + newMt := metricType{ + Plugin: lp, + namespace: mt.Namespace(), + lastAdvertisedTime: mt.LastAdvertisedTime(), + // This caches the config policy node within the metric type + // Disabled until ctree + cpolicy is RPC compatible + // policy: lp.ConfigPolicyTree.Get(mt.Namespace()), + } + m.Add(&newMt) +} + // adds a metricType pointer to the loadedPlugins table func (mc *metricCatalog) Add(m *metricType) { + // if m.policy == nil { + // // Set an empty config policy tree if not provided + // m.policy = cpolicy.NewTree() + // } mc.mutex.Lock() defer mc.mutex.Unlock() @@ -189,6 +212,14 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error { return m.Unsubscribe() } +func (mc *metricCatalog) GetPlugin(mns []string, ver int) (*loadedPlugin, error) { + m, err := mc.Get(mns, ver) + if err != nil { + return nil, err + } + return m.Plugin, nil +} + func (mc *metricCatalog) get(key string, ver int) (*metricType, error) { var ( ok bool diff --git a/control/metrics_test.go b/control/metrics_test.go index e5e7d3a4f..5b80709cf 100644 --- a/control/metrics_test.go +++ b/control/metrics_test.go @@ -13,14 +13,14 @@ import ( func TestMetricType(t *testing.T) { Convey("newMetricType()", t, func() { Convey("returns a metricType", func() { - mt := newMetricType([]string{"test"}, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType([]string{"test"}, time.Now(), new(loadedPlugin)) So(mt, ShouldHaveSameTypeAs, new(metricType)) }) }) Convey("metricType.Namespace()", t, func() { Convey("returns the namespace of a metricType", func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) So(mt.Namespace(), ShouldHaveSameTypeAs, ns) So(mt.Namespace(), ShouldResemble, ns) }) @@ -29,16 +29,16 @@ func TestMetricType(t *testing.T) { Convey("returns the namespace of a metricType", func() { ns := []string{"test"} lp := &loadedPlugin{Meta: plugin.PluginMeta{Version: 1}} - mt := newMetricType(ns, time.Now().Unix(), lp) + mt := newMetricType(ns, time.Now(), lp) So(mt.Version(), ShouldEqual, 1) }) }) Convey("metricType.LastAdvertisedTimestamp()", t, func() { Convey("returns the LastAdvertisedTimestamp for the metricType", func() { - ts := time.Now().Unix() + ts := time.Now() mt := newMetricType([]string{"test"}, ts, new(loadedPlugin)) - So(mt.LastAdvertisedTimestamp(), ShouldHaveSameTypeAs, ts) - So(mt.LastAdvertisedTimestamp(), ShouldResemble, ts) + So(mt.LastAdvertisedTime(), ShouldHaveSameTypeAs, ts) + So(mt.LastAdvertisedTime(), ShouldResemble, ts) }) }) } @@ -53,7 +53,7 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Add()", t, func() { Convey("adds a metricType to the metricCatalog", func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() mc.Add(mt) _mt, err := mc.Get(ns, -1) @@ -63,7 +63,7 @@ func TestMetricCatalog(t *testing.T) { }) Convey("metricCatalog.Get()", t, func() { mc := newMetricCatalog() - ts := time.Now().Unix() + ts := time.Now() Convey("add multiple metricTypes and get them back", func() { ns := [][]string{ []string{"test1"}, @@ -128,7 +128,7 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Table()", t, func() { Convey("returns a copy of the table", func() { mc := newMetricCatalog() - mt := newMetricType([]string{"foo", "bar"}, time.Now().Unix(), &loadedPlugin{}) + mt := newMetricType([]string{"foo", "bar"}, time.Now(), &loadedPlugin{}) mc.Add(mt) So(mc.Table(), ShouldHaveSameTypeAs, map[string][]*metricType{}) So(mc.Table()["foo.bar"], ShouldResemble, []*metricType{mt}) @@ -137,7 +137,7 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Remove()", t, func() { Convey("removes a metricType from the catalog", func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() mc.Add(mt) mc.Remove(ns) @@ -148,7 +148,7 @@ func TestMetricCatalog(t *testing.T) { }) Convey("metricCatalog.Next()", t, func() { ns := []string{"test"} - mt := newMetricType(ns, time.Now().Unix(), new(loadedPlugin)) + mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() Convey("returns false on empty table", func() { ok := mc.Next() @@ -167,7 +167,7 @@ func TestMetricCatalog(t *testing.T) { []string{"test3"}, } lp := new(loadedPlugin) - t := time.Now().Unix() + t := time.Now() mt := []*metricType{ newMetricType(ns[0], t, lp), newMetricType(ns[1], t, lp), @@ -208,7 +208,7 @@ func TestSubscribe(t *testing.T) { []string{"test3"}, } lp := new(loadedPlugin) - ts := time.Now().Unix() + ts := time.Now() mt := []*metricType{ newMetricType(ns[0], ts, lp), newMetricType(ns[1], ts, lp), @@ -242,7 +242,7 @@ func TestUnsubscribe(t *testing.T) { []string{"test3"}, } lp := new(loadedPlugin) - ts := time.Now().Unix() + ts := time.Now() mt := []*metricType{ newMetricType(ns[0], ts, lp), newMetricType(ns[1], ts, lp), @@ -278,7 +278,7 @@ func TestUnsubscribe(t *testing.T) { } func TestSubscriptionCount(t *testing.T) { - m := newMetricType([]string{"test"}, time.Now().Unix(), &loadedPlugin{}) + m := newMetricType([]string{"test"}, time.Now(), &loadedPlugin{}) Convey("it returns the subscription count", t, func() { m.Subscribe() So(m.SubscriptionCount(), ShouldEqual, 1) diff --git a/control/monitor.go b/control/monitor.go index 0969ea87b..3f9a8d177 100644 --- a/control/monitor.go +++ b/control/monitor.go @@ -1,12 +1,15 @@ package control -import "time" +import ( + "time" +) const ( MonitorStopped monitorState = iota - 1 // default is stopped MonitorStarted - DefaultMonitorDuration = time.Second * 60 + // Changed to one second until we get proper control of duration runtime into this. + DefaultMonitorDuration = time.Second * 1 ) type monitorState int @@ -60,7 +63,7 @@ func (m *monitor) Start(availablePlugins *availablePlugins) { availablePlugins.Collectors.Lock() for availablePlugins.Collectors.Next() { _, apc := availablePlugins.Collectors.Item() - for _, ap := range *apc { + for _, ap := range *apc.Plugins { go ap.CheckHealth() } } @@ -70,7 +73,7 @@ func (m *monitor) Start(availablePlugins *availablePlugins) { availablePlugins.Publishers.Lock() for availablePlugins.Publishers.Next() { _, apc := availablePlugins.Publishers.Item() - for _, ap := range *apc { + for _, ap := range *apc.Plugins { go ap.CheckHealth() } } @@ -80,7 +83,7 @@ func (m *monitor) Start(availablePlugins *availablePlugins) { availablePlugins.Processors.Lock() for availablePlugins.Processors.Next() { _, apc := availablePlugins.Processors.Item() - for _, ap := range *apc { + for _, ap := range *apc.Plugins { go ap.CheckHealth() } } diff --git a/control/monitor_test.go b/control/monitor_test.go index aa01a25ac..e8b33115f 100644 --- a/control/monitor_test.go +++ b/control/monitor_test.go @@ -13,8 +13,13 @@ import ( type mockPluginClient struct{} -func (mp *mockPluginClient) Ping() error { return nil } -func (mp *mockPluginClient) Kill(r string) error { return nil } +func (mp *mockPluginClient) Ping() error { + return nil +} + +func (mp *mockPluginClient) Kill(r string) error { + return nil +} func TestMonitor(t *testing.T) { Convey("monitor", t, func() { @@ -69,7 +74,7 @@ func TestMonitor(t *testing.T) { for aps.Collectors.Next() { _, item := aps.Collectors.Item() So(item, ShouldNotBeNil) - So((*item)[0].failedHealthChecks, ShouldBeGreaterThan, 3) + So((*(*item).Plugins)[0].failedHealthChecks, ShouldBeGreaterThan, 3) } }) }) @@ -85,7 +90,7 @@ func TestMonitor(t *testing.T) { oldOpt := m.Option(MonitorDuration(time.Millisecond * 200)) So(m.duration, ShouldResemble, time.Millisecond*200) m.Option(oldOpt) - So(m.duration, ShouldResemble, time.Second*60) + So(m.duration, ShouldResemble, time.Second*1) }) }) } diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index 05296ed45..7eb61d414 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -1,11 +1,8 @@ package client import ( - "net" - "net/rpc" - "time" - - "github.com/intelsdilabs/pulse/control/plugin" + // "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core" ) // A client providing common plugin method calls. @@ -17,8 +14,8 @@ type PluginClient interface { // A client providing collector specific plugin method calls. type PluginCollectorClient interface { PluginClient - Collect(plugin.CollectorArgs, *plugin.CollectorReply) error - GetMetricTypes(plugin.GetMetricTypesArgs, *plugin.GetMetricTypesReply) error + CollectMetrics([]core.MetricType) ([]core.Metric, error) + GetMetricTypes() ([]core.MetricType, error) } // A client providing processor specific plugin method calls. @@ -26,44 +23,3 @@ type PluginProcessorClient interface { PluginClient ProcessMetricData() } - -// Native clients use golang net/rpc for communication to a native rpc server. -type PluginNativeClient struct { - connection *rpc.Client -} - -func (p *PluginNativeClient) Ping() error { - a := plugin.PingArgs{} - b := true - err := p.connection.Call("SessionState.Ping", a, &b) - return err -} - -func (p *PluginNativeClient) Kill(reason string) error { - a := plugin.KillArgs{Reason: reason} - b := true - err := p.connection.Call("SessionState.Kill", a, &b) - return err -} - -func (p *PluginNativeClient) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) error { - err := p.connection.Call("Collector.Collect", args, reply) - return err -} - -func (p *PluginNativeClient) GetMetricTypes(args plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { - err := p.connection.Call("Collector.GetMetricTypes", args, reply) - return err -} - -func NewCollectorClient(address string, timeout time.Duration) (PluginCollectorClient, error) { - // Attempt to dial address error on timeout or problem - conn, err := net.DialTimeout("tcp", address, timeout) - // Return nil RPCClient and err if encoutered - if err != nil { - return nil, err - } - r := rpc.NewClient(conn) - p := &PluginNativeClient{connection: r} - return p, nil -} diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go new file mode 100644 index 000000000..e80cce9aa --- /dev/null +++ b/control/plugin/client/native.go @@ -0,0 +1,75 @@ +package client + +import ( + "net" + "net/rpc" + "time" + + "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core" +) + +// Native clients use golang net/rpc for communication to a native rpc server. +type PluginNativeClient struct { + connection *rpc.Client +} + +func NewCollectorNativeClient(address string, timeout time.Duration) (PluginCollectorClient, error) { + // Attempt to dial address error on timeout or problem + conn, err := net.DialTimeout("tcp", address, timeout) + // Return nil RPCClient and err if encoutered + if err != nil { + return nil, err + } + r := rpc.NewClient(conn) + p := &PluginNativeClient{connection: r} + return p, nil +} + +func (p *PluginNativeClient) Ping() error { + a := plugin.PingArgs{} + b := true + err := p.connection.Call("SessionState.Ping", a, &b) + return err +} + +func (p *PluginNativeClient) Kill(reason string) error { + a := plugin.KillArgs{Reason: reason} + var b bool + err := p.connection.Call("SessionState.Kill", a, &b) + return err +} + +func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.MetricType) ([]core.Metric, error) { + // Convert core.MetricType slice into plugin.PluginMetricType slice as we have + // to send structs over RPC + pluginMetricTypes := make([]plugin.PluginMetricType, len(coreMetricTypes)) + for i, _ := range coreMetricTypes { + pluginMetricTypes[i] = *plugin.NewPluginMetricType(coreMetricTypes[i].Namespace()) + } + + // TODO return err if mts is empty + args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes} + reply := plugin.CollectMetricsReply{} + + err := p.connection.Call("Collector.CollectMetrics", args, &reply) + + retMetrics := make([]core.Metric, len(reply.PluginMetrics)) + for i, _ := range reply.PluginMetrics { + retMetrics[i] = reply.PluginMetrics[i] + } + return retMetrics, err +} + +func (p *PluginNativeClient) GetMetricTypes() ([]core.MetricType, error) { + args := plugin.GetMetricTypesArgs{} + reply := plugin.GetMetricTypesReply{} + + err := p.connection.Call("Collector.GetMetricTypes", args, &reply) + + retMetricTypes := make([]core.MetricType, len(reply.PluginMetricTypes)) + for i, _ := range reply.PluginMetricTypes { + retMetricTypes[i] = reply.PluginMetricTypes[i] + } + return retMetricTypes, err +} diff --git a/control/plugin/collector.go b/control/plugin/collector.go index b5d0a1cf3..e161f0565 100644 --- a/control/plugin/collector.go +++ b/control/plugin/collector.go @@ -7,28 +7,14 @@ import ( "net/rpc" ) +// Acts as a proxy for RPC calls to a CollectorPlugin. This helps keep the function signature simple +// within plugins vs. having to match required RPC patterns. + // Collector plugin type CollectorPlugin interface { Plugin - Collect(CollectorArgs, *CollectorReply) error - GetMetricTypes(GetMetricTypesArgs, *GetMetricTypesReply) error -} - -// Arguments passed to Collect() for a Collector implementation -type CollectorArgs struct { -} - -// Reply assigned by a Collector implementation using Collect() -type CollectorReply struct { -} - -// GetMetricTypesArgs args passed to GetMetricTypes -type GetMetricTypesArgs struct { -} - -// GetMetricTypesReply assigned by GetMetricTypes() implementation -type GetMetricTypesReply struct { - MetricTypes []*MetricType + CollectMetrics([]PluginMetricType) ([]PluginMetric, error) + GetMetricTypes() ([]PluginMetricType, error) } // Execution method for a Collector plugin. Error and exit code (int) returned. @@ -44,9 +30,13 @@ func StartCollector(c CollectorPlugin, s Session, r *Response) (error, int) { s.Logger().Printf("Listening %s\n", l.Addr()) s.Logger().Printf("Session token %s\n", s.Token()) - // We register RPC even in non-daemon mode to ensure it would be successful. - // Register the collector RPC methods from plugin implementation - rpc.Register(c) + // Create our proxy + proxy := &collectorPluginProxy{ + Plugin: c, + Session: s, + } + // Register the proxy under the "Collector" namespace + rpc.RegisterName("Collector", proxy) // Register common plugin methods used for utility reasons e := rpc.Register(s) if e != nil { diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go new file mode 100644 index 000000000..ddc7be501 --- /dev/null +++ b/control/plugin/collector_proxy.go @@ -0,0 +1,54 @@ +package plugin + +import ( + "errors" + "fmt" +) + +// Arguments passed to CollectMetrics() for a Collector implementation +type CollectMetricsArgs struct { + PluginMetricTypes []PluginMetricType +} + +// Reply assigned by a Collector implementation using CollectMetrics() +type CollectMetricsReply struct { + PluginMetrics []PluginMetric +} + +// GetMetricTypesArgs args passed to GetMetricTypes +type GetMetricTypesArgs struct { +} + +// GetMetricTypesReply assigned by GetMetricTypes() implementation +type GetMetricTypesReply struct { + PluginMetricTypes []PluginMetricType +} + +type collectorPluginProxy struct { + Plugin CollectorPlugin + Session Session +} + +func (c *collectorPluginProxy) GetMetricTypes(args GetMetricTypesArgs, reply *GetMetricTypesReply) error { + c.Session.Logger().Println("GetMetricTypes called") + // Reset heartbeat + c.Session.ResetHeartbeat() + mts, err := c.Plugin.GetMetricTypes() + if err != nil { + return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) + } + reply.PluginMetricTypes = mts + return nil +} + +func (c *collectorPluginProxy) CollectMetrics(args CollectMetricsArgs, reply *CollectMetricsReply) error { + c.Session.Logger().Println("CollectMetrics called") + // Reset heartbeat + c.Session.ResetHeartbeat() + ms, err := c.Plugin.CollectMetrics(args.PluginMetricTypes) + if err != nil { + return errors.New(fmt.Sprintf("CollectMetrics call error : %s", err.Error())) + } + reply.PluginMetrics = ms + return nil +} diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index 2392cced0..396ceeaa8 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -48,6 +48,10 @@ func (s *MockSessionState) Token() string { return s.token } +func (m *MockSessionState) ResetHeartbeat() { + +} + func (s *MockSessionState) KillChan() chan int { return s.killChan } @@ -66,19 +70,17 @@ func (s *MockSessionState) heartbeatWatch(killChan chan int) { } type MockPlugin struct { - Meta PluginMeta - Policy ConfigPolicy + Meta PluginMeta } -func (f *MockPlugin) Collect(args CollectorArgs, reply *CollectorReply) error { - return nil +func (f *MockPlugin) CollectMetrics(_ []PluginMetricType) ([]PluginMetric, error) { + return []PluginMetric{}, nil } -func (c *MockPlugin) GetMetricTypes(args GetMetricTypesArgs, reply *GetMetricTypesReply) error { - reply.MetricTypes = []*MetricType{ - NewMetricType([]string{"org", "some_metric"}, time.Now().Unix()), - } - return nil +func (c *MockPlugin) GetMetricTypes() ([]PluginMetricType, error) { + return []PluginMetricType{ + *NewPluginMetricType([]string{"org", "some_metric"}), + }, nil } func TestStartCollector(t *testing.T) { diff --git a/core/cpolicy/float.go b/control/plugin/cpolicy/float.go similarity index 100% rename from core/cpolicy/float.go rename to control/plugin/cpolicy/float.go diff --git a/core/cpolicy/float_test.go b/control/plugin/cpolicy/float_test.go similarity index 100% rename from core/cpolicy/float_test.go rename to control/plugin/cpolicy/float_test.go diff --git a/core/cpolicy/integer.go b/control/plugin/cpolicy/integer.go similarity index 100% rename from core/cpolicy/integer.go rename to control/plugin/cpolicy/integer.go diff --git a/core/cpolicy/integer_test.go b/control/plugin/cpolicy/integer_test.go similarity index 100% rename from core/cpolicy/integer_test.go rename to control/plugin/cpolicy/integer_test.go diff --git a/core/cpolicy/node.go b/control/plugin/cpolicy/node.go similarity index 86% rename from core/cpolicy/node.go rename to control/plugin/cpolicy/node.go index 27eb1913a..fa714dffb 100644 --- a/core/cpolicy/node.go +++ b/control/plugin/cpolicy/node.go @@ -1,6 +1,8 @@ package cpolicy import ( + "bytes" + "encoding/gob" "errors" "fmt" "sync" @@ -49,6 +51,21 @@ func NewPolicyNode() *ConfigPolicyNode { } } +func (c *ConfigPolicyNode) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(&c.rules); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigPolicyNode) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.rules) +} + // Adds a rule to this policy node func (p *ConfigPolicyNode) Add(rules ...Rule) { p.mutex.Lock() diff --git a/core/cpolicy/node_test.go b/control/plugin/cpolicy/node_test.go similarity index 100% rename from core/cpolicy/node_test.go rename to control/plugin/cpolicy/node_test.go diff --git a/core/cpolicy/rule.go b/control/plugin/cpolicy/rule.go similarity index 100% rename from core/cpolicy/rule.go rename to control/plugin/cpolicy/rule.go diff --git a/core/cpolicy/string.go b/control/plugin/cpolicy/string.go similarity index 58% rename from core/cpolicy/string.go rename to control/plugin/cpolicy/string.go index 5dee799e8..824100d18 100644 --- a/core/cpolicy/string.go +++ b/control/plugin/cpolicy/string.go @@ -1,6 +1,9 @@ package cpolicy import ( + "bytes" + "encoding/gob" + "github.com/intelsdilabs/pulse/core/ctypes" ) @@ -30,6 +33,43 @@ func NewStringRule(key string, req bool, opts ...string) (*stringRule, error) { }, nil } +func (s *stringRule) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(s.key); err != nil { + return nil, err + } + if err := encoder.Encode(s.required); err != nil { + return nil, err + } + if s.default_ == nil { + encoder.Encode(false) + } else { + encoder.Encode(true) + if err := encoder.Encode(&s.default_); err != nil { + return nil, err + } + } + return w.Bytes(), nil +} + +func (s *stringRule) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + if err := decoder.Decode(&s.key); err != nil { + return err + } + if err := decoder.Decode(&s.required); err != nil { + return err + } + var is_default_set bool + decoder.Decode(&is_default_set) + if is_default_set { + return decoder.Decode(&s.default_) + } + return nil +} + // Returns the key func (s *stringRule) Key() string { return s.key diff --git a/core/cpolicy/string_test.go b/control/plugin/cpolicy/string_test.go similarity index 100% rename from core/cpolicy/string_test.go rename to control/plugin/cpolicy/string_test.go diff --git a/core/cpolicy/tree.go b/control/plugin/cpolicy/tree.go similarity index 72% rename from core/cpolicy/tree.go rename to control/plugin/cpolicy/tree.go index d57543b1b..b8ffd1716 100644 --- a/core/cpolicy/tree.go +++ b/control/plugin/cpolicy/tree.go @@ -1,6 +1,9 @@ package cpolicy import ( + "bytes" + "encoding/gob" + "github.com/intelsdilabs/pulse/pkg/ctree" ) @@ -17,6 +20,22 @@ func NewTree() *ConfigPolicyTree { } } +func (c *ConfigPolicyTree) GobEncode() ([]byte, error) { + //todo throw an error if not frozen + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(c.cTree); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigPolicyTree) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.cTree) +} + // Adds a ConfigDataNode at the provided namespace. func (c *ConfigPolicyTree) Add(ns []string, cpn *ConfigPolicyNode) { c.cTree.Add(ns, cpn) diff --git a/core/cpolicy/tree_test.go b/control/plugin/cpolicy/tree_test.go similarity index 76% rename from core/cpolicy/tree_test.go rename to control/plugin/cpolicy/tree_test.go index 6492a93f4..e86edeec6 100644 --- a/core/cpolicy/tree_test.go +++ b/control/plugin/cpolicy/tree_test.go @@ -1,6 +1,7 @@ package cpolicy import ( + "encoding/gob" "testing" "github.com/intelsdilabs/pulse/core/ctypes" @@ -30,6 +31,26 @@ func TestConfigPolicyTree(t *testing.T) { So(gc.rules["username"].Default().(*ctypes.ConfigValueStr).Value, ShouldEqual, "root") So(gc.rules["password"].Required(), ShouldEqual, true) }) + Convey("encode & decode", func() { + gob.Register(NewPolicyNode()) + gob.Register(&stringRule{}) + buf, err := t.GobEncode() + So(err, ShouldBeNil) + So(buf, ShouldNotBeNil) + t2 := NewTree() + err = t2.GobDecode(buf) + So(err, ShouldBeNil) + So(t2.cTree, ShouldNotBeNil) + gc := t2.Get([]string{"one", "two", "potato"}) + So(gc, ShouldNotBeNil) + So(gc.rules["username"], ShouldNotBeNil) + So(gc.rules["username"].Required(), ShouldEqual, false) + So(gc.rules["password"].Required(), ShouldEqual, true) + So(gc.rules["username"].Default(), ShouldNotBeNil) + So(gc.rules["password"].Default(), ShouldBeNil) + So(gc.rules["username"].Default().(*ctypes.ConfigValueStr).Value, ShouldEqual, "root") + println(len(gc.rules)) + }) }) diff --git a/control/plugin/metric.go b/control/plugin/metric.go new file mode 100644 index 000000000..0180a8c5c --- /dev/null +++ b/control/plugin/metric.go @@ -0,0 +1,86 @@ +package plugin + +import ( + "time" + + "github.com/intelsdilabs/pulse/core/cdata" +) + +// Represents a collected metric. Only used within plugins and across plugin calls. +// Converted to core.Metric before being used within modules. +type PluginMetric struct { + Namespace_ []string + Data_ interface{} +} + +func (p PluginMetric) Namespace() []string { + return p.Namespace_ +} + +func (p PluginMetric) Data() interface{} { + return nil +} + +// Represents a metric type. Only used within plugins and across plugin calls. +// Converted to core.MetricType before being used within modules. +type PluginMetricType struct { + Namespace_ []string + LastAdvertisedTime_ time.Time + Version_ int +} + +// Returns the namespace. +func (p PluginMetricType) Namespace() []string { + return p.Namespace_ +} + +// Returns the last time this metric type was received from the plugin. +func (p PluginMetricType) LastAdvertisedTime() time.Time { + return p.LastAdvertisedTime_ +} + +// Returns the namespace. +func (p PluginMetricType) Version() int { + return p.Version_ +} + +// This version of MetricType never implements cdata.ConfigDataNode +func (p PluginMetricType) Config() *cdata.ConfigDataNode { + return nil +} + +func NewPluginMetricType(ns []string) *PluginMetricType { + return &PluginMetricType{ + Namespace_: ns, + LastAdvertisedTime_: time.Now(), + } +} + +/* + +core.Metric(i) (used by pulse modules) +core.MetricType(i) (used by pulse modules) + +PluginMetric (created by plugins and returned, goes over RPC) +PLuginMetricType (created by plugins and returned, goes over RPC) + +Get + +agent - call Get +client - call Get +plugin - builds array of PluginMetricTypes +plugin - return array of PluginMetricTypes +client - returns array of PluginMetricTypes through MetricType interface +agent - receives MetricTypes + +Collect + +agent - call Collect pass MetricTypes +client - call Collect, convert MetricTypes into new (plugin) PluginMetricTypes struct +plugin - build array of PluginMetric based on (plugin) MetricTypes +plugin - return array of PluginMetrics +client - return array of PluginMetrics through core.Metrics interface +agent - receive array of core.Metrics + + +*/ diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index 24d47854d..21b8d01ed 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -1,18 +1,11 @@ package plugin -// Config Policy -// task > control > default - +// WARNING! Do not import "fmt" and print from a plugin to stdout! import ( - "crypto/rand" "crypto/rsa" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "log" - "os" "time" + + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) var ( @@ -46,26 +39,6 @@ var ( } ) -type MetricType struct { - namespace []string - lastAdvertisedTimestamp int64 -} - -func (m *MetricType) Namespace() []string { - return m.namespace -} - -func (m *MetricType) LastAdvertisedTimestamp() int64 { - return m.lastAdvertisedTimestamp -} - -func NewMetricType(ns []string, last int64) *MetricType { - return &MetricType{ - namespace: ns, - lastAdvertisedTimestamp: last, - } -} - type PluginResponseState int type PluginType int @@ -79,33 +52,6 @@ func (p PluginType) String() string { return types[p] } -// Session interface -type Session interface { - Ping(arg PingArgs, b *bool) error - Kill(arg KillArgs, b *bool) error - Logger() *log.Logger - ListenAddress() string - SetListenAddress(string) - ListenPort() string - Token() string - KillChan() chan int - - generateResponse(r *Response) []byte - heartbeatWatch(killChan chan int) - isDaemon() bool -} - -// Started plugin session state -type SessionState struct { - *Arg - LastPing time.Time - - token string - listenAddress string - killChan chan int - logger *log.Logger -} - // Arguments passed to startup of Plugin type Arg struct { // Plugin file path to binary @@ -125,28 +71,22 @@ func NewArg(pubkey *rsa.PublicKey, logpath string) Arg { } } -// Arguments passed to ping -type PingArgs struct{} - -type KillArgs struct { - Reason string -} - // Response from started plugin type Response struct { - Meta PluginMeta - ListenAddress string - Token string - Type PluginType + Meta PluginMeta + ListenAddress string + Token string + Type PluginType + ConfigPolicyTree cpolicy.ConfigPolicyTree // State is a signal from plugin to control that it passed // its own loading requirements State PluginResponseState ErrorMessage string } -// ConfigPolicy for plugin -type ConfigPolicy struct { -} +// // ConfigPolicy for plugin +// type ConfigPolicy struct { +// } // PluginMeta for plugin type PluginMeta struct { @@ -164,136 +104,8 @@ func NewPluginMeta(name string, version int, pluginType PluginType) *PluginMeta } } -// Ping returns nothing in normal operation -func (s *SessionState) Ping(arg PingArgs, b *bool) error { - // For now we return nil. We can return an error if we are shutting - // down or otherwise in a state we should signal poor health. - // Reply should contain any context. - s.LastPing = time.Now() - s.logger.Println("Ping received") - return nil -} - -// Kill will stop a running plugin -func (s *SessionState) Kill(arg KillArgs, b *bool) error { - // Right now we have no coordination needed. In the future we should - // add control to wait on a lock before halting. - s.logger.Printf("Kill called by agent, reason: %s\n", arg.Reason) - go func() { - time.Sleep(time.Second * 2) - s.killChan <- 0 - }() - return nil -} - -// Logger gets the SessionState logger -func (s *SessionState) Logger() *log.Logger { - return s.logger -} - -// ListenAddress gets the SessionState listen address -func (s *SessionState) ListenAddress() string { - return s.listenAddress -} - -//ListenPort gets the SessionState listen port -func (s *SessionState) ListenPort() string { - return s.listenPort -} - -// SetListenAddress sets SessionState listen address -func (s *SessionState) SetListenAddress(a string) { - s.listenAddress = a -} - -// Token gets the SessionState token -func (s *SessionState) Token() string { - return s.token -} - -// KillChan gets the SessionState killchan -func (s *SessionState) KillChan() chan int { - return s.killChan -} - -func (s *SessionState) isDaemon() bool { - return !s.NoDaemon -} - -func (s *SessionState) generateResponse(r *Response) []byte { - // Add common plugin response properties - r.ListenAddress = s.listenAddress - r.Token = s.token - rs, _ := json.Marshal(r) - return rs -} - -func (s *SessionState) heartbeatWatch(killChan chan int) { - s.logger.Println("Heartbeat started") - count := 0 - for { - if time.Now().Sub(s.LastPing) >= PingTimeoutDuration { - count++ - if count >= PingTimeoutLimit { - s.logger.Println("Heartbeat timeout expired") - defer close(killChan) - return - } - } else { - s.logger.Println("Heartbeat timeout reset") - // Reset count - count = 0 - } - time.Sleep(PingTimeoutDuration) - } -} - -// NewSessionState takes the plugin args and returns a SessionState -func NewSessionState(pluginArgsMsg string) (*SessionState, error, int) { - pluginArg := &Arg{} - err := json.Unmarshal([]byte(pluginArgsMsg), pluginArg) - if err != nil { - return nil, err, 2 - } - - // If no port was provided we let the OS select a port for us. - // This is safe as address is returned in the Response and keep - // alive prevents unattended plugins. - if pluginArg.listenPort == "" { - pluginArg.listenPort = "0" - } - - // Generate random token for this session - rb := make([]byte, 32) - rand.Read(rb) - rs := base64.URLEncoding.EncodeToString(rb) - - var logger *log.Logger - switch lp := pluginArg.PluginLogPath; lp { - case "", "/tmp": - // Empty means use default tmp log (needs to be removed post-alpha) - lf, err := os.OpenFile("/tmp/pulse_plugin.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 - } - logger = log.New(lf, ">>>", log.Ldate|log.Ltime) - default: - lf, err := os.OpenFile(lp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 - } - logger = log.New(lf, ">>>", log.Ldate|log.Ltime) - } - - return &SessionState{ - Arg: pluginArg, - token: rs, - killChan: make(chan int), - logger: logger}, nil, 0 -} - // Start starts a plugin -func Start(m *PluginMeta, c Plugin, p *ConfigPolicy, requestString string) (error, int) { +func Start(m *PluginMeta, c Plugin, t *cpolicy.ConfigPolicyTree, requestString string) (error, int) { sessionState, sErr, retCode := NewSessionState(requestString) if sErr != nil { @@ -306,9 +118,10 @@ func Start(m *PluginMeta, c Plugin, p *ConfigPolicy, requestString string) (erro switch m.Type { case CollectorPluginType: r := &Response{ - Type: CollectorPluginType, - State: PluginSuccess, - Meta: *m, + Type: CollectorPluginType, + State: PluginSuccess, + Meta: *m, + ConfigPolicyTree: *t, } err, retCode := StartCollector(c.(CollectorPlugin), sessionState, r) if err != nil { diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 84d2a18f2..6220a701f 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" + . "github.com/smartystreets/goconvey/convey" ) @@ -21,16 +23,17 @@ func TestPluginType(t *testing.T) { func TestMetricType(t *testing.T) { Convey("MetricType", t, func() { - now := time.Now().Unix() - m := NewMetricType([]string{"foo", "bar"}, now) + now := time.Now() + m := NewPluginMetricType([]string{"foo", "bar"}) Convey("New", func() { - So(m, ShouldHaveSameTypeAs, &MetricType{}) + So(m, ShouldHaveSameTypeAs, &PluginMetricType{}) }) Convey("Get Namespace", func() { So(m.Namespace(), ShouldResemble, []string{"foo", "bar"}) }) Convey("Get LastAdvertisedTimestamp", func() { - So(m.LastAdvertisedTimestamp(), ShouldEqual, now) + So(m.LastAdvertisedTime().Unix(), ShouldBeGreaterThan, now.Unix()-2) + So(m.LastAdvertisedTime().Unix(), ShouldBeLessThan, now.Unix()+2) }) }) } @@ -112,17 +115,17 @@ func TestArg(t *testing.T) { func TestPlugin(t *testing.T) { Convey("Start", t, func() { mockPluginMeta := NewPluginMeta("test", 1, CollectorPluginType) - mockConfigPolicy := new(ConfigPolicy) + mockConfigPolicyTree := cpolicy.NewTree() var mockPluginArgs string = "{\"PluginLogPath\": \"/var/tmp/pulse_plugin.log\"}" - err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicy, mockPluginArgs) + err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicyTree, mockPluginArgs) So(err, ShouldBeNil) So(rc, ShouldEqual, 0) }) Convey("Start with invalid args", t, func() { mockPluginMeta := NewPluginMeta("test", 1, CollectorPluginType) - mockConfigPolicy := new(ConfigPolicy) + mockConfigPolicyTree := cpolicy.NewTree() var mockPluginArgs string = "" - err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicy, mockPluginArgs) + err, rc := Start(mockPluginMeta, new(MockPlugin), mockConfigPolicyTree, mockPluginArgs) So(err, ShouldNotBeNil) So(rc, ShouldNotEqual, 0) }) diff --git a/control/plugin/session.go b/control/plugin/session.go new file mode 100644 index 000000000..85324f197 --- /dev/null +++ b/control/plugin/session.go @@ -0,0 +1,179 @@ +package plugin + +import ( + "crypto/rand" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "time" +) + +// Session interface +type Session interface { + Ping(arg PingArgs, b *bool) error + Kill(arg KillArgs, b *bool) error + Logger() *log.Logger + ListenAddress() string + SetListenAddress(string) + ListenPort() string + Token() string + KillChan() chan int + ResetHeartbeat() + + generateResponse(r *Response) []byte + heartbeatWatch(killChan chan int) + isDaemon() bool +} + +// Arguments passed to ping +type PingArgs struct{} + +type KillArgs struct { + Reason string +} + +// Started plugin session state +type SessionState struct { + *Arg + LastPing time.Time + + token string + listenAddress string + killChan chan int + logger *log.Logger +} + +// Ping returns nothing in normal operation +func (s *SessionState) Ping(arg PingArgs, b *bool) error { + // For now we return nil. We can return an error if we are shutting + // down or otherwise in a state we should signal poor health. + // Reply should contain any context. + s.ResetHeartbeat() + s.logger.Println("Ping received") + return nil +} + +// Kill will stop a running plugin +func (s *SessionState) Kill(arg KillArgs, b *bool) error { + // Right now we have no coordination needed. In the future we should + // add control to wait on a lock before halting. + s.logger.Printf("Kill called by agent, reason: %s\n", arg.Reason) + go func() { + time.Sleep(time.Second * 2) + s.killChan <- 0 + }() + return nil +} + +// Logger gets the SessionState logger +func (s *SessionState) Logger() *log.Logger { + return s.logger +} + +// ListenAddress gets the SessionState listen address +func (s *SessionState) ListenAddress() string { + return s.listenAddress +} + +//ListenPort gets the SessionState listen port +func (s *SessionState) ListenPort() string { + return s.listenPort +} + +// SetListenAddress sets SessionState listen address +func (s *SessionState) SetListenAddress(a string) { + s.listenAddress = a +} + +func (s *SessionState) ResetHeartbeat() { + s.LastPing = time.Now() +} + +// Token gets the SessionState token +func (s *SessionState) Token() string { + return s.token +} + +// KillChan gets the SessionState killchan +func (s *SessionState) KillChan() chan int { + return s.killChan +} + +func (s *SessionState) isDaemon() bool { + return !s.NoDaemon +} + +func (s *SessionState) generateResponse(r *Response) []byte { + // Add common plugin response properties + r.ListenAddress = s.listenAddress + r.Token = s.token + rs, _ := json.Marshal(r) + return rs +} + +func (s *SessionState) heartbeatWatch(killChan chan int) { + s.logger.Println("Heartbeat started") + count := 0 + for { + if time.Now().Sub(s.LastPing) >= PingTimeoutDuration { + count++ + if count >= PingTimeoutLimit { + s.logger.Println("Heartbeat timeout expired") + defer close(killChan) + return + } + } else { + s.logger.Println("Heartbeat timeout reset") + // Reset count + count = 0 + } + time.Sleep(PingTimeoutDuration) + } +} + +// NewSessionState takes the plugin args and returns a SessionState +func NewSessionState(pluginArgsMsg string) (*SessionState, error, int) { + pluginArg := &Arg{} + err := json.Unmarshal([]byte(pluginArgsMsg), pluginArg) + if err != nil { + return nil, err, 2 + } + + // If no port was provided we let the OS select a port for us. + // This is safe as address is returned in the Response and keep + // alive prevents unattended plugins. + if pluginArg.listenPort == "" { + pluginArg.listenPort = "0" + } + + // Generate random token for this session + rb := make([]byte, 32) + rand.Read(rb) + rs := base64.URLEncoding.EncodeToString(rb) + + var logger *log.Logger + switch lp := pluginArg.PluginLogPath; lp { + case "", "/tmp": + // Empty means use default tmp log (needs to be removed post-alpha) + lf, err := os.OpenFile("/tmp/pulse_plugin.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 + } + logger = log.New(lf, ">>>", log.Ldate|log.Ltime) + default: + lf, err := os.OpenFile(lp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + if err != nil { + return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 + } + logger = log.New(lf, ">>>", log.Ldate|log.Ltime) + } + + return &SessionState{ + Arg: pluginArg, + token: rs, + killChan: make(chan int), + logger: logger}, nil, 0 +} diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 7eb5af61d..725670669 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -12,6 +12,8 @@ import ( "time" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/client" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) const ( @@ -131,12 +133,13 @@ func (l *loadedPlugins) Next() bool { // the struct representing a plugin that is loaded into Pulse type loadedPlugin struct { - Meta plugin.PluginMeta - Path string - Type plugin.PluginType - State pluginState - Token string - LoadedTime time.Time + Meta plugin.PluginMeta + Path string + Type plugin.PluginType + State pluginState + Token string + LoadedTime time.Time + ConfigPolicyTree *cpolicy.ConfigPolicyTree } // returns plugin name @@ -145,6 +148,10 @@ func (lp *loadedPlugin) Name() string { return lp.Meta.Name } +func (l *loadedPlugin) Key() string { + return fmt.Sprintf("%s:%d", l.Name(), l.Version()) +} + // returns plugin version // implements the CatalogedPlugin interface func (lp *loadedPlugin) Version() int { @@ -172,10 +179,10 @@ func (lp *loadedPlugin) LoadedTimestamp() int64 { // the struct representing the object responsible for // loading and unloading plugins type pluginManager struct { + metricCatalog catalogsMetrics loadedPlugins *loadedPlugins - - privKey *rsa.PrivateKey - pubKey *rsa.PublicKey + privKey *rsa.PrivateKey + pubKey *rsa.PublicKey } func newPluginManager() *pluginManager { @@ -185,6 +192,10 @@ func newPluginManager() *pluginManager { return p } +func (p *pluginManager) SetMetricCatalog(mc catalogsMetrics) { + p.metricCatalog = mc +} + // Returns loaded plugins func (p *pluginManager) LoadedPlugins() *loadedPlugins { return p.loadedPlugins @@ -198,7 +209,7 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { lPlugin.Path = path lPlugin.State = DetectedState - ePlugin, err := plugin.NewExecutablePlugin(p.generateArgs(), lPlugin.Path) + ePlugin, err := plugin.NewExecutablePlugin(p.GenerateArgs(), lPlugin.Path) if err != nil { log.Println(err) @@ -217,17 +228,28 @@ func (p *pluginManager) LoadPlugin(path string) (*loadedPlugin, error) { log.Println(err) return nil, err } + + // Add config policy tree to loaded plugin + lPlugin.ConfigPolicyTree = &resp.ConfigPolicyTree + if resp.Type == plugin.CollectorPluginType { ap, err := newAvailablePlugin(resp) if err != nil { log.Println(err.Error()) return nil, err } - col := ap.Client.(plugin.CollectorPlugin) - args := new(plugin.GetMetricTypesArgs) - var reply *plugin.GetMetricTypesReply - //TODO update metric catalog with metric types below - _ = col.GetMetricTypes(*args, reply) + colClient := ap.Client.(client.PluginCollectorClient) + metricTypes, err := colClient.GetMetricTypes() + if err != nil { + log.Println(err) + return nil, err + } + // Add metric types to metric catalog + // + // For each MT returned we pair the loaded plugin and call AddLoadedMetricType to add + for _, mt := range metricTypes { + p.metricCatalog.AddLoadedMetricType(lPlugin, mt) + } } err = ePlugin.Kill() @@ -301,6 +323,6 @@ func (p *pluginManager) UnloadPlugin(pl CatalogedPlugin) error { return nil } -func (p *pluginManager) generateArgs() plugin.Arg { - return plugin.NewArg(p.pubKey, "/tmp/pulse-plugin.log") +func (p *pluginManager) GenerateArgs() plugin.Arg { + return plugin.NewArg(p.pubKey, "/tmp/pulse-plugin-foo.log") } diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 78dbac420..68a8e8178 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -63,6 +63,7 @@ func TestLoadPlugin(t *testing.T) { Convey("loads plugin successfully", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath) So(lp, ShouldHaveSameTypeAs, new(loadedPlugin)) @@ -91,6 +92,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a loaded plugin is unloaded", func() { Convey("then it is removed from the loadedPlugins", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath) num_plugins_loaded := len(p.LoadedPlugins().Table()) @@ -105,6 +107,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a loaded plugin is not in a PluginLoaded state", func() { Convey("then an error is thrown", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath) lp, _ := p.LoadedPlugins().Get(0) lp.State = DetectedState @@ -116,6 +119,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a plugin is already unloaded", func() { Convey("then an error is thrown", func() { p := newPluginManager() + p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath) plugin, _ := p.LoadedPlugins().Get(0) diff --git a/control/router.go b/control/router.go new file mode 100644 index 000000000..7284ee44c --- /dev/null +++ b/control/router.go @@ -0,0 +1,135 @@ +// Router is the entry point for execution commands and routing to plugins +package control + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/intelsdilabs/pulse/control/plugin/client" + "github.com/intelsdilabs/pulse/control/routing" + "github.com/intelsdilabs/pulse/core" + "github.com/intelsdilabs/pulse/core/cdata" +) + +type RouterResponse interface { +} + +type RoutingStrategy interface { + Select(routing.SelectablePluginPool, []routing.SelectablePlugin) (routing.SelectablePlugin, error) +} + +type pluginRouter struct { + Strategy RoutingStrategy + + metricCatalog catalogsMetrics + pluginRunner runsPlugins +} + +func newPluginRouter() *pluginRouter { + return &pluginRouter{ + Strategy: &routing.RoundRobinStrategy{}, + } +} + +type pluginCallSelection struct { + Plugin *loadedPlugin + MetricTypes []core.MetricType +} + +func (p *pluginCallSelection) Count() int { + return len(p.MetricTypes) +} + +// Calls collector plugins for the metric types and returns collection response containing metrics. Blocking method. +func (p *pluginRouter) Collect(metricTypes []core.MetricType, config *cdata.ConfigDataNode, deadline time.Time) (response *collectionResponse, err error) { + pluginCallSelectionMap := make(map[string]pluginCallSelection) + // For each plugin type select a matching available plugin to call + for _, m := range metricTypes { + + // This is set to choose the newest and not pin version. TODO, be sure version is set to -1 if not provided by user on Task creation. + lp, err := p.metricCatalog.GetPlugin(m.Namespace(), -1) + + // Single error fails entire operation. + if err != nil { + return nil, err + } + + // Single error fails entire operation. + if lp == nil { + return nil, errors.New(fmt.Sprintf("Metric missing: %s", strings.Join(m.Namespace(), "/"))) + } + + fmt.Printf("Found plugin (%s v%d) for metric (%s)\n", lp.Name(), lp.Version(), strings.Join(m.Namespace(), "/")) + x, _ := pluginCallSelectionMap[lp.Key()] + x.Plugin = lp + x.MetricTypes = append(x.MetricTypes, m) + pluginCallSelectionMap[lp.Key()] = x + + } + + // For each available plugin call available plugin using RPC client and wait for response (goroutines) + var selectedAP *availablePlugin + for pluginKey, metrics := range pluginCallSelectionMap { + fmt.Printf("plugin: (%s) has (%d) metrics to gather\n", pluginKey, metrics.Count()) + + apPluginPool := p.pluginRunner.AvailablePlugins().Collectors.GetPluginPool(pluginKey) + + if apPluginPool == nil { + // return error because this plugin has no pool + return nil, errors.New(fmt.Sprintf("no available plugins for plugin type (%s)", pluginKey)) + } + + // Lock this apPool so we are the only one operating on it. + if apPluginPool.Count() == 0 { + // return error indicating we have no available plugins to call for Collect + return nil, errors.New(fmt.Sprintf("no available plugins for plugin type (%s)", pluginKey)) + } + + // Use a router strategy to select an available plugin from the pool + fmt.Printf("%d available plugin in pool for (%s)\n", apPluginPool.Count(), pluginKey) + ap, err := apPluginPool.SelectUsingStrategy(p.Strategy) + + if err != nil { + return nil, err + } + + if ap == nil { + return nil, errors.New(fmt.Sprintf("no available plugin selected (%s)", pluginKey)) + } + selectedAP = ap + } + + resp := newCollectionResponse() + // Attempt collection on selected available plugin + selectedAP.hitCount++ + selectedAP.lastHitTime = time.Now() + metrics, err := selectedAP.Client.(client.PluginCollectorClient).CollectMetrics(metricTypes) + if err != nil { + resp.Errors = append(resp.Errors, err) + return resp, nil + } + resp.Metrics = metrics + return resp, nil +} + +func (p *pluginRouter) SetRunner(r runsPlugins) { + p.pluginRunner = r +} + +func (p *pluginRouter) SetMetricCatalog(m catalogsMetrics) { + p.metricCatalog = m +} + +type collectionResponse struct { + Metrics []core.Metric + Errors []error +} + +func newCollectionResponse() *collectionResponse { + return &collectionResponse{ + Metrics: make([]core.Metric, 0), + Errors: make([]error, 0), + } +} diff --git a/control/router_test.go b/control/router_test.go new file mode 100644 index 000000000..202c62b53 --- /dev/null +++ b/control/router_test.go @@ -0,0 +1,77 @@ +package control + +import ( + "fmt" + "testing" + "time" + + "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core" + "github.com/intelsdilabs/pulse/core/cdata" + . "github.com/smartystreets/goconvey/convey" +) + +type MockMetricType struct { + namespace []string +} + +func (m MockMetricType) Namespace() []string { + return m.namespace +} + +func (m MockMetricType) LastAdvertisedTime() time.Time { + return time.Now() +} + +func (m MockMetricType) Version() int { + return 1 +} + +func (m MockMetricType) Config() *cdata.ConfigDataNode { + return nil +} + +func TestRouter(t *testing.T) { + Convey("given a new router", t, func() { + // adjust HB timeouts for test + plugin.PingTimeoutLimit = 1 + plugin.PingTimeoutDuration = time.Second * 1 + + // Create controller + c := New() + c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 + c.Start() + // Load plugin + c.Load(PluginPath) + + m := []core.MetricType{} + m1 := MockMetricType{namespace: []string{"intel", "dummy", "foo"}} + m2 := MockMetricType{namespace: []string{"intel", "dummy", "bar"}} + // m3 := MockMetricType{namespace: []string{"intel", "dummy", "baz"}} + m = append(m, m1) + m = append(m, m2) + // m = append(m, m3) + cd := cdata.NewNode() + fmt.Println(cd.Table()) + + fmt.Println(m1.Namespace(), m1.Version(), cd) + // Subscribe + a, b := c.SubscribeMetricType(m1, cd) + fmt.Println(a, b) + time.Sleep(time.Millisecond * 100) + c.SubscribeMetricType(m2, cd) + time.Sleep(time.Millisecond * 200) + + // Call collect on router + + for x := 0; x < 5; x++ { + fmt.Println("\n * Calling Collect") + cr, err := c.pluginRouter.Collect(m, cd, time.Now().Add(time.Second*60)) + if err != nil { + fmt.Println(err) + } + fmt.Printf(" * Collect Response: %+v\n", cr) + } + time.Sleep(time.Millisecond * 200) + }) +} diff --git a/control/routing/round_robin.go b/control/routing/round_robin.go new file mode 100644 index 000000000..afab934dd --- /dev/null +++ b/control/routing/round_robin.go @@ -0,0 +1,35 @@ +package routing + +import ( + "errors" + "fmt" + "math/rand" +) + +type RoundRobinStrategy struct { +} + +func (r *RoundRobinStrategy) Select(spp SelectablePluginPool, spa []SelectablePlugin) (SelectablePlugin, error) { + var h int = -1 + var index int = -1 + fmt.Printf("Using round robin selection on pool of %d plugins\n", len(spa)) + for i, sp := range spa { + // look for the lowest hit count + if sp.HitCount() < h || h == -1 { + index = i + h = sp.HitCount() + } + // on a hitcount tie we randomly choose one + if sp.HitCount() == h { + if rand.Intn(1) == 1 { + index = i + h = sp.HitCount() + } + } + } + if index > -1 { + fmt.Printf("Selecting plugin at index (%d) with hitcount of (%d)\n", index, spa[index].HitCount()) + return spa[index], nil + } + return nil, errors.New("could not select a plugin (round robin strategy)") +} diff --git a/control/routing/routing.go b/control/routing/routing.go new file mode 100644 index 000000000..cd42bc04c --- /dev/null +++ b/control/routing/routing.go @@ -0,0 +1,13 @@ +package routing + +import ( + "time" +) + +type SelectablePluginPool interface { +} + +type SelectablePlugin interface { + HitCount() int + LastHit() time.Time +} diff --git a/control/runner.go b/control/runner.go index 975fadba0..332f5b085 100644 --- a/control/runner.go +++ b/control/runner.go @@ -2,11 +2,16 @@ package control import ( "errors" + "fmt" + "sync" "time" + "strings" + "github.com/intelsdilabs/gomit" "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/core/control_event" ) const ( @@ -16,6 +21,9 @@ const ( PluginRunning availablePluginState = iota - 1 // Default value (0) is Running PluginStopped PluginDisabled + + // Until more advanced decisioning on starting exists this is the max number to spawn. + MaximumRunningPlugins = 3 ) // TBD @@ -30,16 +38,32 @@ type runner struct { delegates []gomit.Delegator monitor *monitor availablePlugins *availablePlugins + metricCatalog catalogsMetrics + pluginManager managesPlugins + mutex *sync.Mutex } func newRunner() *runner { r := &runner{ monitor: newMonitor(), availablePlugins: newAvailablePlugins(), + mutex: &sync.Mutex{}, } return r } +func (r *runner) SetMetricCatalog(c catalogsMetrics) { + r.metricCatalog = c +} + +func (r *runner) SetPluginManager(m managesPlugins) { + r.pluginManager = m +} + +func (r *runner) AvailablePlugins() *availablePlugins { + return r.availablePlugins +} + // Adds Delegates (gomit.Delegator) for adding Runner handlers to on Start and // unregistration on Stop. func (r *runner) AddDelegates(delegates ...gomit.Delegator) { @@ -140,5 +164,45 @@ func (r *runner) stopPlugin(reason string, ap *availablePlugin) error { // Empty handler acting as placeholder until implementation. This helps tests // pass to ensure registration works. func (r *runner) HandleGomitEvent(e gomit.Event) { - // to do + + switch v := e.Body.(type) { + case *control_event.MetricSubscriptionEvent: + fmt.Println("runner") + fmt.Println(v.Namespace()) + fmt.Printf("Metric subscription (%s v%d)\n", strings.Join(v.MetricNamespace, "/"), v.Version) + + // Our logic here is simple for alpha. We should replace with parameter managed logic. + // + // 1. Get the loaded plugin for the subscription. + // 2. Check that at least one available plugin of that type is running + // 3. If not start one + + mt, err := r.metricCatalog.Get(v.MetricNamespace, v.Version) + if err != nil { + // log this error # TODO with logging + fmt.Println(err) + return + } + fmt.Printf("Plugin is (%s)\n", mt.Plugin.Key()) + + pool := r.availablePlugins.Collectors.GetPluginPool(mt.Plugin.Key()) + if pool != nil && pool.Count() >= MaximumRunningPlugins { + // if r.availablePlugins.Collectors.PluginPoolHasAP(mt.Plugin.Key()) { + fmt.Println("We have at least one running!") + return + } + + fmt.Println("No APs running!") + ePlugin, err := plugin.NewExecutablePlugin(r.pluginManager.GenerateArgs(), mt.Plugin.Path) + if err != nil { + fmt.Println(err) + } + ap, err := r.startPlugin(ePlugin) + if err != nil { + fmt.Println(err) + panic(err) + } + fmt.Println("NEW AP") + fmt.Println(ap) + } } diff --git a/core/cdata/node.go b/core/cdata/node.go index 2d1d9aadb..b9d80bcb7 100644 --- a/core/cdata/node.go +++ b/core/cdata/node.go @@ -1,6 +1,8 @@ package cdata import ( + "bytes" + "encoding/gob" "sync" "github.com/intelsdilabs/pulse/core/ctypes" @@ -13,6 +15,22 @@ type ConfigDataNode struct { table map[string]ctypes.ConfigValue } +func (c *ConfigDataNode) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(&c.table); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigDataNode) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + c.mutex = new(sync.Mutex) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.table) +} + // Returns a new and empty node. func NewNode() *ConfigDataNode { return &ConfigDataNode{ diff --git a/core/cdata/tree.go b/core/cdata/tree.go index 71d6b357c..74278f897 100644 --- a/core/cdata/tree.go +++ b/core/cdata/tree.go @@ -1,6 +1,9 @@ package cdata import ( + "bytes" + "encoding/gob" + "github.com/intelsdilabs/pulse/pkg/ctree" ) @@ -17,6 +20,21 @@ func NewTree() *ConfigDataTree { } } +func (c *ConfigDataTree) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(c.cTree); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigDataTree) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&c.cTree) +} + // Adds a ConfigDataNode at the provided namespace. func (c *ConfigDataTree) Add(ns []string, cdn *ConfigDataNode) { c.cTree.Add(ns, cdn) diff --git a/core/cdata/tree_test.go b/core/cdata/tree_test.go index f80f9e4d0..57d1e63b4 100644 --- a/core/cdata/tree_test.go +++ b/core/cdata/tree_test.go @@ -1,6 +1,8 @@ package cdata import ( + "encoding/gob" + "fmt" "testing" "github.com/intelsdilabs/pulse/core/ctypes" @@ -70,6 +72,35 @@ func TestConfigDataTree(t *testing.T) { So(t["i"].(ctypes.ConfigValueInt).Value, ShouldEqual, 1) So(t["f"].Type(), ShouldEqual, "float") So(t["f"].(ctypes.ConfigValueFloat).Value, ShouldEqual, 2.3) + + Convey("encode & decode", func() { + gob.Register(&ConfigDataNode{}) + gob.Register(ctypes.ConfigValueStr{}) + gob.Register(ctypes.ConfigValueInt{}) + gob.Register(ctypes.ConfigValueFloat{}) + buf, err := cdt.GobEncode() + So(err, ShouldBeNil) + So(buf, ShouldNotBeNil) + cdt2 := NewTree() + err = cdt2.GobDecode(buf) + So(err, ShouldBeNil) + So(cdt2.cTree, ShouldNotBeNil) + + a2 := cdt2.Get([]string{"1", "2"}) + So(a2, ShouldNotBeNil) + + t2 := a2.Table() + So(t2["s"].Type(), ShouldEqual, "string") + println(fmt.Sprintf("!!! %T\n %T\n", t2["s"], t["s"])) + So(t2["s"].(ctypes.ConfigValueStr).Value, ShouldEqual, "bar") + So(t2["x"].Type(), ShouldEqual, "string") + So(t2["x"].(ctypes.ConfigValueStr).Value, ShouldEqual, "wat") + So(t2["i"].Type(), ShouldEqual, "integer") + So(t2["i"].(ctypes.ConfigValueInt).Value, ShouldEqual, 1) + So(t2["f"].Type(), ShouldEqual, "float") + So(t2["f"].(ctypes.ConfigValueFloat).Value, ShouldEqual, 2.3) + + }) }) }) diff --git a/core/control_event/control_event.go b/core/control_event/control_event.go index 1bcd37058..5fa75ea94 100644 --- a/core/control_event/control_event.go +++ b/core/control_event/control_event.go @@ -1,9 +1,5 @@ package control_event -import ( - "github.com/intelsdilabs/pulse/control/plugin" -) - const ( PluginLoaded = "Control.PluginLoaded" PluginDisabled = "Control.PluginDisabled" @@ -16,21 +12,21 @@ const ( type LoadPluginEvent struct{} -func (e *LoadPluginEvent) Namespace() string { +func (e LoadPluginEvent) Namespace() string { return PluginLoaded } type UnloadPluginEvent struct { } -func (e *UnloadPluginEvent) Namespace() string { +func (e UnloadPluginEvent) Namespace() string { return PluginUnloaded } type DisabledPluginEvent struct { Name string Version int - Type plugin.PluginType + Type int Key string Index int } @@ -41,15 +37,16 @@ func (e *DisabledPluginEvent) Namespace() string { type SwapPluginsEvent struct{} -func (s *SwapPluginsEvent) Namespace() string { +func (s SwapPluginsEvent) Namespace() string { return PluginsSwapped } type MetricSubscriptionEvent struct { MetricNamespace []string + Version int } -func (se *MetricSubscriptionEvent) Namespace() string { +func (se MetricSubscriptionEvent) Namespace() string { return MetricSubscribed } @@ -57,16 +54,16 @@ type MetricUnsubscriptionEvent struct { MetricNamespace []string } -func (ue *MetricUnsubscriptionEvent) Namespace() string { +func (ue MetricUnsubscriptionEvent) Namespace() string { return MetricUnsubscribed } type HealthCheckFailedEvent struct { Name string Version int - Type plugin.PluginType + Type int } -func (hfe *HealthCheckFailedEvent) Namespace() string { +func (hfe HealthCheckFailedEvent) Namespace() string { return HealthCheckFailed } diff --git a/core/metric.go b/core/metric.go index f22d98733..a48ced1ad 100644 --- a/core/metric.go +++ b/core/metric.go @@ -1,11 +1,15 @@ package core -import "github.com/intelsdilabs/pulse/core/cdata" +import ( + "time" + + "github.com/intelsdilabs/pulse/core/cdata" +) type MetricType interface { Version() int Namespace() []string - LastAdvertisedTimestamp() int64 + LastAdvertisedTime() time.Time Config() *cdata.ConfigDataNode } diff --git a/pkg/ctree/tree.go b/pkg/ctree/tree.go index 595e1c3d7..536ccfa3c 100644 --- a/pkg/ctree/tree.go +++ b/pkg/ctree/tree.go @@ -2,6 +2,7 @@ package ctree import ( "bytes" + "encoding/gob" "fmt" "log" "sync" @@ -28,6 +29,28 @@ func (c *ConfigTree) log(s string) { } } +func (c *ConfigTree) GobEncode() ([]byte, error) { + //todo throw an error if not frozen + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(c.root); err != nil { + return nil, err + } + if err := encoder.Encode(c.freezeFlag); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (c *ConfigTree) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + if err := decoder.Decode(&c.root); err != nil { + return err + } + return decoder.Decode(&c.freezeFlag) +} + func (c *ConfigTree) Add(ns []string, inNode Node) { c.log(fmt.Sprintf("Adding %v at %s\n", inNode, ns)) c.mutex.Lock() @@ -142,6 +165,39 @@ type node struct { Node Node } +func (n *node) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(n.nodes); err != nil { + return nil, err + } + if err := encoder.Encode(n.keys); err != nil { + return nil, err + } + if err := encoder.Encode(n.keysBytes); err != nil { + return nil, err + } + if err := encoder.Encode(&n.Node); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (n *node) GobDecode(buf []byte) error { + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + if err := decoder.Decode(&n.nodes); err != nil { + return err + } + if err := decoder.Decode(&n.keys); err != nil { + return err + } + if err := decoder.Decode(&n.keysBytes); err != nil { + return err + } + return decoder.Decode(&n.Node) +} + func (n *node) setKeys(k []string) { n.keys = k n.keysBytes = nsToByteArray(n.keys) diff --git a/pkg/ctree/tree_test.go b/pkg/ctree/tree_test.go index 6d00d471e..2229a13a7 100644 --- a/pkg/ctree/tree_test.go +++ b/pkg/ctree/tree_test.go @@ -1,6 +1,8 @@ package ctree import ( + "bytes" + "encoding/gob" "fmt" "testing" @@ -16,6 +18,27 @@ func (d dummyNode) Merge(dn Node) Node { return d } +func (n *dummyNode) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + + if err := encoder.Encode(n.data); err != nil { + return nil, err + } + + return w.Bytes(), nil +} + +func (n *dummyNode) GobDecode(buf []byte) error { + if len(buf) == 0 { + //there is nothing to do + return nil + } + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + return decoder.Decode(&n.data) +} + func newDummyNode() *dummyNode { return new(dummyNode) } @@ -55,6 +78,21 @@ func TestConfigTree(t *testing.T) { g := c.Get([]string{"intel", "foo", "sdilabs", "joel", "dan", "nick", "justin", "sarah"}) So(g, ShouldNotBeNil) So(g.(dummyNode).data, ShouldResemble, "b/a") + + Convey("GobEncode/GobDecode", func() { + gob.Register(&dummyNode{}) + buf, err := c.GobEncode() + So(err, ShouldBeNil) + So(buf, ShouldNotBeNil) + c2 := New() + err = c2.GobDecode(buf) + So(err, ShouldBeNil) + So(c2.root, ShouldNotBeNil) + So(c2.root.keys, ShouldNotBeEmpty) + g2 := c2.Get([]string{"intel", "foo", "sdilabs", "joel", "dan", "nick", "justin", "sarah"}) + So(g2, ShouldNotBeNil) + So(g2.(dummyNode).data, ShouldResemble, "b/a") + }) }) Convey("single item ns", func() { diff --git a/plugin/collector/pulse-collector-dummy/dummy/dummy.go b/plugin/collector/pulse-collector-dummy/dummy/dummy.go index a1ff09b8c..15519560e 100644 --- a/plugin/collector/pulse-collector-dummy/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy/dummy/dummy.go @@ -1,9 +1,8 @@ package dummy import ( - "time" - "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) const ( @@ -16,22 +15,27 @@ const ( type Dummy struct { } -func (f *Dummy) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) error { - return nil +func (f *Dummy) CollectMetrics([]plugin.PluginMetricType) ([]plugin.PluginMetric, error) { + m := plugin.PluginMetric{Namespace_: []string{"intel", "dummy", "foo"}, Data_: 1} + ms := []plugin.PluginMetric{m} + return ms, nil } -func (f *Dummy) GetMetricTypes(_ plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { - reply.MetricTypes = []*plugin.MetricType{ - plugin.NewMetricType([]string{"foo", "bar"}, time.Now().Unix()), - } - return nil +func (f *Dummy) GetMetricTypes() ([]plugin.PluginMetricType, error) { + m1 := plugin.NewPluginMetricType([]string{"intel", "dummy", "foo"}) + m2 := plugin.NewPluginMetricType([]string{"intel", "dummy", "bar"}) + return []plugin.PluginMetricType{*m1, *m2}, nil } func Meta() *plugin.PluginMeta { return plugin.NewPluginMeta(Name, Version, Type) } -func ConfigPolicy() *plugin.ConfigPolicy { - c := new(plugin.ConfigPolicy) +func ConfigPolicyTree() *cpolicy.ConfigPolicyTree { + c := cpolicy.NewTree() + rule, _ := cpolicy.NewStringRule("name", false, "bob") + p := cpolicy.NewPolicyNode() + p.Add(rule) + c.Add([]string{"intel", "dummy"}, p) return c } diff --git a/plugin/collector/pulse-collector-dummy/main.go b/plugin/collector/pulse-collector-dummy/main.go index 3ddfcec95..8307764cb 100644 --- a/plugin/collector/pulse-collector-dummy/main.go +++ b/plugin/collector/pulse-collector-dummy/main.go @@ -15,12 +15,12 @@ func main() { // the implementation satfiying plugin.CollectorPlugin // the collector configuration policy satifying plugin.ConfigRules - // Define default policy - policy := dummy.ConfigPolicy() + // Define default policy tree + policyTree := dummy.ConfigPolicyTree() // Define metadata about Plugin meta := dummy.Meta() // Start a collector - plugin.Start(meta, new(dummy.Dummy), policy, os.Args[1]) + plugin.Start(meta, new(dummy.Dummy), policyTree, os.Args[1]) } diff --git a/plugin/collector/pulse-collector-facter/facter/facter.go b/plugin/collector/pulse-collector-facter/facter/facter.go index fe5f85123..77ee8fd25 100644 --- a/plugin/collector/pulse-collector-facter/facter/facter.go +++ b/plugin/collector/pulse-collector-facter/facter/facter.go @@ -2,6 +2,7 @@ package facter import ( "github.com/intelsdilabs/pulse/control/plugin" + "github.com/intelsdilabs/pulse/control/plugin/cpolicy" ) const ( @@ -13,20 +14,22 @@ const ( type Facter struct { } -func (f *Facter) GetMetricTypes(_ plugin.GetMetricTypesArgs, reply *plugin.GetMetricTypesReply) error { - //reply *[]*plugin.MetricType - return nil +func (f *Facter) CollectMetrics([]plugin.PluginMetricType) ([]plugin.PluginMetric, error) { + m := plugin.PluginMetric{Namespace_: []string{"intel", "facter", "foo"}, Data_: 1} + ms := []plugin.PluginMetric{m} + return ms, nil } -func (f *Facter) Collect(args plugin.CollectorArgs, reply *plugin.CollectorReply) error { - return nil +func (f *Facter) GetMetricTypes() ([]plugin.PluginMetricType, error) { + m := plugin.NewPluginMetricType([]string{"intel", "facter", "foo"}) + return []plugin.PluginMetricType{*m}, nil } func Meta() *plugin.PluginMeta { return plugin.NewPluginMeta(Name, Version, Type) } -func ConfigPolicy() *plugin.ConfigPolicy { - c := new(plugin.ConfigPolicy) +func ConfigPolicyTree() *cpolicy.ConfigPolicyTree { + c := cpolicy.NewTree() return c } diff --git a/plugin/collector/pulse-collector-facter/main.go b/plugin/collector/pulse-collector-facter/main.go index ec65701a4..d0804a040 100644 --- a/plugin/collector/pulse-collector-facter/main.go +++ b/plugin/collector/pulse-collector-facter/main.go @@ -16,12 +16,12 @@ func main() { // the collector configuration policy satifying plugin.ConfigRules // Define default policy - policy := facter.ConfigPolicy() + policyTree := facter.ConfigPolicyTree() // Define metadata about Plugin meta := facter.Meta() // Start a collector //plugin.StartCollector(meta, new(facter.Facter), policy, os.Args[0], os.Args[1]) - plugin.Start(meta, new(facter.Facter), policy, os.Args[1]) + plugin.Start(meta, new(facter.Facter), policyTree, os.Args[1]) } diff --git a/schedule/schedule.go b/schedule/schedule.go index 093ffb931..7ea5b1f30 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -4,7 +4,6 @@ import ( "errors" "time" - "github.com/intelsdilabs/pulse/control" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" ) @@ -47,7 +46,7 @@ type ScheduleResponse interface { // ManagesMetric is implemented by control // On startup a scheduler will be created and passed a reference to control type managesMetric interface { - SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, control.SubscriptionError) + SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, []error) UnsubscribeMetricType(mt core.MetricType) } @@ -99,7 +98,7 @@ func (scheduler *scheduler) CreateTask(mts []core.MetricType, s Schedule, cdt *c //mtc = append(mtc, mt) subscriptions = append(subscriptions, mt) } else { - te.errs = append(te.errs, err.Errors()...) + te.errs = append(te.errs, err...) } } diff --git a/schedule/schedule_test.go b/schedule/schedule_test.go index 7ef0fc9e8..1918c4cc0 100644 --- a/schedule/schedule_test.go +++ b/schedule/schedule_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/intelsdilabs/pulse/control" "github.com/intelsdilabs/pulse/core" "github.com/intelsdilabs/pulse/core/cdata" "github.com/intelsdilabs/pulse/core/ctypes" @@ -19,16 +18,14 @@ type MockMetricManager struct { failuredSoFar int } -func (m *MockMetricManager) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, control.SubscriptionError) { +func (m *MockMetricManager) SubscribeMetricType(mt core.MetricType, cd *cdata.ConfigDataNode) (core.MetricType, []error) { if m.failValidatingMetrics { if m.failValidatingMetricsAfter > m.failuredSoFar { m.failuredSoFar++ return nil, nil } - return nil, &MockMetricManagerError{ - errs: []error{ - errors.New("metric validation error"), - }, + return nil, []error{ + errors.New("metric validation error"), } } return nil, nil @@ -42,15 +39,11 @@ type MockMetricManagerError struct { errs []error } -func (m *MockMetricManagerError) Errors() []error { - return m.errs -} - type MockMetricType struct { - version int - namespace []string - lastAdvertisedTimestamp int64 - config *cdata.ConfigDataNode + version int + namespace []string + lastAdvertisedTime time.Time + config *cdata.ConfigDataNode } func (m MockMetricType) Version() int { @@ -61,8 +54,8 @@ func (m MockMetricType) Namespace() []string { return m.namespace } -func (m MockMetricType) LastAdvertisedTimestamp() int64 { - return m.lastAdvertisedTimestamp +func (m MockMetricType) LastAdvertisedTime() time.Time { + return m.lastAdvertisedTime } func (m MockMetricType) Config() *cdata.ConfigDataNode { @@ -88,19 +81,19 @@ func TestScheduler(t *testing.T) { } mt := []core.MetricType{ &MockMetricType{ - namespace: []string{"foo", "bar"}, - version: 1, - lastAdvertisedTimestamp: 0, + namespace: []string{"foo", "bar"}, + version: 1, + lastAdvertisedTime: time.Now(), }, &MockMetricType{ - namespace: []string{"foo2", "bar2"}, - version: 1, - lastAdvertisedTimestamp: 0, + namespace: []string{"foo2", "bar2"}, + version: 1, + lastAdvertisedTime: time.Now(), }, &MockMetricType{ - namespace: []string{"foo2", "bar2"}, - version: 1, - lastAdvertisedTimestamp: 0, + namespace: []string{"foo2", "bar2"}, + version: 1, + lastAdvertisedTime: time.Now(), }, } scheduler := New(1, 5)