From f6b44922e6eb4921dc6049d32986f07cd402e4c7 Mon Sep 17 00:00:00 2001 From: Brandon Ewing Date: Fri, 8 May 2020 16:23:26 -0500 Subject: [PATCH] Add dynamic tagging to gnmi plugin --- plugins/inputs/gnmi/README.md | 16 ++++- plugins/inputs/gnmi/gnmi.go | 46 ++++++++++++ plugins/inputs/gnmi/gnmi_test.go | 120 +++++++++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/gnmi/README.md b/plugins/inputs/gnmi/README.md index e7bbee0ea71dd..2da0ad7c9e51e 100644 --- a/plugins/inputs/gnmi/README.md +++ b/plugins/inputs/gnmi/README.md @@ -64,11 +64,23 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR (64- ## If suppression is enabled, send updates at least every X seconds anyway # heartbeat_interval = "60s" + + # [[inputs.gnmi.subscription]] + # name = "descr" + # origin = "openconfig-interfaces" + # path = "/interfaces/interface/state/description" + # subscription_mode = "on_change" + + # ## If tag_only is set, the subscription in question will be utilized to maintain a map of + # ## tags to apply to other measurements emitted by the plugin, by matching path keys + # ## All fields from the tag-only subscription will be applied as tags to other readings, + # ## in the format _. + # tag_only = true ``` ## Example Output ```shell -ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115 in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000 -ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115 out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000 +ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115,descr/description=Foo in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000 +ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115,descr/description=Bar out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000 ``` diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index a6a3c3a2c6ef3..fa05e3120d349 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -56,6 +56,8 @@ type GNMI struct { acc telegraf.Accumulator cancel context.CancelFunc wg sync.WaitGroup + // Lookup/device+name/key/value + lookup map[string]map[string]map[string]interface{} Log telegraf.Logger } @@ -73,6 +75,9 @@ type Subscription struct { // Duplicate suppression SuppressRedundant bool `toml:"suppress_redundant"` HeartbeatInterval config.Duration `toml:"heartbeat_interval"` + + // Mark this subscription as a tag-only lookup source, not emitting any metric + TagOnly bool `toml:"tag_only"` } // Start the http listener service @@ -83,6 +88,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { var request *gnmiLib.SubscribeRequest c.acc = acc ctx, c.cancel = context.WithCancel(context.Background()) + c.lookup = make(map[string]map[string]map[string]interface{}) // Validate configuration if request, err = c.newSubscribeRequest(); err != nil { @@ -133,6 +139,11 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { c.internalAliases[longPath] = name c.internalAliases[shortPath] = name } + + if subscription.TagOnly { + // Create the top-level lookup for this tag + c.lookup[name] = make(map[string]map[string]interface{}) + } } for alias, encodingPath := range c.Aliases { c.internalAliases[encodingPath] = alias @@ -297,6 +308,29 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S } } + // Update tag lookups and discard rest of update + subscriptionKey := tags["source"] + "/" + tags["name"] + if _, ok := c.lookup[name]; ok { + // We are subscribed to this, so add the fields to the lookup-table + if _, ok := c.lookup[name][subscriptionKey]; !ok { + c.lookup[name][subscriptionKey] = make(map[string]interface{}) + } + for k, v := range fields { + c.lookup[name][subscriptionKey][path.Base(k)] = v + } + // Do not process the data further as we only subscribed here for the lookup table + continue + } + + // Apply lookups if present + for subscriptionName, values := range c.lookup { + if annotations, ok := values[subscriptionKey]; ok { + for k, v := range annotations { + tags[subscriptionName+"/"+k] = v.(string) + } + } + } + // Group metrics for k, v := range fields { key := k @@ -559,6 +593,18 @@ const sampleConfig = ` ## If suppression is enabled, send updates at least every X seconds anyway # heartbeat_interval = "60s" + + # [[inputs.gnmi.subscription]] + # name = "tag" # tag name will be name + "_" + shortPath ie "tag_description" + # origin = "openconfig-interfaces" + # path = "/interfaces/interface/state/description" + # subscription_mode = "on_change" + + # ## If tag_only is set, the subscription in question will be utilized to maintain a map of + # ## tags to apply to other measurements emitted by the plugin, by matching path keys + # ## All fields from the tag-only subscription will be applied as tags to other readings, + # ## in the format _. + # tag_only = true ` // SampleConfig of plugin diff --git a/plugins/inputs/gnmi/gnmi_test.go b/plugins/inputs/gnmi/gnmi_test.go index 17a955c4875dc..9d7173d4b4d7c 100644 --- a/plugins/inputs/gnmi/gnmi_test.go +++ b/plugins/inputs/gnmi/gnmi_test.go @@ -371,6 +371,126 @@ func TestNotification(t *testing.T) { ), }, }, + { + name: "tagged update pair", + plugin: &GNMI{ + Log: testutil.Logger{}, + Encoding: "proto", + Redial: config.Duration(1 * time.Second), + Subscriptions: []Subscription{ + { + Name: "oc-intf-desc", + Origin: "openconfig-interfaces", + Path: "/interfaces/interface/state/description", + SubscriptionMode: "on_change", + TagOnly: true, + }, + { + Name: "oc-intf-counters", + Origin: "openconfig-interfaces", + Path: "/interfaces/interface/state/counters", + SubscriptionMode: "sample", + }, + }, + }, + server: &MockServer{ + SubscribeF: func(server gnmiLib.GNMI_SubscribeServer) error { + tagResponse := &gnmiLib.SubscribeResponse{ + Response: &gnmiLib.SubscribeResponse_Update{ + Update: &gnmiLib.Notification{ + Timestamp: 1543236571000000000, + Prefix: &gnmiLib.Path{}, + Update: []*gnmiLib.Update{ + { + Path: &gnmiLib.Path{ + Origin: "", + Elem: []*gnmiLib.PathElem{ + { + Name: "interfaces", + }, + { + Name: "interface", + Key: map[string]string{"name": "Ethernet1"}, + }, + { + Name: "state", + }, + { + Name: "description", + }, + }, + Target: "", + }, + Val: &gnmiLib.TypedValue{ + Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}, + }, + }, + }, + }, + }, + } + if err := server.Send(tagResponse); err != nil { + return err + } + if err := server.Send(&gnmiLib.SubscribeResponse{Response: &gnmiLib.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil { + return err + } + taggedResponse := &gnmiLib.SubscribeResponse{ + Response: &gnmiLib.SubscribeResponse_Update{ + Update: &gnmiLib.Notification{ + Timestamp: 1543236572000000000, + Prefix: &gnmiLib.Path{}, + Update: []*gnmiLib.Update{ + { + Path: &gnmiLib.Path{ + Origin: "", + Elem: []*gnmiLib.PathElem{ + { + Name: "interfaces", + }, + { + Name: "interface", + Key: map[string]string{"name": "Ethernet1"}, + }, + { + Name: "state", + }, + { + Name: "counters", + }, + { + Name: "in-broadcast-pkts", + }, + }, + Target: "", + }, + Val: &gnmiLib.TypedValue{ + Value: &gnmiLib.TypedValue_IntVal{IntVal: 42}, + }, + }, + }, + }, + }, + } + return server.Send(taggedResponse) + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "oc-intf-counters", + map[string]string{ + "path": "", + "source": "127.0.0.1", + "name": "Ethernet1", + "oc-intf-desc_description": "foo", + }, + map[string]interface{}{ + "in_broadcast_pkts": 42, + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests {