diff --git a/control/control.go b/control/control.go index 5b7a668cf..8cafb0055 100644 --- a/control/control.go +++ b/control/control.go @@ -141,7 +141,7 @@ type catalogsMetrics interface { Keys() []string Subscribe([]string, int) error Unsubscribe([]string, int) error - GetPlugin(core.Namespace, int) (*loadedPlugin, error) + GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) } type managesSigning interface { @@ -715,26 +715,26 @@ func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric // apply defaults to the metric that may be present in the plugins // configpolicy - if pluginCfg := mt.Plugin.ConfigPolicy.Get(mt.Namespace().Strings()); pluginCfg != nil { + if pluginCfg := mt.Plugin.Policy().Get(mt.Namespace().Strings()); pluginCfg != nil { mt.config.ApplyDefaults(pluginCfg.Defaults()) } - // loaded plugin which exposes the metric - lp := mt.Plugin - key := lp.Key() + // cataloged plugin which exposes the metric + cp := mt.Plugin + key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version()) // groups metricTypes by a plugin.Key() pmt, _ := newMetricsGroupedByPlugin[key] // pmt (plugin-metric-type) contains plugin and metrics types grouped to this plugin - pmt.plugin = lp + pmt.plugin = cp pmt.metricTypes = append(pmt.metricTypes, mt) newMetricsGroupedByPlugin[key] = pmt plugin := subscribedPlugin{ - name: lp.Name(), - typeName: lp.TypeName(), - version: lp.Version(), + name: cp.Name(), + typeName: cp.TypeName(), + version: cp.Version(), config: cdata.NewNode(), } @@ -1046,7 +1046,7 @@ func (r *requestedPlugin) Config() *cdata.ConfigDataNode { // just a tuple of loadedPlugin and metricType slice type metricTypes struct { - plugin *loadedPlugin + plugin core.CatalogedPlugin metricTypes []core.Metric } @@ -1058,7 +1058,7 @@ func (mts metricTypes) Metrics() []core.Metric { return mts.metricTypes } -func (mts metricTypes) Plugin() *loadedPlugin { +func (mts metricTypes) Plugin() core.CatalogedPlugin { return mts.plugin } diff --git a/control/control_test.go b/control/control_test.go index 71d9cf4f6..dc69b17a6 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -680,7 +680,7 @@ func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { return nil, nil } -func (m *mc) GetPlugin(core.Namespace, int) (*loadedPlugin, error) { +func (m *mc) GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) { return nil, nil } diff --git a/control/metrics.go b/control/metrics.go index 3bea73456..2d5c6a997 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -131,7 +131,7 @@ func (m *metricCatalogItem) Versions() map[int]core.Metric { } type metricType struct { - Plugin *loadedPlugin + Plugin core.CatalogedPlugin namespace core.Namespace version int lastAdvertisedTime time.Time @@ -163,12 +163,24 @@ func (m *metric) Version() int { return m.version } -func (m *metric) Data() interface{} { return nil } -func (m *metric) Description() string { return "" } -func (m *metric) Unit() string { return "" } -func (m *metric) Tags() map[string]string { return nil } -func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) } -func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) } +func (m *metric) Data() interface{} { + return nil +} +func (m *metric) Description() string { + return "" +} +func (m *metric) Unit() string { + return "" +} +func (m *metric) Tags() map[string]string { + return nil +} +func (m *metric) LastAdvertisedTime() time.Time { + return time.Unix(0, 0) +} +func (m *metric) Timestamp() time.Time { + return time.Unix(0, 0) +} type processesConfigData interface { Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors) @@ -250,6 +262,78 @@ func (m *metricType) Unit() string { return m.unit } +type catalogedPlugin struct { + name string + version int + signed bool + typeName plugin.PluginType + state pluginState + path string + loadedTime time.Time + configPolicy *cpolicy.ConfigPolicy +} + +func (cp *catalogedPlugin) TypeName() string { + return cp.typeName.String() +} + +func (cp *catalogedPlugin) Name() string { + return cp.name +} + +func (cp *catalogedPlugin) Version() int { + return cp.version +} + +func (cp *catalogedPlugin) IsSigned() bool { + return cp.signed +} + +func (cp *catalogedPlugin) Status() string { + return string(cp.state) +} + +func (cp *catalogedPlugin) PluginPath() string { + return cp.path +} + +func (cp *catalogedPlugin) LoadedTimestamp() *time.Time { + return &cp.loadedTime +} + +func (cp *catalogedPlugin) Policy() *cpolicy.ConfigPolicy { + return cp.configPolicy +} + +func newCatalogedPlugin(lp *loadedPlugin) core.CatalogedPlugin { + cp := cpolicy.New() + for _, keyNode := range lp.Policy().GetAll() { + node := cpolicy.NewPolicyNode() + rules, err := keyNode.ConfigPolicyNode.CopyRules() + if err != nil { + log.WithFields(log.Fields{ + "_module": "control", + "_file": "metrics.go,", + "_block": "newCatalogedPlugin", + "error": err.Error(), + }).Error("Unable to copy rules") + return nil + } + node.Add(rules...) + cp.Add(keyNode.Key, node) + } + return &catalogedPlugin{ + name: lp.Name(), + version: lp.Version(), + signed: lp.IsSigned(), + typeName: lp.Type, + state: lp.State, + path: lp.PluginPath(), + loadedTime: lp.LoadedTime, + configPolicy: cp, + } +} + type metricCatalog struct { tree *MTTrie mutex *sync.Mutex @@ -288,8 +372,9 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) e }).Error("error adding loaded metric type") return err } + newMt := metricType{ - Plugin: lp, + Plugin: newCatalogedPlugin(lp), namespace: mt.Namespace(), version: mt.Version(), lastAdvertisedTime: mt.LastAdvertisedTime(), @@ -326,7 +411,6 @@ func (mc *metricCatalog) Add(m *metricType) { // adding key as a cataloged keys (mc.keys) mc.keys = appendIfMissing(mc.keys, key) - mc.tree.Add(m) } @@ -366,7 +450,7 @@ func (mc *metricCatalog) GetMetric(requested core.Namespace, version int) (*metr version: catalogedmt.Version(), lastAdvertisedTime: catalogedmt.LastAdvertisedTime(), tags: catalogedmt.Tags(), - policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()), + policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()), config: catalogedmt.Config(), unit: catalogedmt.Unit(), description: catalogedmt.Description(), @@ -414,7 +498,7 @@ func (mc *metricCatalog) GetMetrics(requested core.Namespace, version int) ([]*m version: catalogedmt.Version(), lastAdvertisedTime: catalogedmt.LastAdvertisedTime(), tags: catalogedmt.Tags(), - policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()), + policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()), config: catalogedmt.Config(), unit: catalogedmt.Unit(), description: catalogedmt.Description(), @@ -510,7 +594,7 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error { return m.Unsubscribe() } -func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (*loadedPlugin, error) { +func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (core.CatalogedPlugin, error) { mt, err := mc.tree.GetMetric(mns.Strings(), ver) if err != nil { log.WithFields(log.Fields{ diff --git a/control/mttrie.go b/control/mttrie.go index d17625570..a6cdbd15d 100644 --- a/control/mttrie.go +++ b/control/mttrie.go @@ -23,6 +23,8 @@ import ( "fmt" "sort" "strings" + + "github.com/intelsdi-x/snap/core" ) /* @@ -72,7 +74,8 @@ func NewMTTrie() *MTTrie { func (m *MTTrie) String() string { out := "" for _, mt := range m.gatherMetricTypes() { - out += fmt.Sprintf("%s => %s\n", mt.Key(), mt.Plugin.Key()) + pluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version()) + out += fmt.Sprintf("%s => %s\n", mt.Key(), pluginKey) } return out } @@ -92,9 +95,11 @@ func (m *MTTrie) gatherMetricTypes() []metricType { } // DeleteByPlugin removes all metrics from the catalog if they match a loadedPlugin -func (m *MTTrie) DeleteByPlugin(lp *loadedPlugin) { +func (m *MTTrie) DeleteByPlugin(cp core.CatalogedPlugin) { for _, mt := range m.gatherMetricTypes() { - if mt.Plugin.Key() == lp.Key() { + mtPluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version()) + cpKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version()) + if mtPluginKey == cpKey { // remove this metric m.RemoveMetric(mt) } diff --git a/control/plugin/cpolicy/node.go b/control/plugin/cpolicy/node.go index 9a2df01de..cf096776a 100644 --- a/control/plugin/cpolicy/node.go +++ b/control/plugin/cpolicy/node.go @@ -71,6 +71,54 @@ func NewPolicyNode() *ConfigPolicyNode { } } +func (c *ConfigPolicyNode) CopyRules() ([]Rule, error) { + rules := []Rule{} + for _, rule := range c.rules { + var err error + switch rule.(type) { + case *BoolRule: + var newBoolRule *BoolRule + if rule.Default() != nil { + newBoolRule, err = NewBoolRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueBool).Value) + } else { + newBoolRule, err = NewBoolRule(rule.Key(), rule.Required()) + } + rules = append(rules, newBoolRule) + case *StringRule: + var newStringRule *StringRule + if rule.Default() != nil { + newStringRule, err = NewStringRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueStr).Value) + } else { + newStringRule, err = NewStringRule(rule.Key(), rule.Required()) + } + rules = append(rules, newStringRule) + case *FloatRule: + var newFloatRule *FloatRule + if rule.Default() != nil { + newFloatRule, err = NewFloatRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueFloat).Value) + } else { + newFloatRule, err = NewFloatRule(rule.Key(), rule.Required()) + } + rules = append(rules, newFloatRule) + case *IntRule: + var newIntRule *IntRule + if rule.Default() != nil { + newIntRule, err = NewIntegerRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueInt).Value) + } else { + newIntRule, err = NewIntegerRule(rule.Key(), rule.Required()) + } + rules = append(rules, newIntRule) + default: + return []Rule{}, errors.New(fmt.Sprint("Unknown rule type")) + } + + if err != nil { + return []Rule{}, errors.New(fmt.Sprintf("Could not create rule %s type %s ", rule.Key(), rule.Type())) + } + } + return rules, nil +} + // UnmarshalJSON unmarshals JSON into a ConfigPolicyNode func (c *ConfigPolicyNode) UnmarshalJSON(data []byte) error { m := map[string]interface{}{} diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 50091f90a..af90603e9 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -322,6 +322,15 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter return nil, serror.New(err) } + key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version) + if _, exists := p.loadedPlugins.table[key]; exists { + return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{ + "plugin-name": resp.Meta.Name, + "plugin-version": resp.Meta.Version, + "plugin-type": resp.Type.String(), + }) + } + ap, err := newAvailablePlugin(resp, emitter, ePlugin) if err != nil { pmLogger.WithFields(log.Fields{ @@ -362,7 +371,13 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter }).Error("error in getting config policy") return nil, serror.New(err) } + lPlugin.ConfigPolicy = cp + lPlugin.Meta = resp.Meta + lPlugin.Type = resp.Type + lPlugin.Token = resp.Token + lPlugin.LoadedTime = time.Now() + lPlugin.State = LoadedState if resp.Type == plugin.CollectorPluginType { cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version) @@ -494,12 +509,6 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter return nil, serror.New(e) } - lPlugin.Meta = resp.Meta - lPlugin.Type = resp.Type - lPlugin.Token = resp.Token - lPlugin.LoadedTime = time.Now() - lPlugin.State = LoadedState - aErr := p.loadedPlugins.add(lPlugin) if aErr != nil { pmLogger.WithFields(log.Fields{