Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influxdb output database routing tag #5490

Merged
merged 2 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser
## For UDP url endpoint database needs to be configured on server side.
# database = "telegraf"

## The value of this tag will be used to determine the database. If this
## tag is not set the 'database' option is used as the default.
# database_tag = ""

## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the
## database already exists.
Expand Down
202 changes: 110 additions & 92 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/influx"
)

type APIErrorType int

const (
_ APIErrorType = iota
DatabaseNotFound
)

const (
defaultRequestTimeout = time.Second * 5
defaultDatabase = "telegraf"
Expand All @@ -37,7 +30,6 @@ const (
)

var (

// Escape an identifier in InfluxQL.
escapeIdentifier = strings.NewReplacer(
"\n", `\n`,
Expand All @@ -46,12 +38,11 @@ var (
)
)

// APIError is an error reported by the InfluxDB server
// APIError is a general error reported by the InfluxDB server
type APIError struct {
StatusCode int
Title string
Description string
Type APIErrorType
}

func (e APIError) Error() string {
Expand All @@ -61,6 +52,11 @@ func (e APIError) Error() string {
return e.Title
}

type DatabaseNotFoundError struct {
APIError
Database string
}

// QueryResponse is the response body from the /query endpoint
type QueryResponse struct {
Results []QueryResult `json:"results"`
Expand All @@ -87,62 +83,55 @@ func (r WriteResponse) Error() string {
}

type HTTPConfig struct {
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
RetentionPolicy string
Consistency string
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
DatabaseTag string
RetentionPolicy string
Consistency string
SkipDatabaseCreation bool

InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
}

type httpClient struct {
WriteURL string
QueryURL string
ContentEncoding string
Timeout time.Duration
Username string
Password string
Headers map[string]string

client *http.Client
serializer *influx.Serializer
url *url.URL
database string
client *http.Client
config HTTPConfig
createdDatabases map[string]bool
}

func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
func NewHTTPClient(config HTTPConfig) (*httpClient, error) {
if config.URL == nil {
return nil, ErrMissingURL
}

database := config.Database
if database == "" {
database = defaultDatabase
if config.Database == "" {
config.Database = defaultDatabase
}

timeout := config.Timeout
if timeout == 0 {
timeout = defaultRequestTimeout
if config.Timeout == 0 {
config.Timeout = defaultRequestTimeout
}

userAgent := config.UserAgent
if userAgent == "" {
userAgent = "Telegraf/" + internal.Version()
}

var headers = make(map[string]string, len(config.Headers)+1)
headers["User-Agent"] = userAgent
if config.Headers == nil {
config.Headers = make(map[string]string)
}
config.Headers["User-Agent"] = userAgent
for k, v := range config.Headers {
headers[k] = v
config.Headers[k] = v
}

var proxy func(*http.Request) (*url.URL, error)
Expand All @@ -152,22 +141,8 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
proxy = http.ProxyFromEnvironment
}

serializer := config.Serializer
if serializer == nil {
serializer = influx.NewSerializer()
}

writeURL, err := makeWriteURL(
config.URL,
database,
config.RetentionPolicy,
config.Consistency)
if err != nil {
return nil, err
}
queryURL, err := makeQueryURL(config.URL)
if err != nil {
return nil, err
if config.Serializer == nil {
config.Serializer = influx.NewSerializer()
}

var transport *http.Transport
Expand All @@ -192,40 +167,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
}

client := &httpClient{
serializer: serializer,
client: &http.Client{
Timeout: timeout,
Timeout: config.Timeout,
Transport: transport,
},
database: database,
url: config.URL,
WriteURL: writeURL,
QueryURL: queryURL,
ContentEncoding: config.ContentEncoding,
Timeout: timeout,
Username: config.Username,
Password: config.Password,
Headers: headers,
createdDatabases: make(map[string]bool),
config: config,
}
return client, nil
}

// URL returns the origin URL that this client connects too.
func (c *httpClient) URL() string {
return c.url.String()
return c.config.URL.String()
}

// URL returns the database that this client connects too.
// Database returns the default database that this client connects too.
func (c *httpClient) Database() string {
return c.database
return c.config.Database
}

// CreateDatabase attempts to create a new database in the InfluxDB server.
// Note that some names are not allowed by the server, notably those with
// non-printable characters or slashes.
func (c *httpClient) CreateDatabase(ctx context.Context) error {
func (c *httpClient) CreateDatabase(ctx context.Context, database string) error {
query := fmt.Sprintf(`CREATE DATABASE "%s"`,
escapeIdentifier.Replace(c.database))
escapeIdentifier.Replace(database))

req, err := c.makeQueryRequest(query)

Expand All @@ -241,6 +208,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {

if err != nil {
if resp.StatusCode == 200 {
c.createdDatabases[database] = true
return nil
}

Expand All @@ -252,6 +220,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {

// Even with a 200 response there can be an error
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
c.createdDatabases[database] = true
return nil
}

Expand All @@ -264,10 +233,52 @@ func (c *httpClient) CreateDatabase(ctx context.Context) error {

// Write sends the metrics to InfluxDB
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
var err error
batches := make(map[string][]telegraf.Metric)
if c.config.DatabaseTag == "" {
err := c.writeBatch(ctx, c.config.Database, metrics)
if err != nil {
return err
}
} else {
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}

if _, ok := batches[db]; !ok {
batches[db] = make([]telegraf.Metric, 0)
}

batches[db] = append(batches[db], metric)
}

for db, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] {
err := c.CreateDatabase(ctx, db)
if err != nil {
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
c.config.URL, db, err)
}
}

err := c.writeBatch(ctx, db, batch)
if err != nil {
return err
}
}
}
return nil
}

func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency)
if err != nil {
return err
}

reader := influx.NewReader(metrics, c.serializer)
req, err := c.makeWriteRequest(reader)
reader := influx.NewReader(metrics, c.config.Serializer)
req, err := c.makeWriteRequest(url, reader)
if err != nil {
return err
}
Expand All @@ -292,11 +303,13 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

if strings.Contains(desc, errStringDatabaseNotFound) {
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
Type: DatabaseNotFound,
return &DatabaseNotFoundError{
APIError: APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
},
Database: db,
}
}

Expand Down Expand Up @@ -340,11 +353,16 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
queryURL, err := makeQueryURL(c.config.URL)
if err != nil {
return nil, err
}

params := url.Values{}
params.Set("q", query)
form := strings.NewReader(params.Encode())

req, err := http.NewRequest("POST", c.QueryURL, form)
req, err := http.NewRequest("POST", queryURL, form)
if err != nil {
return nil, err
}
Expand All @@ -355,36 +373,36 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
return req, nil
}

func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error
if c.ContentEncoding == "gzip" {
if c.config.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
}

req, err := http.NewRequest("POST", c.WriteURL, body)
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}

req.Header.Set("Content-Type", "text/plain; charset=utf-8")
c.addHeaders(req)

if c.ContentEncoding == "gzip" {
if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

return req, nil
}

func (c *httpClient) addHeaders(req *http.Request) {
if c.Username != "" || c.Password != "" {
req.SetBasicAuth(c.Username, c.Password)
if c.config.Username != "" || c.config.Password != "" {
req.SetBasicAuth(c.config.Username, c.config.Password)
}

for header, value := range c.Headers {
for header, value := range c.config.Headers {
req.Header.Set(header, value)
}
}
Expand Down
Loading