Skip to content

Commit

Permalink
Add tag based routing in influxdb/influxdb_v2 outputs (influxdata#5490)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 32f9536 commit 75589a7
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 208 deletions.
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser
## For UDP url endpoint database needs to be configured on server side.
# database = "telegraf"

## The value of this tag will be used to determine the database. If this
## tag is not set the 'database' option is used as the default.
# database_tag = ""

## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the
## database already exists.
Expand Down
202 changes: 110 additions & 92 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/influx"
)

type APIErrorType int

const (
_ APIErrorType = iota
DatabaseNotFound
)

const (
defaultRequestTimeout = time.Second * 5
defaultDatabase = "telegraf"
Expand All @@ -37,7 +30,6 @@ const (
)

var (

// Escape an identifier in InfluxQL.
escapeIdentifier = strings.NewReplacer(
"\n", `\n`,
Expand All @@ -46,12 +38,11 @@ var (
)
)

// APIError is an error reported by the InfluxDB server
// APIError is a general error reported by the InfluxDB server
type APIError struct {
StatusCode int
Title string
Description string
Type APIErrorType
}

func (e APIError) Error() string {
Expand All @@ -61,6 +52,11 @@ func (e APIError) Error() string {
return e.Title
}

type DatabaseNotFoundError struct {
APIError
Database string
}

// QueryResponse is the response body from the /query endpoint
type QueryResponse struct {
Results []QueryResult `json:"results"`
Expand All @@ -87,62 +83,55 @@ func (r WriteResponse) Error() string {
}

type HTTPConfig struct {
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
RetentionPolicy string
Consistency string
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
DatabaseTag string
RetentionPolicy string
Consistency string
SkipDatabaseCreation bool

InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
}

type httpClient struct {
WriteURL string
QueryURL string
ContentEncoding string
Timeout time.Duration
Username string
Password string
Headers map[string]string

client *http.Client
serializer *influx.Serializer
url *url.URL
database string
client *http.Client
config HTTPConfig
createdDatabases map[string]bool
}

func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
func NewHTTPClient(config HTTPConfig) (*httpClient, error) {
if config.URL == nil {
return nil, ErrMissingURL
}

database := config.Database
if database == "" {
database = defaultDatabase
if config.Database == "" {
config.Database = defaultDatabase
}

timeout := config.Timeout
if timeout == 0 {
timeout = defaultRequestTimeout
if config.Timeout == 0 {
config.Timeout = defaultRequestTimeout
}

userAgent := config.UserAgent
if userAgent == "" {
userAgent = "Telegraf/" + internal.Version()
}

var headers = make(map[string]string, len(config.Headers)+1)
headers["User-Agent"] = userAgent
if config.Headers == nil {
config.Headers = make(map[string]string)
}
config.Headers["User-Agent"] = userAgent
for k, v := range config.Headers {
headers[k] = v
config.Headers[k] = v
}

var proxy func(*http.Request) (*url.URL, error)
Expand All @@ -152,22 +141,8 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
proxy = http.ProxyFromEnvironment
}

serializer := config.Serializer
if serializer == nil {
serializer = influx.NewSerializer()
}

writeURL, err := makeWriteURL(
config.URL,
database,
config.RetentionPolicy,
config.Consistency)
if err != nil {
return nil, err
}
queryURL, err := makeQueryURL(config.URL)
if err != nil {
return nil, err
if config.Serializer == nil {
config.Serializer = influx.NewSerializer()
}

var transport *http.Transport
Expand All @@ -192,40 +167,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
}

client := &httpClient{
serializer: serializer,
client: &http.Client{
Timeout: timeout,
Timeout: config.Timeout,
Transport: transport,
},
database: database,
url: config.URL,
WriteURL: writeURL,
QueryURL: queryURL,
ContentEncoding: config.ContentEncoding,
Timeout: timeout,
Username: config.Username,
Password: config.Password,
Headers: headers,
createdDatabases: make(map[string]bool),
config: config,
}
return client, nil
}

// URL returns the origin URL that this client connects too.
func (c *httpClient) URL() string {
return c.url.String()
return c.config.URL.String()
}

// URL returns the database that this client connects too.
// Database returns the default database that this client connects too.
func (c *httpClient) Database() string {
return c.database
return c.config.Database
}

// CreateDatabase attempts to create a new database in the InfluxDB server.
// Note that some names are not allowed by the server, notably those with
// non-printable characters or slashes.
func (c *httpClient) CreateDatabase(ctx context.Context) error {
func (c *httpClient) CreateDatabase(ctx context.Context, database string) error {
query := fmt.Sprintf(`CREATE DATABASE "%s"`,
escapeIdentifier.Replace(c.database))
escapeIdentifier.Replace(database))

req, err := c.makeQueryRequest(query)

Expand All @@ -241,6 +208,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {

if err != nil {
if resp.StatusCode == 200 {
c.createdDatabases[database] = true
return nil
}

Expand All @@ -252,6 +220,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {

// Even with a 200 response there can be an error
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
c.createdDatabases[database] = true
return nil
}

Expand All @@ -264,10 +233,52 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {

// Write sends the metrics to InfluxDB
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
var err error
batches := make(map[string][]telegraf.Metric)
if c.config.DatabaseTag == "" {
err := c.writeBatch(ctx, c.config.Database, metrics)
if err != nil {
return err
}
} else {
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}

if _, ok := batches[db]; !ok {
batches[db] = make([]telegraf.Metric, 0)
}

batches[db] = append(batches[db], metric)
}

for db, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] {
err := c.CreateDatabase(ctx, db)
if err != nil {
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
c.config.URL, db, err)
}
}

err := c.writeBatch(ctx, db, batch)
if err != nil {
return err
}
}
}
return nil
}

func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency)
if err != nil {
return err
}

reader := influx.NewReader(metrics, c.serializer)
req, err := c.makeWriteRequest(reader)
reader := influx.NewReader(metrics, c.config.Serializer)
req, err := c.makeWriteRequest(url, reader)
if err != nil {
return err
}
Expand All @@ -292,11 +303,13 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

if strings.Contains(desc, errStringDatabaseNotFound) {
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
Type: DatabaseNotFound,
return &DatabaseNotFoundError{
APIError: APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
},
Database: db,
}
}

Expand Down Expand Up @@ -340,11 +353,16 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
queryURL, err := makeQueryURL(c.config.URL)
if err != nil {
return nil, err
}

params := url.Values{}
params.Set("q", query)
form := strings.NewReader(params.Encode())

req, err := http.NewRequest("POST", c.QueryURL, form)
req, err := http.NewRequest("POST", queryURL, form)
if err != nil {
return nil, err
}
Expand All @@ -355,36 +373,36 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
return req, nil
}

func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error
if c.ContentEncoding == "gzip" {
if c.config.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
}

req, err := http.NewRequest("POST", c.WriteURL, body)
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}

req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)

if c.ContentEncoding == "gzip" {
if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

return req, nil
}

func (c *httpClient) addHeaders(req *http.Request) {
if c.Username != "" || c.Password != "" {
req.SetBasicAuth(c.Username, c.Password)
if c.config.Username != "" || c.config.Password != "" {
req.SetBasicAuth(c.config.Username, c.config.Password)
}

for header, value := range c.Headers {
for header, value := range c.config.Headers {
req.Header.Set(header, value)
}
}
Expand Down
Loading

0 comments on commit 75589a7

Please sign in to comment.