Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow influxdb_listener to keep the database name from the query string if supplied #6257

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions plugins/inputs/influxdb_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ submits data to InfluxDB determines the destination database.
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"

## Optional tag name used to store the database name.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
## If you have a tag that is the same as the one specified below, and supply a database,
## the tag will be overwritten with the database supplied.
# database_tag = ""

## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
Expand Down
40 changes: 28 additions & 12 deletions plugins/inputs/influxdb_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@ const (
type TimeFunc func() time.Time

type HTTPListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize internal.Size
MaxLineSize internal.Size
Port int

ServiceAddress string `toml:"service_address"`
// Port gets pulled out of ServiceAddress
Port int
tlsint.ServerConfig

BasicUsername string
BasicPassword string
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`

TimeFunc

Expand Down Expand Up @@ -93,6 +94,13 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = "64KiB"


## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""

## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down Expand Up @@ -258,6 +266,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
now := h.TimeFunc()

precision := req.URL.Query().Get("precision")
db := req.URL.Query().Get("db")

// Handle gzip request bodies
body := req.Body
Expand Down Expand Up @@ -315,7 +324,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {

if err == io.ErrUnexpectedEOF {
// finished reading the request body
err = h.parse(buf[:n+bufStart], now, precision)
err = h.parse(buf[:n+bufStart], now, precision, db)
if err != nil {
log.Println("D! "+err.Error(), bufStart+n)
return400 = true
Expand Down Expand Up @@ -346,7 +355,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
bufStart = 0
continue
}
if err := h.parse(buf[:i+1], now, precision); err != nil {
if err := h.parse(buf[:i+1], now, precision, db); err != nil {
log.Println("D! " + err.Error())
return400 = true
}
Expand All @@ -359,7 +368,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
}

func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -371,6 +380,13 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
}

for _, m := range metrics {
// Do we need to keep the database name in the query string.
// If a tag has been supplied to put the db in and we actually got a db query,
// then we write it in. This overwrites the database tag if one was sent.
// This makes it behave like the influx endpoint.
if h.DatabaseTag != "" && db != "" {
m.AddTag(h.DatabaseTag, db)
}
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}

Expand Down
33 changes: 18 additions & 15 deletions plugins/inputs/influxdb_listener/http_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,11 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
}

func TestWriteHTTP(t *testing.T) {
func TestWriteHTTPKeepDatabase(t *testing.T) {
testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n"

listener := newTestHTTPListener()
listener.DatabaseTag = "database"

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
Expand All @@ -162,7 +165,19 @@ func TestWriteHTTP(t *testing.T) {
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
map[string]string{"host": "server01", "database": "mydb"},
)

// post single message to listener with a database tag in it already. It should be clobbered.
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode)

acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01", "database": "mydb"},
)

// post multiple message to listener
Expand All @@ -177,21 +192,9 @@ func TestWriteHTTP(t *testing.T) {
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
map[string]string{"host": hostTag, "database": "mydb"},
)
}

// Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode)

acc.Wait(3)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}

// http listener should add a newline at the end of the buffer if it's not there
Expand Down