Skip to content

Commit

Permalink
(SDI-1827): Fix intelsdi-x#1128 Loading collector twice causes task f…
Browse files Browse the repository at this point in the history
…ailure

Proper copy of config policy

Deep copy of node rules

removed rules slice allocation and index
  • Loading branch information
marcin-krolik authored and jcooklin committed Nov 17, 2016
1 parent df1e290 commit 710751c
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 33 deletions.
22 changes: 11 additions & 11 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type catalogsMetrics interface {
Keys() []string
Subscribe([]string, int) error
Unsubscribe([]string, int) error
GetPlugin(core.Namespace, int) (*loadedPlugin, error)
GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error)
}

type managesSigning interface {
Expand Down Expand Up @@ -715,26 +715,26 @@ func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric

// apply defaults to the metric that may be present in the plugins
// configpolicy
if pluginCfg := mt.Plugin.ConfigPolicy.Get(mt.Namespace().Strings()); pluginCfg != nil {
if pluginCfg := mt.Plugin.Policy().Get(mt.Namespace().Strings()); pluginCfg != nil {
mt.config.ApplyDefaults(pluginCfg.Defaults())
}

// loaded plugin which exposes the metric
lp := mt.Plugin
key := lp.Key()
// cataloged plugin which exposes the metric
cp := mt.Plugin
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())

// groups metricTypes by a plugin.Key()
pmt, _ := newMetricsGroupedByPlugin[key]

// pmt (plugin-metric-type) contains plugin and metrics types grouped to this plugin
pmt.plugin = lp
pmt.plugin = cp
pmt.metricTypes = append(pmt.metricTypes, mt)
newMetricsGroupedByPlugin[key] = pmt

plugin := subscribedPlugin{
name: lp.Name(),
typeName: lp.TypeName(),
version: lp.Version(),
name: cp.Name(),
typeName: cp.TypeName(),
version: cp.Version(),
config: cdata.NewNode(),
}

Expand Down Expand Up @@ -1046,7 +1046,7 @@ func (r *requestedPlugin) Config() *cdata.ConfigDataNode {

// just a tuple of loadedPlugin and metricType slice
type metricTypes struct {
plugin *loadedPlugin
plugin core.CatalogedPlugin
metricTypes []core.Metric
}

Expand All @@ -1058,7 +1058,7 @@ func (mts metricTypes) Metrics() []core.Metric {
return mts.metricTypes
}

func (mts metricTypes) Plugin() *loadedPlugin {
func (mts metricTypes) Plugin() core.CatalogedPlugin {
return mts.plugin
}

Expand Down
2 changes: 1 addition & 1 deletion control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) {
return nil, nil
}

func (m *mc) GetPlugin(core.Namespace, int) (*loadedPlugin, error) {
func (m *mc) GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) {
return nil, nil
}

Expand Down
108 changes: 96 additions & 12 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *metricCatalogItem) Versions() map[int]core.Metric {
}

type metricType struct {
Plugin *loadedPlugin
Plugin core.CatalogedPlugin
namespace core.Namespace
version int
lastAdvertisedTime time.Time
Expand Down Expand Up @@ -163,12 +163,24 @@ func (m *metric) Version() int {
return m.version
}

func (m *metric) Data() interface{} { return nil }
func (m *metric) Description() string { return "" }
func (m *metric) Unit() string { return "" }
func (m *metric) Tags() map[string]string { return nil }
func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) }
func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) }
func (m *metric) Data() interface{} {
return nil
}
func (m *metric) Description() string {
return ""
}
func (m *metric) Unit() string {
return ""
}
func (m *metric) Tags() map[string]string {
return nil
}
func (m *metric) LastAdvertisedTime() time.Time {
return time.Unix(0, 0)
}
func (m *metric) Timestamp() time.Time {
return time.Unix(0, 0)
}

type processesConfigData interface {
Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors)
Expand Down Expand Up @@ -250,6 +262,78 @@ func (m *metricType) Unit() string {
return m.unit
}

type catalogedPlugin struct {
name string
version int
signed bool
typeName plugin.PluginType
state pluginState
path string
loadedTime time.Time
configPolicy *cpolicy.ConfigPolicy
}

func (cp *catalogedPlugin) TypeName() string {
return cp.typeName.String()
}

func (cp *catalogedPlugin) Name() string {
return cp.name
}

func (cp *catalogedPlugin) Version() int {
return cp.version
}

func (cp *catalogedPlugin) IsSigned() bool {
return cp.signed
}

func (cp *catalogedPlugin) Status() string {
return string(cp.state)
}

func (cp *catalogedPlugin) PluginPath() string {
return cp.path
}

func (cp *catalogedPlugin) LoadedTimestamp() *time.Time {
return &cp.loadedTime
}

func (cp *catalogedPlugin) Policy() *cpolicy.ConfigPolicy {
return cp.configPolicy
}

func newCatalogedPlugin(lp *loadedPlugin) core.CatalogedPlugin {
cp := cpolicy.New()
for _, keyNode := range lp.Policy().GetAll() {
node := cpolicy.NewPolicyNode()
rules, err := keyNode.ConfigPolicyNode.CopyRules()
if err != nil {
log.WithFields(log.Fields{
"_module": "control",
"_file": "metrics.go,",
"_block": "newCatalogedPlugin",
"error": err.Error(),
}).Error("Unable to copy rules")
return nil
}
node.Add(rules...)
cp.Add(keyNode.Key, node)
}
return &catalogedPlugin{
name: lp.Name(),
version: lp.Version(),
signed: lp.IsSigned(),
typeName: lp.Type,
state: lp.State,
path: lp.PluginPath(),
loadedTime: lp.LoadedTime,
configPolicy: cp,
}
}

type metricCatalog struct {
tree *MTTrie
mutex *sync.Mutex
Expand Down Expand Up @@ -288,8 +372,9 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) e
}).Error("error adding loaded metric type")
return err
}

newMt := metricType{
Plugin: lp,
Plugin: newCatalogedPlugin(lp),
namespace: mt.Namespace(),
version: mt.Version(),
lastAdvertisedTime: mt.LastAdvertisedTime(),
Expand Down Expand Up @@ -326,7 +411,6 @@ func (mc *metricCatalog) Add(m *metricType) {

// adding key as a cataloged keys (mc.keys)
mc.keys = appendIfMissing(mc.keys, key)

mc.tree.Add(m)
}

Expand Down Expand Up @@ -366,7 +450,7 @@ func (mc *metricCatalog) GetMetric(requested core.Namespace, version int) (*metr
version: catalogedmt.Version(),
lastAdvertisedTime: catalogedmt.LastAdvertisedTime(),
tags: catalogedmt.Tags(),
policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()),
policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()),
config: catalogedmt.Config(),
unit: catalogedmt.Unit(),
description: catalogedmt.Description(),
Expand Down Expand Up @@ -414,7 +498,7 @@ func (mc *metricCatalog) GetMetrics(requested core.Namespace, version int) ([]*m
version: catalogedmt.Version(),
lastAdvertisedTime: catalogedmt.LastAdvertisedTime(),
tags: catalogedmt.Tags(),
policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()),
policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()),
config: catalogedmt.Config(),
unit: catalogedmt.Unit(),
description: catalogedmt.Description(),
Expand Down Expand Up @@ -510,7 +594,7 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error {
return m.Unsubscribe()
}

