Skip to content

Commit

Permalink
Add dynamic tagging to cisco_telemetry_gnmi
Browse files Browse the repository at this point in the history
  • Loading branch information
bewing committed May 11, 2020
1 parent 48b8357 commit ffa3f74
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 2 deletions.
16 changes: 14 additions & 2 deletions plugins/inputs/cisco_telemetry_gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,22 @@ It has been optimized to support GNMI telemetry as produced by Cisco IOS XR (64-

## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"

[[inputs.cisco_telemetry_gnmi.subscription]]
name = "descr"
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/description"
subscription_mode = "on_change"

## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
tag_only = true
```

### Example Output
```
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115 in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115 out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115,descr_description=Foo in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115,descr_description=Bar out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000
```
59 changes: 59 additions & 0 deletions plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type CiscoTelemetryGNMI struct {
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
// Lookup/device/name/key/value
lookup map[string]map[string]map[string]map[string]interface{}

Log telegraf.Logger
}
Expand All @@ -72,6 +74,9 @@ type Subscription struct {
// Duplicate suppression
SuppressRedundant bool `toml:"suppress_redundant"`
HeartbeatInterval internal.Duration `toml:"heartbeat_interval"`

// Tag-only identification
TagOnly bool `toml:"tag_only"`
}

// Start the http listener service
Expand All @@ -82,6 +87,7 @@ func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error {
var request *gnmi.SubscribeRequest
c.acc = acc
ctx, c.cancel = context.WithCancel(context.Background())
c.lookup = make(map[string]map[string]map[string]map[string]interface{})

// Validate configuration
if request, err = c.newSubscribeRequest(); err != nil {
Expand Down Expand Up @@ -126,6 +132,11 @@ func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error {
c.aliases[longPath] = name
c.aliases[shortPath] = name
}

if subscription.TagOnly {
// Create the top-level lookup for this tag
c.lookup[name] = make(map[string]map[string]map[string]interface{})
}
}
for alias, path := range c.Aliases {
c.aliases[path] = alias
Expand All @@ -134,6 +145,11 @@ func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error {
// Create a goroutine for each device, dial and subscribe
c.wg.Add(len(c.Addresses))
for _, addr := range c.Addresses {
// Update the lookup table with this address
for lu := range c.lookup {
hostname, _, _ := net.SplitHostPort(addr)
c.lookup[lu][hostname] = make(map[string]map[string]interface{})
}
go func(address string) {
defer c.wg.Done()
for ctx.Err() == nil {
Expand Down Expand Up @@ -280,6 +296,24 @@ func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi
}
}

// Update tag lookups and discard rest of update
if lu, ok := c.lookup[name]; ok {
if err := updateLookups(lu[tags["source"]], tags, fields); err != nil {
c.Log.Debugf("Error updating lookups")
}
continue
}

// Apply lookups if present
for k, v := range c.lookup {
if t, ok := v[tags["source"]][tags["name"]]; ok {
for name, val := range t {
tagName := fmt.Sprintf("%s_%s", k, name)
tags[tagName] = val.(string)
}
}
}

// Group metrics
for k, v := range fields {
key := k
Expand Down Expand Up @@ -312,6 +346,19 @@ func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi
}
}

func updateLookups(lu map[string]map[string]interface{}, tags map[string]string, fields map[string]interface{}) error {
name, ok := lu[tags["name"]]
if !ok {
name = make(map[string]interface{})
lu[tags["name"]] = name
}
for k, v := range fields {
shortName := path.Base(k)
name[shortName] = v
}
return nil
}

// HandleTelemetryField and add it to a measurement
func (c *CiscoTelemetryGNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
path, aliasPath := c.handlePath(update.Path, tags, prefix)
Expand Down Expand Up @@ -529,6 +576,18 @@ const sampleConfig = `
## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"
[[inputs.cisco_telemetry_gnmi.subscription]]
name = "descr"
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/description"
subscription_mode = "on_change"
## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
tag_only = true
`

// SampleConfig of plugin
Expand Down
117 changes: 117 additions & 0 deletions plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,123 @@ func TestNotification(t *testing.T) {
),
},
},
{
name: "tagged update pair",
plugin: &CiscoTelemetryGNMI{
Log: testutil.Logger{},
Encoding: "proto",
Redial: internal.Duration{Duration: 1 * time.Second},
Subscriptions: []Subscription{
{
Name: "oc-intf-desc",
Origin: "openconfig-interfaces",
Path: "/interfaces/interface/state/description",
SubscriptionMode: "on_change",
TagOnly: true,
},
{
Name: "oc-intf-counters",
Origin: "openconfig-interfaces",
Path: "/interfaces/interface/state/counters",
SubscriptionMode: "sample",
},
},
},
server: &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
tagResponse := &gnmi.SubscribeResponse{
Response: &gnmi.SubscribeResponse_Update{
Update: &gnmi.Notification{
Timestamp: 1543236571000000000,
Prefix: &gnmi.Path{},
Update: []*gnmi.Update{
{
Path: &gnmi.Path{
Origin: "",
Elem: []*gnmi.PathElem{
{
Name: "interfaces",
},
{
Name: "interface",
Key: map[string]string{"name": "Ethernet1"},
},
{
Name: "state",
},
{
Name: "description",
},
},
Target: "",
},
Val: &gnmi.TypedValue{
Value: &gnmi.TypedValue_StringVal{StringVal: "foo"},
},
},
},
},
},
}
server.Send(tagResponse)
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}})
taggedResponse := &gnmi.SubscribeResponse{
Response: &gnmi.SubscribeResponse_Update{
Update: &gnmi.Notification{
Timestamp: 1543236572000000000,
Prefix: &gnmi.Path{},
Update: []*gnmi.Update{
{
Path: &gnmi.Path{
Origin: "",
Elem: []*gnmi.PathElem{
{
Name: "interfaces",
},
{
Name: "interface",
Key: map[string]string{"name": "Ethernet1"},
},
{
Name: "state",
},
{
Name: "counters",
},
{
Name: "in-broadcast-pkts",
},
},
Target: "",
},
Val: &gnmi.TypedValue{
Value: &gnmi.TypedValue_IntVal{IntVal: 42},
},
},
},
},
},
}
server.Send(taggedResponse)
return nil
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"oc-intf-counters",
map[string]string{
"path": "",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc_description": "foo",
},
map[string]interface{}{
"in_broadcast_pkts": 42,
},
time.Unix(0, 0),
),
},
},
}

for _, tt := range tests {
Expand Down

0 comments on commit ffa3f74

Please sign in to comment.