Skip to content

Commit

Permalink
use sperate serializer for each influxdb output client
Browse files Browse the repository at this point in the history
  • Loading branch information
cfz committed Nov 6, 2019
1 parent 6881c64 commit 7c20132
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
21 changes: 12 additions & 9 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ type InfluxDB struct {
CreateHTTPClientF func(config *HTTPConfig) (Client, error)
CreateUDPClientF func(config *UDPConfig) (Client, error)

serializer *influx.Serializer
Log telegraf.Logger
Log telegraf.Logger
}

var sampleConfig = `
Expand Down Expand Up @@ -145,11 +144,6 @@ func (i *InfluxDB) Connect() error {
urls = append(urls, defaultURL)
}

i.serializer = influx.NewSerializer()
if i.InfluxUintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}

for _, u := range urls {
parts, err := url.Parse(u)
if err != nil {
Expand Down Expand Up @@ -237,7 +231,7 @@ func (i *InfluxDB) udpClient(url *url.URL) (Client, error) {
config := &UDPConfig{
URL: url,
MaxPayloadSize: int(i.UDPPayload.Size),
Serializer: i.serializer,
Serializer: i.newSerializer(),
Log: i.Log,
}

Expand Down Expand Up @@ -271,7 +265,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
SkipDatabaseCreation: i.SkipDatabaseCreation,
RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency,
Serializer: i.serializer,
Serializer: i.newSerializer(),
Log: i.Log,
}

Expand All @@ -291,6 +285,15 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
return c, nil
}

func (i *InfluxDB) newSerializer() *influx.Serializer {
serializer := influx.NewSerializer()
if i.InfluxUintSupport {
serializer.SetFieldTypeSupport(influx.UintSupport)
}

return serializer
}

func init() {
outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{
Expand Down
19 changes: 11 additions & 8 deletions plugins/outputs/influxdb_v2/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ type InfluxDB struct {
UintSupport bool `toml:"influx_uint_support"`
tls.ClientConfig

clients []Client
serializer *influx.Serializer
clients []Client
}

func (i *InfluxDB) Connect() error {
Expand All @@ -107,11 +106,6 @@ func (i *InfluxDB) Connect() error {
i.URLs = append(i.URLs, defaultURL)
}

i.serializer = influx.NewSerializer()
if i.UintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}

for _, u := range i.URLs {
parts, err := url.Parse(u)
if err != nil {
Expand Down Expand Up @@ -196,7 +190,7 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U
UserAgent: i.UserAgent,
ContentEncoding: i.ContentEncoding,
TLSConfig: tlsConfig,
Serializer: i.serializer,
Serializer: i.newSerializer(),
}

c, err := NewHTTPClient(config)
Expand All @@ -207,6 +201,15 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U
return c, nil
}

func (i *InfluxDB) newSerializer() *influx.Serializer {
serializer := influx.NewSerializer()
if i.UintSupport {
serializer.SetFieldTypeSupport(influx.UintSupport)
}

return serializer
}

func init() {
outputs.Add("influxdb_v2", func() telegraf.Output {
return &InfluxDB{
Expand Down

0 comments on commit 7c20132

Please sign in to comment.