Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #61 from lynxbat/SDI-98
Browse files Browse the repository at this point in the history
SDI-98
  • Loading branch information
pittma committed Mar 16, 2015
2 parents 6d61c16 + fa21543 commit a21fa58
Show file tree
Hide file tree
Showing 47 changed files with 1,473 additions and 514 deletions.
157 changes: 124 additions & 33 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/intelsdilabs/gomit"
"github.com/intelsdilabs/pulse/control/plugin"
"github.com/intelsdilabs/pulse/control/plugin/client"
"github.com/intelsdilabs/pulse/control/routing"
"github.com/intelsdilabs/pulse/core/control_event"
)

Expand All @@ -30,6 +31,8 @@ type availablePlugin struct {
Client client.PluginClient
Index int

hitCount int
lastHitTime time.Time
eventManager *gomit.EventController
failedHealthChecks int
healthChan chan error
Expand All @@ -45,12 +48,13 @@ func newAvailablePlugin(resp *plugin.Response) (*availablePlugin, error) {

eventManager: new(gomit.EventController),
healthChan: make(chan error, 1),
lastHitTime: time.Now(),
}

// Create RPC Client
switch resp.Type {
case plugin.CollectorPluginType:
c, e := client.NewCollectorClient(resp.ListenAddress, DefaultClientTimeout)
c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout)
ap.Client = c
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
Expand Down Expand Up @@ -94,7 +98,7 @@ func (ap *availablePlugin) healthCheckFailed() {
pde := &control_event.DisabledPluginEvent{
Name: ap.Name,
Version: ap.Version,
Type: ap.Type,
Type: int(ap.Type),
Key: ap.Key,
Index: ap.Index,
}
Expand All @@ -103,11 +107,19 @@ func (ap *availablePlugin) healthCheckFailed() {
hcfe := &control_event.HealthCheckFailedEvent{
Name: ap.Name,
Version: ap.Version,
Type: ap.Type,
Type: int(ap.Type),
}
defer ap.eventManager.Emit(hcfe)
}

func (a *availablePlugin) HitCount() int {
return a.hitCount
}

func (a *availablePlugin) LastHit() time.Time {
return a.lastHitTime
}

// makeKey creates the ap.Key from the ap.Name and ap.Version
func (ap *availablePlugin) makeKey() {
s := []string{ap.Name, strconv.Itoa(ap.Version)}
Expand All @@ -116,15 +128,15 @@ func (ap *availablePlugin) makeKey() {

// apCollection is a collection of availablePlugin
type apCollection struct {
table *map[string]*[]*availablePlugin
table *map[string]*availablePluginPool
mutex *sync.Mutex
keys *[]string
currentIter int
}

// newAPCollection returns an apCollection capable of storing availblePlugin
func newAPCollection() *apCollection {
m := make(map[string]*[]*availablePlugin)
m := make(map[string]*availablePluginPool)
var k []string
return &apCollection{
table: &m,
Expand All @@ -134,14 +146,32 @@ func newAPCollection() *apCollection {
}
}

func (c *apCollection) GetPluginPool(key string) *availablePluginPool {
c.Lock()
defer c.Unlock()

if ap, ok := (*c.table)[key]; ok {
return ap
}
return nil
}

func (c *apCollection) PluginPoolHasAP(key string) bool {
a := c.GetPluginPool(key)
if a != nil && a.Count() > 0 {
return true
}
return false
}

// Table returns a copy of the apCollection table
func (c *apCollection) Table() map[string][]*availablePlugin {
c.Lock()
defer c.Unlock()

m := make(map[string][]*availablePlugin)
for k, v := range *c.table {
m[k] = *v
m[k] = *v.Plugins
}
return m
}
Expand All @@ -157,39 +187,27 @@ func (c *apCollection) Add(ap *availablePlugin) error {

if (*c.table)[ap.Key] != nil {
// make sure we don't already have a pointer to this plugin in the table
if exist, i := c.exists(ap); exist {
if exist, i := c.Exists(ap); exist {
return errors.New("plugin instance already available at index " + strconv.Itoa(i))
}
} else {
var col []*availablePlugin
(*c.table)[ap.Key] = &col
(*c.table)[ap.Key] = newAvailablePluginPool()
}

// tell ap its index in the table
ap.Index = len(*(*c.table)[ap.Key])

// append
newCollection := append(*(*c.table)[ap.Key], ap)

// overwrite
(*c.table)[ap.Key] = &newCollection

(*c.table)[ap.Key].Add(ap)
return nil
}

// Remove removes an availablePlugin from the apCollection table
func (c *apCollection) Remove(ap *availablePlugin) error {
c.Lock()
defer c.Unlock()
if exists, _ := c.exists(ap); !exists {

if exists, _ := c.Exists(ap); !exists {
return errors.New("Warning: plugin does not exist in table")
}
splicedColl := append((*(*c.table)[ap.Key])[:ap.Index], (*(*c.table)[ap.Key])[ap.Index+1:]...)
(*c.table)[ap.Key] = &splicedColl
//reset indexes
for i, ap := range *(*c.table)[ap.Key] {
ap.Index = i
}

(*c.table)[ap.Key].Remove(ap)
return nil
}

Expand All @@ -204,7 +222,7 @@ func (c *apCollection) Unlock() {
}

// Item returns the item at current position in the apCollection table
func (c *apCollection) Item() (string, *[]*availablePlugin) {
func (c *apCollection) Item() (string, *availablePluginPool) {
key := (*c.keys)[c.currentIter-1]
return key, (*c.table)[key]
}
Expand All @@ -221,13 +239,8 @@ func (c *apCollection) Next() bool {

// exists checks the table to see if a pointer for the availablePlugin specified
// already exists
func (c *apCollection) exists(ap *availablePlugin) (bool, int) {
for i, _ap := range *(*c.table)[ap.Key] {
if ap == _ap {
return true, i
}
}
return false, -1
func (c *apCollection) Exists(ap *availablePlugin) (bool, int) {
return (*c.table)[ap.Key].Exists(ap)
}

// availablePlugins is a collection of availablePlugins by type
Expand Down Expand Up @@ -277,3 +290,81 @@ func (a *availablePlugins) Remove(ap *availablePlugin) error {
return errors.New("cannot remove from available plugins, unknown plugin type")
}
}

type availablePluginPool struct {
Plugins *[]*availablePlugin

mutex *sync.Mutex
}

func newAvailablePluginPool() *availablePluginPool {
app := make([]*availablePlugin, 0)
return &availablePluginPool{
Plugins: &app,
mutex: &sync.Mutex{},
}
}

func (a *availablePluginPool) Lock() {
a.mutex.Lock()
}

func (a *availablePluginPool) Unlock() {
a.mutex.Unlock()
}

func (a *availablePluginPool) Count() int {
return len((*a.Plugins))
}

func (a *availablePluginPool) Add(ap *availablePlugin) {
a.mutex.Lock()
defer a.mutex.Unlock()
// tell ap its index in the table
ap.Index = len((*a.Plugins))
// append
newCollection := append((*a.Plugins), ap)
// overwrite
a.Plugins = &newCollection
}

func (a *availablePluginPool) Remove(ap *availablePlugin) {
a.Lock()
defer a.Unlock()
// Place nil here to allow GC per : https://github.com/golang/go/wiki/SliceTricks
(*a.Plugins)[ap.Index] = nil
splicedColl := append((*a.Plugins)[:ap.Index], (*a.Plugins)[ap.Index+1:]...)
a.Plugins = &splicedColl
//reset indexes
a.resetIndexes()
}

func (a *availablePluginPool) Exists(ap *availablePlugin) (bool, int) {
for i, _ap := range *a.Plugins {
if ap == _ap {
return true, i
}
}
return false, -1
}

func (a *availablePluginPool) resetIndexes() {
for i, ap := range *a.Plugins {
ap.Index = i
}
}

func (a *availablePluginPool) SelectUsingStrategy(strat RoutingStrategy) (*availablePlugin, error) {
a.Lock()
defer a.Unlock()

sp := make([]routing.SelectablePlugin, len(*a.Plugins))
for i, _ := range *a.Plugins {
sp[i] = (*a.Plugins)[i]
}
sap, err := strat.Select(a, sp)
if err != nil || sap == nil {
return nil, err
}
return sap.(*availablePlugin), err
}
Loading

0 comments on commit a21fa58

Please sign in to comment.