From c94ce3c2d935f8f27b5f4ef951e01e4aa09ea8f1 Mon Sep 17 00:00:00 2001 From: Randy Coburn Date: Wed, 14 Aug 2019 16:18:27 +0200 Subject: [PATCH 1/5] Updating to allow the users to keep the database that gets sent in the query string. Adding a test for the code change Adding new configuration to the README. Adding toml tags to make it like the rest of the plugins. --- plugins/inputs/influxdb_listener/README.md | 10 ++++ .../inputs/influxdb_listener/http_listener.go | 53 ++++++++++++++----- .../influxdb_listener/http_listener_test.go | 20 ++----- 3 files changed, 56 insertions(+), 27 deletions(-) diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index 8b6d2ad51c538..4c7db247c921e 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -46,6 +46,16 @@ submits data to InfluxDB determines the destination database. tls_cert = "/etc/telegraf/cert.pem" tls_key = "/etc/telegraf/key.pem" + ## If the write has a database on it then it should be kept + ## for metrics further on. The database will be added as a tag. + ## This tag can be used in downstream outputs. + keep_database = true + + ## Optional tag name used to store the database if you want to change it to something custom. + ## If not set it will be "database" + ## Only used if keep_database is set to true. + # database_tag = database + ## Optional username and password to accept for HTTP basic authentication. ## You probably want to make sure you have TLS configured above for this. # basic_username = "foobar" diff --git a/plugins/inputs/influxdb_listener/http_listener.go b/plugins/inputs/influxdb_listener/http_listener.go index 7e55447869018..d72df5f43a61c 100644 --- a/plugins/inputs/influxdb_listener/http_listener.go +++ b/plugins/inputs/influxdb_listener/http_listener.go @@ -32,22 +32,28 @@ const ( // a single InfluxDB point. // 64 KB DEFAULT_MAX_LINE_SIZE = 64 * 1024 + + // DefaultDatabaseTag is the name of the tag that will be used to carry + // the database collected from the query string + DefaultDatabaseTag = "database" ) type TimeFunc func() time.Time type HTTPListener struct { - ServiceAddress string - ReadTimeout internal.Duration - WriteTimeout internal.Duration - MaxBodySize internal.Size - MaxLineSize internal.Size - Port int - + ServiceAddress string `toml:"service_address"` + // Port gets pulled out of ServiceAddress + Port int tlsint.ServerConfig - BasicUsername string - BasicPassword string + ReadTimeout internal.Duration `toml:"read_timeout"` + WriteTimeout internal.Duration `toml:"write_timeout"` + MaxBodySize internal.Size `toml:"max_body_size"` + MaxLineSize internal.Size `toml:"max_line_size"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + KeepDatabase bool `toml:"keep_database"` + DatabaseTag string `toml:"database_tag"` TimeFunc @@ -93,6 +99,16 @@ const sampleConfig = ` ## Maximum line size allowed to be sent in bytes. ## 0 means to use the default of 65536 bytes (64 kibibytes) max_line_size = "64KiB" + + ## If the write has a database on it then it should be kept + ## for metrics further on. The database will be added as a tag. + ## This tag can be used in downstream outputs. + keep_database = true + + ## Optional tag name used to store the database if you want to change it to something custom. + ## If not set it will be "database" + ## Only used if keep_database is set to true. + # database_tag = database ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections @@ -258,6 +274,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { now := h.TimeFunc() precision := req.URL.Query().Get("precision") + db := req.URL.Query().Get("db") // Handle gzip request bodies body := req.Body @@ -315,7 +332,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { if err == io.ErrUnexpectedEOF { // finished reading the request body - err = h.parse(buf[:n+bufStart], now, precision) + err = h.parse(buf[:n+bufStart], now, precision, db) if err != nil { log.Println("D! "+err.Error(), bufStart+n) return400 = true @@ -346,7 +363,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { bufStart = 0 continue } - if err := h.parse(buf[:i+1], now, precision); err != nil { + if err := h.parse(buf[:i+1], now, precision, db); err != nil { log.Println("D! " + err.Error()) return400 = true } @@ -359,7 +376,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { } } -func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error { +func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error { h.mu.Lock() defer h.mu.Unlock() @@ -371,6 +388,16 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error { } for _, m := range metrics { + // Do we need to keep the database name in the query string + if h.KeepDatabase { + // Did we get a database argument. If we didn't get it. We can't set it. + if db != "" { + // Is there already a database set. If not use the database in the query string. + if _, ok := m.Tags()[h.DatabaseTag]; !ok { + m.AddTag(h.DatabaseTag, db) + } + } + } h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } @@ -436,12 +463,14 @@ func init() { return &HTTPListener{ ServiceAddress: ":8186", TimeFunc: time.Now, + DatabaseTag: DefaultDatabaseTag, } }) inputs.Add("influxdb_listener", func() telegraf.Input { return &HTTPListener{ ServiceAddress: ":8186", TimeFunc: time.Now, + DatabaseTag: DefaultDatabaseTag, } }) } diff --git a/plugins/inputs/influxdb_listener/http_listener_test.go b/plugins/inputs/influxdb_listener/http_listener_test.go index 9642950613840..3c4584b1dca2a 100644 --- a/plugins/inputs/influxdb_listener/http_listener_test.go +++ b/plugins/inputs/influxdb_listener/http_listener_test.go @@ -46,6 +46,7 @@ func newTestHTTPListener() *HTTPListener { listener := &HTTPListener{ ServiceAddress: "localhost:0", TimeFunc: time.Now, + DatabaseTag: DefaultDatabaseTag, } return listener } @@ -146,8 +147,9 @@ func TestWriteHTTPBasicAuth(t *testing.T) { require.EqualValues(t, http.StatusNoContent, resp.StatusCode) } -func TestWriteHTTP(t *testing.T) { +func TestWriteHTTPKeepDatabase(t *testing.T) { listener := newTestHTTPListener() + listener.KeepDatabase = true acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -162,7 +164,7 @@ func TestWriteHTTP(t *testing.T) { acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, + map[string]string{"host": "server01", "database": "mydb"}, ) // post multiple message to listener @@ -177,21 +179,9 @@ func TestWriteHTTP(t *testing.T) { for _, hostTag := range hostTags { acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, - map[string]string{"host": hostTag}, + map[string]string{"host": hostTag, "database": "mydb"}, ) } - - // Post a gigantic metric to the listener and verify that an error is returned: - resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) - require.NoError(t, err) - resp.Body.Close() - require.EqualValues(t, 400, resp.StatusCode) - - acc.Wait(3) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) } // http listener should add a newline at the end of the buffer if it's not there From 81d9e8efa66a9aec9b520a3fd335e9078d01aac4 Mon Sep 17 00:00:00 2001 From: Randy Coburn Date: Thu, 15 Aug 2019 10:37:36 +0200 Subject: [PATCH 2/5] making requested changes in PR --- plugins/inputs/influxdb_listener/README.md | 12 ++++------ .../inputs/influxdb_listener/http_listener.go | 22 +++++-------------- .../influxdb_listener/http_listener_test.go | 3 +-- 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index 4c7db247c921e..c1a4447c5e491 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -46,15 +46,11 @@ submits data to InfluxDB determines the destination database. tls_cert = "/etc/telegraf/cert.pem" tls_key = "/etc/telegraf/key.pem" - ## If the write has a database on it then it should be kept - ## for metrics further on. The database will be added as a tag. + ## Optional tag name used to store the database. + ## If the write has a database in the query string then it will be kept in this tag name. ## This tag can be used in downstream outputs. - keep_database = true - - ## Optional tag name used to store the database if you want to change it to something custom. - ## If not set it will be "database" - ## Only used if keep_database is set to true. - # database_tag = database + ## The default value of nothing means it will be off and the database will not be recorded. + # database_tag = "" ## Optional username and password to accept for HTTP basic authentication. ## You probably want to make sure you have TLS configured above for this. diff --git a/plugins/inputs/influxdb_listener/http_listener.go b/plugins/inputs/influxdb_listener/http_listener.go index d72df5f43a61c..b22e99800f96d 100644 --- a/plugins/inputs/influxdb_listener/http_listener.go +++ b/plugins/inputs/influxdb_listener/http_listener.go @@ -32,10 +32,6 @@ const ( // a single InfluxDB point. // 64 KB DEFAULT_MAX_LINE_SIZE = 64 * 1024 - - // DefaultDatabaseTag is the name of the tag that will be used to carry - // the database collected from the query string - DefaultDatabaseTag = "database" ) type TimeFunc func() time.Time @@ -52,7 +48,6 @@ type HTTPListener struct { MaxLineSize internal.Size `toml:"max_line_size"` BasicUsername string `toml:"basic_username"` BasicPassword string `toml:"basic_password"` - KeepDatabase bool `toml:"keep_database"` DatabaseTag string `toml:"database_tag"` TimeFunc @@ -100,15 +95,12 @@ const sampleConfig = ` ## 0 means to use the default of 65536 bytes (64 kibibytes) max_line_size = "64KiB" - ## If the write has a database on it then it should be kept - ## for metrics further on. The database will be added as a tag. - ## This tag can be used in downstream outputs. - keep_database = true - ## Optional tag name used to store the database if you want to change it to something custom. - ## If not set it will be "database" - ## Only used if keep_database is set to true. - # database_tag = database + ## Optional tag name used to store the database. + ## If the write has a database in the query string then it will be kept in this tag name. + ## This tag can be used in downstream outputs. + ## The default value of nothing means it will be off and the database will not be recorded. + # database_tag = "" ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections @@ -389,7 +381,7 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error for _, m := range metrics { // Do we need to keep the database name in the query string - if h.KeepDatabase { + if h.DatabaseTag != "" { // Did we get a database argument. If we didn't get it. We can't set it. if db != "" { // Is there already a database set. If not use the database in the query string. @@ -463,14 +455,12 @@ func init() { return &HTTPListener{ ServiceAddress: ":8186", TimeFunc: time.Now, - DatabaseTag: DefaultDatabaseTag, } }) inputs.Add("influxdb_listener", func() telegraf.Input { return &HTTPListener{ ServiceAddress: ":8186", TimeFunc: time.Now, - DatabaseTag: DefaultDatabaseTag, } }) } diff --git a/plugins/inputs/influxdb_listener/http_listener_test.go b/plugins/inputs/influxdb_listener/http_listener_test.go index 3c4584b1dca2a..5f5de55891eb3 100644 --- a/plugins/inputs/influxdb_listener/http_listener_test.go +++ b/plugins/inputs/influxdb_listener/http_listener_test.go @@ -46,7 +46,6 @@ func newTestHTTPListener() *HTTPListener { listener := &HTTPListener{ ServiceAddress: "localhost:0", TimeFunc: time.Now, - DatabaseTag: DefaultDatabaseTag, } return listener } @@ -149,7 +148,7 @@ func TestWriteHTTPBasicAuth(t *testing.T) { func TestWriteHTTPKeepDatabase(t *testing.T) { listener := newTestHTTPListener() - listener.KeepDatabase = true + listener.DatabaseTag = "database" acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) From 24d524f7e7603dde25c09f3c5198431e06d4b2fb Mon Sep 17 00:00:00 2001 From: Randy Coburn Date: Thu, 15 Aug 2019 10:57:45 +0200 Subject: [PATCH 3/5] trivial change to trigger a built since I can't restart the failed one. --- plugins/inputs/influxdb_listener/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index c1a4447c5e491..b0f76500ecca0 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -46,7 +46,7 @@ submits data to InfluxDB determines the destination database. tls_cert = "/etc/telegraf/cert.pem" tls_key = "/etc/telegraf/key.pem" - ## Optional tag name used to store the database. + ## Optional tag name used to store the database name. ## If the write has a database in the query string then it will be kept in this tag name. ## This tag can be used in downstream outputs. ## The default value of nothing means it will be off and the database will not be recorded. From d4448a845cb17090fe8bf2589760dc2972e9568f Mon Sep 17 00:00:00 2001 From: Randy Coburn Date: Fri, 16 Aug 2019 11:26:32 +0200 Subject: [PATCH 4/5] database will be written even if metric came in with a tag already set. Updated test to assert this behaviour. Updated README with this information --- plugins/inputs/influxdb_listener/README.md | 2 ++ plugins/inputs/influxdb_listener/http_listener.go | 13 +++++-------- .../inputs/influxdb_listener/http_listener_test.go | 14 ++++++++++++++ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index b0f76500ecca0..5efa6baf1a554 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -50,6 +50,8 @@ submits data to InfluxDB determines the destination database. ## If the write has a database in the query string then it will be kept in this tag name. ## This tag can be used in downstream outputs. ## The default value of nothing means it will be off and the database will not be recorded. + ## If you have a tag that is the same as the one specified below, and supply a database, + ## the tag will be overwritten with the database supplied. # database_tag = "" ## Optional username and password to accept for HTTP basic authentication. diff --git a/plugins/inputs/influxdb_listener/http_listener.go b/plugins/inputs/influxdb_listener/http_listener.go index b22e99800f96d..f094522e5be64 100644 --- a/plugins/inputs/influxdb_listener/http_listener.go +++ b/plugins/inputs/influxdb_listener/http_listener.go @@ -381,14 +381,11 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error for _, m := range metrics { // Do we need to keep the database name in the query string - if h.DatabaseTag != "" { - // Did we get a database argument. If we didn't get it. We can't set it. - if db != "" { - // Is there already a database set. If not use the database in the query string. - if _, ok := m.Tags()[h.DatabaseTag]; !ok { - m.AddTag(h.DatabaseTag, db) - } - } + // If a tag has been supplied to put the db in and we actually got a db query, + // then we write it in. This overwrites the database tag if one was sent. + // This makes it behave like the influx endpoint. + if h.DatabaseTag != "" && db != "" { + m.AddTag(h.DatabaseTag, db) } h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } diff --git a/plugins/inputs/influxdb_listener/http_listener_test.go b/plugins/inputs/influxdb_listener/http_listener_test.go index 5f5de55891eb3..6d14e6539603b 100644 --- a/plugins/inputs/influxdb_listener/http_listener_test.go +++ b/plugins/inputs/influxdb_listener/http_listener_test.go @@ -147,6 +147,8 @@ func TestWriteHTTPBasicAuth(t *testing.T) { } func TestWriteHTTPKeepDatabase(t *testing.T) { + testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n" + listener := newTestHTTPListener() listener.DatabaseTag = "database" @@ -166,6 +168,18 @@ func TestWriteHTTPKeepDatabase(t *testing.T) { map[string]string{"host": "server01", "database": "mydb"}, ) + // post single message to listener with a database tag in it already. It should be clobbered. + resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB))) + require.NoError(t, err) + resp.Body.Close() + require.EqualValues(t, 204, resp.StatusCode) + + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "database": "mydb"}, + ) + // post multiple message to listener resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) require.NoError(t, err) From 6aba7dbc2fc0f0158b3df73396af2976d9202818 Mon Sep 17 00:00:00 2001 From: Randy Coburn Date: Fri, 16 Aug 2019 11:49:49 +0200 Subject: [PATCH 5/5] trivial change to restart the build job that failed :( --- plugins/inputs/influxdb_listener/http_listener.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/influxdb_listener/http_listener.go b/plugins/inputs/influxdb_listener/http_listener.go index f094522e5be64..5383fd2aad6d9 100644 --- a/plugins/inputs/influxdb_listener/http_listener.go +++ b/plugins/inputs/influxdb_listener/http_listener.go @@ -380,7 +380,7 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error } for _, m := range metrics { - // Do we need to keep the database name in the query string + // Do we need to keep the database name in the query string. // If a tag has been supplied to put the db in and we actually got a db query, // then we write it in. This overwrites the database tag if one was sent. // This makes it behave like the influx endpoint.