func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (*loadedPlugin, error) {
func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (core.CatalogedPlugin, error) {
mt, err := mc.tree.GetMetric(mns.Strings(), ver)
if err != nil {
log.WithFields(log.Fields{
Expand Down
11 changes: 8 additions & 3 deletions control/mttrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"sort"
"strings"

"github.com/intelsdi-x/snap/core"
)

/*
Expand Down Expand Up @@ -72,7 +74,8 @@ func NewMTTrie() *MTTrie {
func (m *MTTrie) String() string {
out := ""
for _, mt := range m.gatherMetricTypes() {
out += fmt.Sprintf("%s => %s\n", mt.Key(), mt.Plugin.Key())
pluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version())
out += fmt.Sprintf("%s => %s\n", mt.Key(), pluginKey)
}
return out
}
Expand All @@ -92,9 +95,11 @@ func (m *MTTrie) gatherMetricTypes() []metricType {
}

// DeleteByPlugin removes all metrics from the catalog if they match a loadedPlugin
func (m *MTTrie) DeleteByPlugin(lp *loadedPlugin) {
func (m *MTTrie) DeleteByPlugin(cp core.CatalogedPlugin) {
for _, mt := range m.gatherMetricTypes() {
if mt.Plugin.Key() == lp.Key() {
mtPluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version())
cpKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())
if mtPluginKey == cpKey {
// remove this metric
m.RemoveMetric(mt)
}
Expand Down
48 changes: 48 additions & 0 deletions control/plugin/cpolicy/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,54 @@ func NewPolicyNode() *ConfigPolicyNode {
}
}

func (c *ConfigPolicyNode) CopyRules() ([]Rule, error) {
rules := []Rule{}
for _, rule := range c.rules {
var err error
switch rule.(type) {
case *BoolRule:
var newBoolRule *BoolRule
if rule.Default() != nil {
newBoolRule, err = NewBoolRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueBool).Value)
} else {
newBoolRule, err = NewBoolRule(rule.Key(), rule.Required())
}
rules = append(rules, newBoolRule)
case *StringRule:
var newStringRule *StringRule
if rule.Default() != nil {
newStringRule, err = NewStringRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueStr).Value)
} else {
newStringRule, err = NewStringRule(rule.Key(), rule.Required())
}
rules = append(rules, newStringRule)
case *FloatRule:
var newFloatRule *FloatRule
if rule.Default() != nil {
newFloatRule, err = NewFloatRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueFloat).Value)
} else {
newFloatRule, err = NewFloatRule(rule.Key(), rule.Required())
}
rules = append(rules, newFloatRule)
case *IntRule:
var newIntRule *IntRule
if rule.Default() != nil {
newIntRule, err = NewIntegerRule(rule.Key(), rule.Required(), rule.Default().(ctypes.ConfigValueInt).Value)
} else {
newIntRule, err = NewIntegerRule(rule.Key(), rule.Required())
}
rules = append(rules, newIntRule)
default:
return []Rule{}, errors.New(fmt.Sprint("Unknown rule type"))
}

if err != nil {
return []Rule{}, errors.New(fmt.Sprintf("Could not create rule %s type %s ", rule.Key(), rule.Type()))
}
}
return rules, nil
}

// UnmarshalJSON unmarshals JSON into a ConfigPolicyNode
func (c *ConfigPolicyNode) UnmarshalJSON(data []byte) error {
m := map[string]interface{}{}
Expand Down
23 changes: 17 additions & 6 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,18 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}).Error("load plugin error when starting plugin")
return nil, serror.New(err)
}

ePlugin.SetName(resp.Meta.Name)

key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
if _, exists := p.loadedPlugins.table[key]; exists {
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
"plugin-name": resp.Meta.Name,
"plugin-version": resp.Meta.Version,
"plugin-type": resp.Type.String(),
})
}

ap, err := newAvailablePlugin(resp, emitter, ePlugin)
if err != nil {
pmLogger.WithFields(log.Fields{
Expand Down Expand Up @@ -371,7 +382,13 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}).Error("error in getting config policy")
return nil, serror.New(err)
}

lPlugin.ConfigPolicy = cp
lPlugin.Meta = resp.Meta
lPlugin.Type = resp.Type
lPlugin.Token = resp.Token
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

if resp.Type == plugin.CollectorPluginType {
cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version)
Expand Down Expand Up @@ -503,12 +520,6 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
return nil, serror.New(e)
}

lPlugin.Meta = resp.Meta
lPlugin.Type = resp.Type
lPlugin.Token = resp.Token
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

aErr := p.loadedPlugins.add(lPlugin)
if aErr != nil {
pmLogger.WithFields(log.Fields{
Expand Down

0 comments on commit 710751c

Please sign in to comment.