Skip to content

Commit

Permalink
(SDI-1827): Fix intelsdi-x#1128 Loading collector twice causes task f…
Browse files Browse the repository at this point in the history
…ailure
  • Loading branch information
marcin-krolik committed Oct 28, 2016
1 parent ca32c9a commit 0c63595
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 33 deletions.
22 changes: 11 additions & 11 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type catalogsMetrics interface {
Keys() []string
Subscribe([]string, int) error
Unsubscribe([]string, int) error
GetPlugin(core.Namespace, int) (*loadedPlugin, error)
GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error)
}

type managesSigning interface {
Expand Down Expand Up @@ -715,26 +715,26 @@ func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric

// apply defaults to the metric that may be present in the plugins
// configpolicy
if pluginCfg := mt.Plugin.ConfigPolicy.Get(mt.Namespace().Strings()); pluginCfg != nil {
if pluginCfg := mt.Plugin.Policy().Get(mt.Namespace().Strings()); pluginCfg != nil {
mt.config.ApplyDefaults(pluginCfg.Defaults())
}

// loaded plugin which exposes the metric
lp := mt.Plugin
key := lp.Key()
// cataloged plugin which exposes the metric
cp := mt.Plugin
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())

// groups metricTypes by a plugin.Key()
pmt, _ := newMetricsGroupedByPlugin[key]

// pmt (plugin-metric-type) contains plugin and metrics types grouped to this plugin
pmt.plugin = lp
pmt.plugin = cp
pmt.metricTypes = append(pmt.metricTypes, mt)
newMetricsGroupedByPlugin[key] = pmt

plugin := subscribedPlugin{
name: lp.Name(),
typeName: lp.TypeName(),
version: lp.Version(),
name: cp.Name(),
typeName: cp.TypeName(),
version: cp.Version(),
config: cdata.NewNode(),
}

Expand Down Expand Up @@ -1046,7 +1046,7 @@ func (r *requestedPlugin) Config() *cdata.ConfigDataNode {

// just a tuple of loadedPlugin and metricType slice
type metricTypes struct {
plugin *loadedPlugin
plugin core.CatalogedPlugin
metricTypes []core.Metric
}

Expand All @@ -1058,7 +1058,7 @@ func (mts metricTypes) Metrics() []core.Metric {
return mts.metricTypes
}

func (mts metricTypes) Plugin() *loadedPlugin {
func (mts metricTypes) Plugin() core.CatalogedPlugin {
return mts.plugin
}

Expand Down
2 changes: 1 addition & 1 deletion control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) {
return nil, nil
}

func (m *mc) GetPlugin(core.Namespace, int) (*loadedPlugin, error) {
func (m *mc) GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) {
return nil, nil
}

Expand Down
92 changes: 80 additions & 12 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *metricCatalogItem) Versions() map[int]core.Metric {
}

type metricType struct {
Plugin *loadedPlugin
Plugin core.CatalogedPlugin
namespace core.Namespace
version int
lastAdvertisedTime time.Time
Expand Down Expand Up @@ -163,12 +163,24 @@ func (m *metric) Version() int {
return m.version
}

func (m *metric) Data() interface{} { return nil }
func (m *metric) Description() string { return "" }
func (m *metric) Unit() string { return "" }
func (m *metric) Tags() map[string]string { return nil }
func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) }
func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) }
func (m *metric) Data() interface{} {
return nil
}
func (m *metric) Description() string {
return ""
}
func (m *metric) Unit() string {
return ""
}
func (m *metric) Tags() map[string]string {
return nil
}
func (m *metric) LastAdvertisedTime() time.Time {
return time.Unix(0, 0)
}
func (m *metric) Timestamp() time.Time {
return time.Unix(0, 0)
}

type processesConfigData interface {
Process(map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *cpolicy.ProcessingErrors)
Expand Down Expand Up @@ -250,6 +262,62 @@ func (m *metricType) Unit() string {
return m.unit
}

type catalogedPlugin struct {
name string
version int
signed bool
typeName plugin.PluginType
state pluginState
path string
loadedTime time.Time
configPolicy cpolicy.ConfigPolicy
}

func (cp *catalogedPlugin) TypeName() string {
return cp.typeName.String()
}

func (cp *catalogedPlugin) Name() string {
return cp.name
}

func (cp *catalogedPlugin) Version() int {
return cp.version
}

func (cp *catalogedPlugin) IsSigned() bool {
return cp.signed
}

func (cp *catalogedPlugin) Status() string {
return string(cp.state)
}

func (cp *catalogedPlugin) PluginPath() string {
return cp.path
}

func (cp *catalogedPlugin) LoadedTimestamp() *time.Time {
return &cp.loadedTime
}

func (cp *catalogedPlugin) Policy() *cpolicy.ConfigPolicy {
return &cp.configPolicy
}

func newCatalogedPlugin(lp *loadedPlugin) core.CatalogedPlugin {
return &catalogedPlugin{
name: lp.Name(),
version: lp.Version(),
signed: lp.IsSigned(),
typeName: lp.Type,
state: lp.State,
path: lp.PluginPath(),
loadedTime: lp.LoadedTime,
configPolicy: *lp.Policy(),
}
}

type metricCatalog struct {
tree *MTTrie
mutex *sync.Mutex
Expand Down Expand Up @@ -288,8 +356,9 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) e
}).Error("error adding loaded metric type")
return err
}

newMt := metricType{
Plugin: lp,
Plugin: newCatalogedPlugin(lp),
namespace: mt.Namespace(),
version: mt.Version(),
lastAdvertisedTime: mt.LastAdvertisedTime(),
Expand Down Expand Up @@ -326,7 +395,6 @@ func (mc *metricCatalog) Add(m *metricType) {

// adding key as a cataloged keys (mc.keys)
mc.keys = appendIfMissing(mc.keys, key)

mc.tree.Add(m)
}

Expand Down Expand Up @@ -366,7 +434,7 @@ func (mc *metricCatalog) GetMetric(requested core.Namespace, version int) (*metr
version: catalogedmt.Version(),
lastAdvertisedTime: catalogedmt.LastAdvertisedTime(),
tags: catalogedmt.Tags(),
policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()),
policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()),
config: catalogedmt.Config(),
unit: catalogedmt.Unit(),
description: catalogedmt.Description(),
Expand Down Expand Up @@ -414,7 +482,7 @@ func (mc *metricCatalog) GetMetrics(requested core.Namespace, version int) ([]*m
version: catalogedmt.Version(),
lastAdvertisedTime: catalogedmt.LastAdvertisedTime(),
tags: catalogedmt.Tags(),
policy: catalogedmt.Plugin.ConfigPolicy.Get(catalogedmt.Namespace().Strings()),
policy: catalogedmt.Plugin.Policy().Get(catalogedmt.Namespace().Strings()),
config: catalogedmt.Config(),
unit: catalogedmt.Unit(),
description: catalogedmt.Description(),
Expand Down Expand Up @@ -510,7 +578,7 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error {
return m.Unsubscribe()
}

func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (*loadedPlugin, error) {
func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (core.CatalogedPlugin, error) {
mt, err := mc.tree.GetMetric(mns.Strings(), ver)
if err != nil {
log.WithFields(log.Fields{
Expand Down
11 changes: 8 additions & 3 deletions control/mttrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"sort"
"strings"

"github.com/intelsdi-x/snap/core"
)

/*
Expand Down Expand Up @@ -72,7 +74,8 @@ func NewMTTrie() *MTTrie {
func (m *MTTrie) String() string {
out := ""
for _, mt := range m.gatherMetricTypes() {
out += fmt.Sprintf("%s => %s\n", mt.Key(), mt.Plugin.Key())
pluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version())
out += fmt.Sprintf("%s => %s\n", mt.Key(), pluginKey)
}
return out
}
Expand All @@ -92,9 +95,11 @@ func (m *MTTrie) gatherMetricTypes() []metricType {
}

// DeleteByPlugin removes all metrics from the catalog if they match a loadedPlugin
func (m *MTTrie) DeleteByPlugin(lp *loadedPlugin) {
func (m *MTTrie) DeleteByPlugin(cp core.CatalogedPlugin) {
for _, mt := range m.gatherMetricTypes() {
if mt.Plugin.Key() == lp.Key() {
mtPluginKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", mt.Plugin.TypeName(), mt.Plugin.Name(), mt.Plugin.Version())
cpKey := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())
if mtPluginKey == cpKey {
// remove this metric
m.RemoveMetric(mt)
}
Expand Down
21 changes: 15 additions & 6 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
return nil, serror.New(err)
}

key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
if _, exists := p.loadedPlugins.table[key]; exists {
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
"plugin-name": resp.Meta.Name,
"plugin-version": resp.Meta.Version,
"plugin-type": resp.Type.String(),
})
}

ap, err := newAvailablePlugin(resp, emitter, ePlugin)
if err != nil {
pmLogger.WithFields(log.Fields{
Expand Down Expand Up @@ -354,7 +363,13 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}).Error("error in getting config policy")
return nil, serror.New(err)
}

lPlugin.ConfigPolicy = cp
lPlugin.Meta = resp.Meta
lPlugin.Type = resp.Type
lPlugin.Token = resp.Token
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

if resp.Type == plugin.CollectorPluginType {
cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version)
Expand Down Expand Up @@ -486,12 +501,6 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
return nil, serror.New(e)
}

lPlugin.Meta = resp.Meta
lPlugin.Type = resp.Type
lPlugin.Token = resp.Token
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

aErr := p.loadedPlugins.add(lPlugin)
if aErr != nil {
pmLogger.WithFields(log.Fields{
Expand Down

0 comments on commit 0c63595

Please sign in to comment.