From baca5f9a25c20ce1d39fda68d26c615f059cc433 Mon Sep 17 00:00:00 2001 From: otherpirate Date: Tue, 20 Mar 2018 13:37:13 -0300 Subject: [PATCH 01/12] Create plugin to insert using CopyIn and insert grouped/batch --- plugins/outputs/all/all.go | 2 + plugins/outputs/postgresql_batch/README.md | 13 ++ .../postgresql_batch/postgresql_batch.go | 132 ++++++++++++++++++ .../postgresql_batch/postgresql_batch_test.go | 47 +++++++ plugins/outputs/postgresql_copy/README.md | 13 ++ .../postgresql_copy/postgresql_copy.go | 108 ++++++++++++++ .../postgresql_copy/postgresql_copy_test.go | 48 +++++++ 7 files changed, 363 insertions(+) create mode 100644 plugins/outputs/postgresql_batch/README.md create mode 100644 plugins/outputs/postgresql_batch/postgresql_batch.go create mode 100644 plugins/outputs/postgresql_batch/postgresql_batch_test.go create mode 100644 plugins/outputs/postgresql_copy/README.md create mode 100644 plugins/outputs/postgresql_copy/postgresql_copy.go create mode 100644 plugins/outputs/postgresql_copy/postgresql_copy_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index a5d2a44daad59..771c9c4ac9adb 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -26,6 +26,8 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/nsq" _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" + _ "github.com/influxdata/telegraf/plugins/outputs/postgresql_batch" + _ "github.com/influxdata/telegraf/plugins/outputs/postgresql_copy" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" diff --git a/plugins/outputs/postgresql_batch/README.md b/plugins/outputs/postgresql_batch/README.md new file mode 100644 index 0000000000000..9087cdd7281c0 --- /dev/null +++ b/plugins/outputs/postgresql_batch/README.md @@ -0,0 +1,13 @@ +# PostgreSQL Output Plugin + +This output plugin writes all metrics to PostgreSQL in batch. + +Obs: Currently, you should create your PostgreSQL tables first + +### Configuration: + +```toml +# Send metrics to PostgreSQL using batch (grouped insert) +[[outputs.postgresql_batch]] + address = "host=localhost user=postgres sslmode=verify-full" +``` \ No newline at end of file diff --git a/plugins/outputs/postgresql_batch/postgresql_batch.go b/plugins/outputs/postgresql_batch/postgresql_batch.go new file mode 100644 index 0000000000000..576ca19ca95a3 --- /dev/null +++ b/plugins/outputs/postgresql_batch/postgresql_batch.go @@ -0,0 +1,132 @@ +package postgresql_batch + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "github.com/jackc/pgx" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type PostgresqlBatch struct { + db *sql.DB + Address string + Inserts map[string]string + Columns map[string][]string +} + +func (p *PostgresqlBatch) Connect() error { + db, err := sql.Open("pgx", p.Address) + if err != nil { + return err + } + p.db = db + + p.Inserts = make(map[string]string) + p.Columns = make(map[string][]string) + + return nil +} + +func (p *PostgresqlBatch) Close() error { + return p.db.Close() +} + +func quoteIdent(name string) string { + return pgx.Identifier{name}.Sanitize() +} + +var sampleConfig = ` + # Send metrics to PostgreSQL using COPY + [[outputs.postgresql_batch]] + address = "host=localhost user=postgres sslmode=verify-full" +` + +func (p *PostgresqlBatch) SampleConfig() string { return sampleConfig } +func (p *PostgresqlBatch) Description() string { return "Send metrics to Postgresql in batch" } + +func (p *PostgresqlBatch) generateInsert(tablename string, columns []string) string { + var quoted []string + for _, column := range columns { + quoted = append(quoted, quoteIdent(column)) + } + + return fmt.Sprintf("INSERT INTO %s(%s) VALUES ", quoteIdent(tablename), strings.Join(quoted, ",")) +} + +func (p *PostgresqlBatch) buildTableInsert(metric telegraf.Metric) { + table := metric.Name() + if p.Inserts[table] == "" { + for key, _ := range metric.Tags() { + p.Columns[table] = append(p.Columns[table], key) + } + for key, _ := range metric.Fields() { + p.Columns[table] = append(p.Columns[table], key) + } + p.Inserts[table] = p.generateInsert(table, append(p.Columns[table], "time")) + } +} + +func quoted(value interface{}) interface{} { + switch value.(type) { + case string: + return "'" + value.(string) + "'" + case time.Time: + return quoted(value.(time.Time).Format("2006-01-02 15:04:05")) + default: + return value + } +} + +func joinValues(values []interface{}) string { + strs := make([]string, len(values)) + for i, value := range values { + strs[i] = fmt.Sprintf("%v", value) + } + return strings.Join(strs, ", ") +} + +func buildValues(metric telegraf.Metric, columns []string) string { + var values []interface{} + mapString := metric.Tags() + for key, value := range metric.Fields() { + mapString[key] = fmt.Sprintf("%v", value) + } + for _, column := range columns { + values = append(values, quoted(mapString[column])) + } + values = append(values, quoted(metric.Time())) + return "(" + joinValues(values) + ")" +} + +func (p *PostgresqlBatch) Write(metrics []telegraf.Metric) error { + values := make(map[string][]string) + for _, metric := range metrics { + p.buildTableInsert(metric) + table := metric.Name() + values[table] = append(values[table], buildValues(metric, p.Columns[table])) + } + for table, values := range values { + if len(values) == 0 { + continue + } + sql := p.Inserts[table] + strings.Join(values, ",") + _, err := p.db.Exec(sql) + if err != nil { + return err + } + } + return nil +} + +func init() { + outputs.Add("postgresql_batch", func() telegraf.Output { return newPostgresqlBatch() }) +} + +func newPostgresqlBatch() *PostgresqlBatch { + return &PostgresqlBatch{} +} diff --git a/plugins/outputs/postgresql_batch/postgresql_batch_test.go b/plugins/outputs/postgresql_batch/postgresql_batch_test.go new file mode 100644 index 0000000000000..fa50b0c246527 --- /dev/null +++ b/plugins/outputs/postgresql_batch/postgresql_batch_test.go @@ -0,0 +1,47 @@ +package postgresql_batch + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/metric" + + "github.com/stretchr/testify/assert" +) + +func TestBuildInsert(t *testing.T) { + table := "cpu_usage" + timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC) + tags := map[string]string{"host": "address", "zone": "west"} + fields := map[string]interface{}{"cpu_perc": float64(0.2)} + m, _ := metric.New(table, tags, fields, timestamp) + + p := newPostgresqlBatch() + p.Inserts = make(map[string]string) + assert.Empty(t, p.Inserts[table]) + p.Columns = make(map[string][]string) + assert.Empty(t, p.Columns[table]) + + p.buildTableInsert(m) + assert.Equal(t, len(p.Columns[table]), 3) + assert.Equal(t, p.Columns[table][0], "host") + assert.Equal(t, p.Columns[table][1], "zone") + assert.Equal(t, p.Columns[table][2], "cpu_perc") + assert.Equal(t, p.Inserts[table], "INSERT INTO \"" + table + "\"(\"host\",\"zone\",\"cpu_perc\",\"time\") VALUES ") +} + +func TestBuildValues(t *testing.T) { + table := "cpu_usage" + timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC) + tags := map[string]string{"host": "address", "zone": "west"} + fields := map[string]interface{}{"cpu_perc": float64(0.2)} + m, _ := metric.New(table, tags, fields, timestamp) + + p := newPostgresqlBatch() + p.Inserts = make(map[string]string) + p.Columns = make(map[string][]string) + + p.buildTableInsert(m) + values := buildValues(m, p.Columns[table]) + assert.Equal(t, values, "('address', 'west', '0.2', '2010-11-10 21:00:00')") +} \ No newline at end of file diff --git a/plugins/outputs/postgresql_copy/README.md b/plugins/outputs/postgresql_copy/README.md new file mode 100644 index 0000000000000..799feebe4640c --- /dev/null +++ b/plugins/outputs/postgresql_copy/README.md @@ -0,0 +1,13 @@ +# PostgreSQL Output Plugin + +This output plugin writes all metrics to PostgreSQL using CopyIn. + +Obs: Currently, you should create your PostgreSQL tables first + +### Configuration: + +```toml +# Send metrics to PostgreSQL using CopyIn +[[outputs.postgresql_copy]] + address = "postgres://USER:PWD@HOST:PORT/DATABASE?sslmode=disable" +``` diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go new file mode 100644 index 0000000000000..00c961483591a --- /dev/null +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -0,0 +1,108 @@ +package postgresql_copy + +import ( + "database/sql" + + "github.com/lib/pq" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type PostgresqlCopy struct { + db *sql.DB + Address string + Columns map[string][]string +} + +func (p *PostgresqlCopy) Connect() error { + db, err := sql.Open("postgres", p.Address) + if err != nil { + return err + } + p.db = db + p.Columns = make(map[string][]string) + return nil +} + +func (p *PostgresqlCopy) Close() error { + return p.db.Close() +} + +var sampleConfig = ` + # Send metrics to PostgreSQL using COPY + [[outputs.postgresql_copy]] + address = "postgres://USER:PWD@HOST:PORT/DATABASE?sslmode=disable" +` + +func (p *PostgresqlCopy) SampleConfig() string { return sampleConfig } +func (p *PostgresqlCopy) Description() string { return "Send metrics to Postgres using Copy" } + +func (p *PostgresqlCopy) buildColumns(table string, metric telegraf.Metric) { + if len(p.Columns[table]) != 0 { + return + } + for key, _ := range metric.Fields() { + p.Columns[table] = append(p.Columns[table], key) + } + for key, _ := range metric.Tags() { + p.Columns[table] = append(p.Columns[table], key) + } +} + +func buildValues(metric telegraf.Metric, columns []string) []interface{} { + var values []interface{} + all_metric := metric.Fields() + for key, value := range metric.Tags() { + all_metric[key] = value + } + for _, column := range columns { + values = append(values, all_metric[column]) + } + values = append(values, metric.Time()) + return values +} + +func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { + tables := make(map[string][][]interface{}) + for _, metric := range metrics { + table := metric.Name() + p.buildColumns(table, metric) + tables[table] = append(tables[table], buildValues(metric, p.Columns[table])) + } + + txn, err := p.db.Begin() + if err != nil { + return err + } + for table, values := range tables { + if len(values) == 0 { + continue + } + columns := append(p.Columns[table], "time") + stmt, _ := txn.Prepare(pq.CopyIn(table, columns...)) + for _, value := range values { + _, err = stmt.Exec(value...) + if err != nil { + return err + } + } + _, err = stmt.Exec() + if err != nil { + return err + } + } + err = txn.Commit() + if err != nil { + return err + } + return nil +} + +func init() { + outputs.Add("postgresql_copy", func() telegraf.Output { return newPostgresqlCopy() }) +} + +func newPostgresqlCopy() *PostgresqlCopy { + return &PostgresqlCopy{} +} diff --git a/plugins/outputs/postgresql_copy/postgresql_copy_test.go b/plugins/outputs/postgresql_copy/postgresql_copy_test.go new file mode 100644 index 0000000000000..7083f6c7460b7 --- /dev/null +++ b/plugins/outputs/postgresql_copy/postgresql_copy_test.go @@ -0,0 +1,48 @@ +package postgresql_copy + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/metric" + + "github.com/stretchr/testify/assert" +) + +func TestBuildColumns(t *testing.T) { + table := "cpu_usage" + timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC) + tags := map[string]string{"host": "address", "zone": "west"} + fields := map[string]interface{}{"cpu_perc": float64(0.2)} + m, _ := metric.New(table, tags, fields, timestamp) + + p := newPostgresqlCopy() + p.Columns = make(map[string][]string) + assert.Empty(t, p.Columns[table]) + + p.buildColumns(table, m) + assert.Equal(t, len(p.Columns[table]), 3) + assert.Equal(t, p.Columns[table][0], "cpu_perc") + assert.Equal(t, p.Columns[table][1], "host") + assert.Equal(t, p.Columns[table][2], "zone") +} + + +func TestBuildValues(t *testing.T) { + timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC) + table := "cpu_usage" + tags := map[string]string{"host": "address", "zone": "west"} + fields := map[string]interface{}{"cpu_perc": float64(0.2)} + m, _ := metric.New(table, tags, fields, timestamp) + + p := newPostgresqlCopy() + p.Columns = make(map[string][]string) + p.buildColumns(table, m) + + values := buildValues(m, p.Columns[table]) + assert.Equal(t, len(values), 4) + assert.Equal(t, values[0], 0.2) + assert.Equal(t, values[1], "address") + assert.Equal(t, values[2], "west") + assert.Equal(t, values[3], m.Time()) +} \ No newline at end of file From fd73a01ebb221d2fb4f68362651904863d7809df Mon Sep 17 00:00:00 2001 From: otherpirate Date: Tue, 20 Mar 2018 15:56:11 -0300 Subject: [PATCH 02/12] Add lib/pq deps --- Godeps | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 Godeps diff --git a/Godeps b/Godeps new file mode 100644 index 0000000000000..ab0c59862242d --- /dev/null +++ b/Godeps @@ -0,0 +1,97 @@ +collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 +github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c +github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 +github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 +github.com/aws/aws-sdk-go c861d27d0304a79f727e9a8a4e2ac1e74602fdc0 +github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 +github.com/bsm/sarama-cluster abf039439f66c1ce78017f560b490612552f6472 +github.com/cenkalti/backoff b02f2bbce11d7ea6b97f282ef1771b0fe2f65ef3 +github.com/couchbase/go-couchbase bfe555a140d53dc1adf390f1a1d4b0fd4ceadb28 +github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29 +github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6 +github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 +github.com/dgrijalva/jwt-go dbeaa9332f19a944acb5736b4456cfcc02140e29 +github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27 +github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a +github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 +github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c +github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 +github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 +github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 +github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a +github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 +github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 +github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 +github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 +github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 +github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc +github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea +github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 +github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 +github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 +github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f +github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea +github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f +github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec +github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917 +github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d +github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 +github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 +github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 +github.com/lib/pq b2004221932bd6b13167ef654c81cffac36f7537 +github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c +github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 +github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 +github.com/mitchellh/mapstructure d0303fe809921458f417bcf828397a65db30a7e4 +github.com/multiplay/go-ts3 07477f49b8dfa3ada231afc7b7b17617d42afe8e +github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b +github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b +github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 +github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 +github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f +github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f +github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 +github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c +github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827 +github.com/openzipkin/zipkin-go-opentracing 1cafbdfde94fbf2b373534764e0863aa3bd0bf7b +github.com/pierrec/lz4 5c9560bfa9ace2bf86080bf40d46b34ae44604df +github.com/pierrec/xxHash 5a004441f897722c627870a981d02b29924215fa +github.com/pkg/errors 645ef00459ed84a119197bfb8d8205042c6df63d +github.com/pmezard/go-difflib/difflib 792786c7400a136282c1664665ae0a8db921c6c2 +github.com/prometheus/client_golang c317fb74746eac4fc65fe3909195f4cf67c5562a +github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 +github.com/prometheus/common dd2f054febf4a6c00f2343686efb775948a8bff4 +github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160 +github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c +github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 +github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b +github.com/shirou/gopsutil 384a55110aa5ae052eb93ea94940548c1e305a99 +github.com/shirou/w32 3c9377fc6748f222729a8270fe2775d149a249ad +github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65 +github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d +github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 +github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5 +github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 +github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 +github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 +github.com/tidwall/gjson 0623bd8fbdbf97cc62b98d15108832851a658e59 +github.com/tidwall/match 173748da739a410c5b0b813b956f89ff94730b4c +github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe +github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee +github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 +github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a +github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 +golang.org/x/crypto dc137beb6cce2043eb6b5f223ab8bf51c32459f4 +golang.org/x/net f2499483f923065a842d38eb4c7f1927e6fc6e6d +golang.org/x/sys 739734461d1c916b6c72a63d7efda2b27edb369f +golang.org/x/text 506f9d5c962f284575e88337e7d9296d27e729d3 +gopkg.in/asn1-ber.v1 4e86f4367175e39f69d9358a5f17b4dda270378d +gopkg.in/fatih/pool.v2 6e328e67893eb46323ad06f0e92cb9536babbabc +gopkg.in/fsnotify.v1 a8a77c9133d2d6fd8334f3260d06f60e8d80a5fb +gopkg.in/gorethink/gorethink.v3 7ab832f7b65573104a555d84a27992ae9ea1f659 +gopkg.in/ldap.v2 8168ee085ee43257585e50c6441aadf54ecb2c9f +gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 +gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26 +gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 +gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 From a1902f89bfe6a96710b109ef7507dd295dc73bb5 Mon Sep 17 00:00:00 2001 From: otherpirate Date: Tue, 20 Mar 2018 16:05:08 -0300 Subject: [PATCH 03/12] Fix unstable tests --- .../postgresql_batch/postgresql_batch_test.go | 17 ++++++++++++----- .../postgresql_copy/postgresql_copy_test.go | 8 ++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/plugins/outputs/postgresql_batch/postgresql_batch_test.go b/plugins/outputs/postgresql_batch/postgresql_batch_test.go index fa50b0c246527..4138f61766787 100644 --- a/plugins/outputs/postgresql_batch/postgresql_batch_test.go +++ b/plugins/outputs/postgresql_batch/postgresql_batch_test.go @@ -24,10 +24,14 @@ func TestBuildInsert(t *testing.T) { p.buildTableInsert(m) assert.Equal(t, len(p.Columns[table]), 3) - assert.Equal(t, p.Columns[table][0], "host") - assert.Equal(t, p.Columns[table][1], "zone") - assert.Equal(t, p.Columns[table][2], "cpu_perc") - assert.Equal(t, p.Inserts[table], "INSERT INTO \"" + table + "\"(\"host\",\"zone\",\"cpu_perc\",\"time\") VALUES ") + assert.Contains(t, p.Columns[table], "host") + assert.Contains(t, p.Columns[table], "zone") + assert.Contains(t, p.Columns[table], "cpu_perc") + assert.Contains(t, p.Inserts[table], "INSERT INTO \"" + table + "\"(") + assert.Contains(t, p.Inserts[table], "\"host\"") + assert.Contains(t, p.Inserts[table], "\"zone\"") + assert.Contains(t, p.Inserts[table], "\"cpu_perc\"") + assert.Contains(t, p.Inserts[table], ",\"time\") VALUES ") } func TestBuildValues(t *testing.T) { @@ -43,5 +47,8 @@ func TestBuildValues(t *testing.T) { p.buildTableInsert(m) values := buildValues(m, p.Columns[table]) - assert.Equal(t, values, "('address', 'west', '0.2', '2010-11-10 21:00:00')") + assert.Contains(t, values, "'address'") + assert.Contains(t, values, "'west'") + assert.Contains(t, values, "'0.2'") + assert.Contains(t, values, "'2010-11-10") } \ No newline at end of file diff --git a/plugins/outputs/postgresql_copy/postgresql_copy_test.go b/plugins/outputs/postgresql_copy/postgresql_copy_test.go index 7083f6c7460b7..78a6a0f49bde9 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy_test.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy_test.go @@ -23,8 +23,8 @@ func TestBuildColumns(t *testing.T) { p.buildColumns(table, m) assert.Equal(t, len(p.Columns[table]), 3) assert.Equal(t, p.Columns[table][0], "cpu_perc") - assert.Equal(t, p.Columns[table][1], "host") - assert.Equal(t, p.Columns[table][2], "zone") + assert.Contains(t, p.Columns[table], "host") + assert.Contains(t, p.Columns[table], "zone") } @@ -42,7 +42,7 @@ func TestBuildValues(t *testing.T) { values := buildValues(m, p.Columns[table]) assert.Equal(t, len(values), 4) assert.Equal(t, values[0], 0.2) - assert.Equal(t, values[1], "address") - assert.Equal(t, values[2], "west") + assert.Contains(t, values, "address") + assert.Contains(t, values, "west") assert.Equal(t, values[3], m.Time()) } \ No newline at end of file From 0c1ec93b0175b406ab04f552468eb7fc90c7faef Mon Sep 17 00:00:00 2001 From: otherpirate Date: Fri, 13 Apr 2018 14:04:01 -0300 Subject: [PATCH 04/12] handle with null tags and fields --- plugins/outputs/postgresql_copy/README.md | 1 + .../postgresql_copy/postgresql_copy.go | 61 ++++++++++++------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/plugins/outputs/postgresql_copy/README.md b/plugins/outputs/postgresql_copy/README.md index 799feebe4640c..282bec30f837f 100644 --- a/plugins/outputs/postgresql_copy/README.md +++ b/plugins/outputs/postgresql_copy/README.md @@ -10,4 +10,5 @@ Obs: Currently, you should create your PostgreSQL tables first # Send metrics to PostgreSQL using CopyIn [[outputs.postgresql_copy]] address = "postgres://USER:PWD@HOST:PORT/DATABASE?sslmode=disable" + ignore_inser_errors = false ``` diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go index 00c961483591a..a0fe7d899e5b1 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -2,17 +2,18 @@ package postgresql_copy import ( "database/sql" - - "github.com/lib/pq" + "log" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/lib/pq" ) type PostgresqlCopy struct { - db *sql.DB - Address string - Columns map[string][]string + db *sql.DB + Address string + Columns map[string][]string + IgnoreInserErrors bool } func (p *PostgresqlCopy) Connect() error { @@ -21,7 +22,6 @@ func (p *PostgresqlCopy) Connect() error { return err } p.db = db - p.Columns = make(map[string][]string) return nil } @@ -38,15 +38,26 @@ var sampleConfig = ` func (p *PostgresqlCopy) SampleConfig() string { return sampleConfig } func (p *PostgresqlCopy) Description() string { return "Send metrics to Postgres using Copy" } -func (p *PostgresqlCopy) buildColumns(table string, metric telegraf.Metric) { - if len(p.Columns[table]) != 0 { - return - } - for key, _ := range metric.Fields() { - p.Columns[table] = append(p.Columns[table], key) +func (p *PostgresqlCopy) buildColumns(metrics []telegraf.Metric) { + table_columns := make(map[string]map[string]bool) + for _, metric := range metrics { + table := metric.Name() + if table_columns[table] == nil { + table_columns[table] = map[string]bool{} + } + for key, _ := range metric.Fields() { + table_columns[table][key] = true + } + for key, _ := range metric.Tags() { + table_columns[table][key] = true + } } - for key, _ := range metric.Tags() { - p.Columns[table] = append(p.Columns[table], key) + + p.Columns = make(map[string][]string) + for table, columns := range table_columns { + for column := range columns { + p.Columns[table] = append(p.Columns[table], column) + } } } @@ -64,10 +75,10 @@ func buildValues(metric telegraf.Metric, columns []string) []interface{} { } func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { + p.buildColumns(metrics) tables := make(map[string][][]interface{}) for _, metric := range metrics { table := metric.Name() - p.buildColumns(table, metric) tables[table] = append(tables[table], buildValues(metric, p.Columns[table])) } @@ -80,12 +91,21 @@ func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { continue } columns := append(p.Columns[table], "time") - stmt, _ := txn.Prepare(pq.CopyIn(table, columns...)) + stmt, err := txn.Prepare(pq.CopyIn(table, columns...)) + if err != nil { + return err + } for _, value := range values { _, err = stmt.Exec(value...) - if err != nil { - return err + if err == nil { + continue + } + + if p.IgnoreInserErrors { + log.Printf("E! Could not insert into %s: %s", table, values) + continue } + return err } _, err = stmt.Exec() if err != nil { @@ -93,10 +113,7 @@ func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { } } err = txn.Commit() - if err != nil { - return err - } - return nil + return err } func init() { From d6964d05a0ef9a504a2efbd7e3afb9363189bdd9 Mon Sep 17 00:00:00 2001 From: otherpirate Date: Thu, 19 Apr 2018 16:58:11 -0300 Subject: [PATCH 05/12] Fix inser to insert --- plugins/outputs/postgresql_copy/README.md | 2 +- .../postgresql_copy/postgresql_copy.go | 25 +++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/plugins/outputs/postgresql_copy/README.md b/plugins/outputs/postgresql_copy/README.md index 282bec30f837f..90e61cb937877 100644 --- a/plugins/outputs/postgresql_copy/README.md +++ b/plugins/outputs/postgresql_copy/README.md @@ -10,5 +10,5 @@ Obs: Currently, you should create your PostgreSQL tables first # Send metrics to PostgreSQL using CopyIn [[outputs.postgresql_copy]] address = "postgres://USER:PWD@HOST:PORT/DATABASE?sslmode=disable" - ignore_inser_errors = false + ignore_insert_errors = false ``` diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go index a0fe7d899e5b1..da55397729528 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -10,10 +10,10 @@ import ( ) type PostgresqlCopy struct { - db *sql.DB - Address string - Columns map[string][]string - IgnoreInserErrors bool + db *sql.DB + Address string + Columns map[string][]string + IgnoreInsertErrors bool } func (p *PostgresqlCopy) Connect() error { @@ -101,18 +101,29 @@ func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { continue } - if p.IgnoreInserErrors { + if p.IgnoreInsertErrors { log.Printf("E! Could not insert into %s: %s", table, values) continue } return err } _, err = stmt.Exec() - if err != nil { - return err + if err == nil { + continue + } + if p.IgnoreInsertErrors { + log.Printf("E! Error in stmt execute %s", err) + continue } } err = txn.Commit() + if err == nil { + return nil + } + if p.IgnoreInsertErrors { + log.Printf("E! Error in commit %s", err) + return nil + } return err } From 4e98ce2b17e51d2e71ebb12e529caea04b2f4aad Mon Sep 17 00:00:00 2001 From: otherpirate Date: Tue, 24 Apr 2018 18:50:31 -0300 Subject: [PATCH 06/12] Improve ignore errors param --- plugins/outputs/postgresql_copy/postgresql_copy.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go index da55397729528..363a5689568ff 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -90,11 +90,18 @@ func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { if len(values) == 0 { continue } + columns := append(p.Columns[table], "time") stmt, err := txn.Prepare(pq.CopyIn(table, columns...)) if err != nil { - return err + if p.IgnoreInsertErrors == false { + log.Printf("E! Error in stmt execute %s", err) + continue + } else { + return err + } } + for _, value := range values { _, err = stmt.Exec(value...) if err == nil { From a304af4a264afdbb293960bf2dbd47797a7d6885 Mon Sep 17 00:00:00 2001 From: otherpirate Date: Thu, 3 May 2018 10:39:01 -0300 Subject: [PATCH 07/12] Fix ignore error conditions --- plugins/outputs/postgresql_copy/postgresql_copy.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go index 363a5689568ff..a1deb5ce8ba29 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -94,8 +94,9 @@ func (p *PostgresqlCopy) Write(metrics []telegraf.Metric) error { columns := append(p.Columns[table], "time") stmt, err := txn.Prepare(pq.CopyIn(table, columns...)) if err != nil { - if p.IgnoreInsertErrors == false { + if p.IgnoreInsertErrors { log.Printf("E! Error in stmt execute %s", err) + txn, err = p.db.Begin() continue } else { return err From 9c40df4fd9525b683b93f1c03bd9f9935796b54b Mon Sep 17 00:00:00 2001 From: otherpirate Date: Fri, 15 Mar 2019 13:57:10 -0300 Subject: [PATCH 08/12] Remove deps + improve config generator --- Godeps | 97 ------------------- .../postgresql_batch/postgresql_batch.go | 19 ++-- .../postgresql_copy/postgresql_copy.go | 3 +- 3 files changed, 10 insertions(+), 109 deletions(-) delete mode 100644 Godeps diff --git a/Godeps b/Godeps deleted file mode 100644 index ab0c59862242d..0000000000000 --- a/Godeps +++ /dev/null @@ -1,97 +0,0 @@ -collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 -github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c -github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 -github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 -github.com/aws/aws-sdk-go c861d27d0304a79f727e9a8a4e2ac1e74602fdc0 -github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 -github.com/bsm/sarama-cluster abf039439f66c1ce78017f560b490612552f6472 -github.com/cenkalti/backoff b02f2bbce11d7ea6b97f282ef1771b0fe2f65ef3 -github.com/couchbase/go-couchbase bfe555a140d53dc1adf390f1a1d4b0fd4ceadb28 -github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29 -github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6 -github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 -github.com/dgrijalva/jwt-go dbeaa9332f19a944acb5736b4456cfcc02140e29 -github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27 -github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a -github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 -github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c -github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 -github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 -github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 -github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a -github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 -github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 -github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 -github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 -github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 -github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc -github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea -github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 -github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 -github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 -github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f -github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea -github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f -github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec -github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917 -github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d -github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 -github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 -github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 -github.com/lib/pq b2004221932bd6b13167ef654c81cffac36f7537 -github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c -github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 -github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 -github.com/mitchellh/mapstructure d0303fe809921458f417bcf828397a65db30a7e4 -github.com/multiplay/go-ts3 07477f49b8dfa3ada231afc7b7b17617d42afe8e -github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b -github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b -github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 -github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 -github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f -github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f -github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 -github.com/opentracing-contrib/go-observer a52f2342449246d5bcc273e65cbdcfa5f7d6c63c -github.com/opentracing/opentracing-go 06f47b42c792fef2796e9681353e1d908c417827 -github.com/openzipkin/zipkin-go-opentracing 1cafbdfde94fbf2b373534764e0863aa3bd0bf7b -github.com/pierrec/lz4 5c9560bfa9ace2bf86080bf40d46b34ae44604df -github.com/pierrec/xxHash 5a004441f897722c627870a981d02b29924215fa -github.com/pkg/errors 645ef00459ed84a119197bfb8d8205042c6df63d -github.com/pmezard/go-difflib/difflib 792786c7400a136282c1664665ae0a8db921c6c2 -github.com/prometheus/client_golang c317fb74746eac4fc65fe3909195f4cf67c5562a -github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 -github.com/prometheus/common dd2f054febf4a6c00f2343686efb775948a8bff4 -github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160 -github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c -github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 -github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b -github.com/shirou/gopsutil 384a55110aa5ae052eb93ea94940548c1e305a99 -github.com/shirou/w32 3c9377fc6748f222729a8270fe2775d149a249ad -github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65 -github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d -github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 -github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5 -github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 -github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 -github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 -github.com/tidwall/gjson 0623bd8fbdbf97cc62b98d15108832851a658e59 -github.com/tidwall/match 173748da739a410c5b0b813b956f89ff94730b4c -github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe -github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee -github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 -github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a -github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 -golang.org/x/crypto dc137beb6cce2043eb6b5f223ab8bf51c32459f4 -golang.org/x/net f2499483f923065a842d38eb4c7f1927e6fc6e6d -golang.org/x/sys 739734461d1c916b6c72a63d7efda2b27edb369f -golang.org/x/text 506f9d5c962f284575e88337e7d9296d27e729d3 -gopkg.in/asn1-ber.v1 4e86f4367175e39f69d9358a5f17b4dda270378d -gopkg.in/fatih/pool.v2 6e328e67893eb46323ad06f0e92cb9536babbabc -gopkg.in/fsnotify.v1 a8a77c9133d2d6fd8334f3260d06f60e8d80a5fb -gopkg.in/gorethink/gorethink.v3 7ab832f7b65573104a555d84a27992ae9ea1f659 -gopkg.in/ldap.v2 8168ee085ee43257585e50c6441aadf54ecb2c9f -gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 -gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26 -gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 -gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 diff --git a/plugins/outputs/postgresql_batch/postgresql_batch.go b/plugins/outputs/postgresql_batch/postgresql_batch.go index 576ca19ca95a3..f6627bb45bfbf 100644 --- a/plugins/outputs/postgresql_batch/postgresql_batch.go +++ b/plugins/outputs/postgresql_batch/postgresql_batch.go @@ -42,8 +42,7 @@ func quoteIdent(name string) string { var sampleConfig = ` # Send metrics to PostgreSQL using COPY - [[outputs.postgresql_batch]] - address = "host=localhost user=postgres sslmode=verify-full" + address = "host=localhost user=postgres sslmode=verify-full" ` func (p *PostgresqlBatch) SampleConfig() string { return sampleConfig } @@ -73,12 +72,12 @@ func (p *PostgresqlBatch) buildTableInsert(metric telegraf.Metric) { func quoted(value interface{}) interface{} { switch value.(type) { - case string: - return "'" + value.(string) + "'" - case time.Time: - return quoted(value.(time.Time).Format("2006-01-02 15:04:05")) - default: - return value + case string: + return "'" + value.(string) + "'" + case time.Time: + return quoted(value.(time.Time).Format("2006-01-02 15:04:05")) + default: + return value } } @@ -94,8 +93,8 @@ func buildValues(metric telegraf.Metric, columns []string) string { var values []interface{} mapString := metric.Tags() for key, value := range metric.Fields() { - mapString[key] = fmt.Sprintf("%v", value) - } + mapString[key] = fmt.Sprintf("%v", value) + } for _, column := range columns { values = append(values, quoted(mapString[column])) } diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go index a1deb5ce8ba29..47e209fddb557 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -31,8 +31,7 @@ func (p *PostgresqlCopy) Close() error { var sampleConfig = ` # Send metrics to PostgreSQL using COPY - [[outputs.postgresql_copy]] - address = "postgres://USER:PWD@HOST:PORT/DATABASE?sslmode=disable" + address = "postgres://USER:PWD@HOST:PORT/DATABASE?sslmode=disable" ` func (p *PostgresqlCopy) SampleConfig() string { return sampleConfig } From b35db115e7356c62bbf2037536d9af4b69c8b28a Mon Sep 17 00:00:00 2001 From: otherpirate Date: Fri, 15 Mar 2019 14:03:36 -0300 Subject: [PATCH 09/12] gopkc.lock add lib/pq --- Gopkg.lock | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index 47feeb386f7e1..9aa729e30cb96 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -730,6 +730,17 @@ pruneopts = "" revision = "299bdde78165d4ca4bc7d064d8d6a4f39ac6de8c" +[[projects]] + digest = "1:29145d7af4adafd72a79df5e41456ac9e232d5a28c1cd4dacf3ff008a217fc10" + name = "github.com/lib/pq" + packages = [ + ".", + "oid", + ] + pruneopts = "" + revision = "4ded0e9383f75c197b3a2aaa6d590ac52df6fd79" + version = "v1.0.0" + [[projects]] branch = "master" digest = "1:7e9956922e349af0190afa0b6621befcd201072679d8e51a9047ff149f2afe93" @@ -1569,6 +1580,7 @@ "github.com/karrick/godirwalk", "github.com/kballard/go-shellquote", "github.com/kubernetes/apimachinery/pkg/api/resource", + "github.com/lib/pq", "github.com/matttproud/golang_protobuf_extensions/pbutil", "github.com/miekg/dns", "github.com/multiplay/go-ts3", From 7168b4e0698336c6e6dbe04f07834220634d7403 Mon Sep 17 00:00:00 2001 From: otherpirate Date: Fri, 15 Mar 2019 14:07:43 -0300 Subject: [PATCH 10/12] Fix fmt --- plugins/outputs/all/all.go | 2 +- plugins/outputs/postgresql_batch/postgresql_batch.go | 4 ++-- plugins/outputs/postgresql_batch/postgresql_batch_test.go | 4 ++-- plugins/outputs/postgresql_copy/postgresql_copy.go | 4 ++-- plugins/outputs/postgresql_copy/postgresql_copy_test.go | 5 ++--- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 771c9c4ac9adb..fa9f1fee2642e 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -25,9 +25,9 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/nats" _ "github.com/influxdata/telegraf/plugins/outputs/nsq" _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" - _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/postgresql_batch" _ "github.com/influxdata/telegraf/plugins/outputs/postgresql_copy" + _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" diff --git a/plugins/outputs/postgresql_batch/postgresql_batch.go b/plugins/outputs/postgresql_batch/postgresql_batch.go index f6627bb45bfbf..12df4a47dda88 100644 --- a/plugins/outputs/postgresql_batch/postgresql_batch.go +++ b/plugins/outputs/postgresql_batch/postgresql_batch.go @@ -60,10 +60,10 @@ func (p *PostgresqlBatch) generateInsert(tablename string, columns []string) str func (p *PostgresqlBatch) buildTableInsert(metric telegraf.Metric) { table := metric.Name() if p.Inserts[table] == "" { - for key, _ := range metric.Tags() { + for key := range metric.Tags() { p.Columns[table] = append(p.Columns[table], key) } - for key, _ := range metric.Fields() { + for key := range metric.Fields() { p.Columns[table] = append(p.Columns[table], key) } p.Inserts[table] = p.generateInsert(table, append(p.Columns[table], "time")) diff --git a/plugins/outputs/postgresql_batch/postgresql_batch_test.go b/plugins/outputs/postgresql_batch/postgresql_batch_test.go index 4138f61766787..a4fdf7648d660 100644 --- a/plugins/outputs/postgresql_batch/postgresql_batch_test.go +++ b/plugins/outputs/postgresql_batch/postgresql_batch_test.go @@ -27,7 +27,7 @@ func TestBuildInsert(t *testing.T) { assert.Contains(t, p.Columns[table], "host") assert.Contains(t, p.Columns[table], "zone") assert.Contains(t, p.Columns[table], "cpu_perc") - assert.Contains(t, p.Inserts[table], "INSERT INTO \"" + table + "\"(") + assert.Contains(t, p.Inserts[table], "INSERT INTO \""+table+"\"(") assert.Contains(t, p.Inserts[table], "\"host\"") assert.Contains(t, p.Inserts[table], "\"zone\"") assert.Contains(t, p.Inserts[table], "\"cpu_perc\"") @@ -51,4 +51,4 @@ func TestBuildValues(t *testing.T) { assert.Contains(t, values, "'west'") assert.Contains(t, values, "'0.2'") assert.Contains(t, values, "'2010-11-10") -} \ No newline at end of file +} diff --git a/plugins/outputs/postgresql_copy/postgresql_copy.go b/plugins/outputs/postgresql_copy/postgresql_copy.go index 47e209fddb557..caf4decb427b1 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy.go @@ -44,10 +44,10 @@ func (p *PostgresqlCopy) buildColumns(metrics []telegraf.Metric) { if table_columns[table] == nil { table_columns[table] = map[string]bool{} } - for key, _ := range metric.Fields() { + for key := range metric.Fields() { table_columns[table][key] = true } - for key, _ := range metric.Tags() { + for key := range metric.Tags() { table_columns[table][key] = true } } diff --git a/plugins/outputs/postgresql_copy/postgresql_copy_test.go b/plugins/outputs/postgresql_copy/postgresql_copy_test.go index 78a6a0f49bde9..705cd9feddec6 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy_test.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy_test.go @@ -27,14 +27,13 @@ func TestBuildColumns(t *testing.T) { assert.Contains(t, p.Columns[table], "zone") } - func TestBuildValues(t *testing.T) { timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC) table := "cpu_usage" tags := map[string]string{"host": "address", "zone": "west"} fields := map[string]interface{}{"cpu_perc": float64(0.2)} m, _ := metric.New(table, tags, fields, timestamp) - + p := newPostgresqlCopy() p.Columns = make(map[string][]string) p.buildColumns(table, m) @@ -45,4 +44,4 @@ func TestBuildValues(t *testing.T) { assert.Contains(t, values, "address") assert.Contains(t, values, "west") assert.Equal(t, values[3], m.Time()) -} \ No newline at end of file +} From 13f5574c8f108ca68c7f42e7cceda4aad3b74da9 Mon Sep 17 00:00:00 2001 From: otherpirate Date: Fri, 15 Mar 2019 14:48:08 -0300 Subject: [PATCH 11/12] Fix test --- plugins/outputs/postgresql_copy/postgresql_copy_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/postgresql_copy/postgresql_copy_test.go b/plugins/outputs/postgresql_copy/postgresql_copy_test.go index 705cd9feddec6..b3b695152b7bf 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy_test.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" @@ -20,7 +21,7 @@ func TestBuildColumns(t *testing.T) { p.Columns = make(map[string][]string) assert.Empty(t, p.Columns[table]) - p.buildColumns(table, m) + p.buildColumns([]telegraf.Metric{m}) assert.Equal(t, len(p.Columns[table]), 3) assert.Equal(t, p.Columns[table][0], "cpu_perc") assert.Contains(t, p.Columns[table], "host") @@ -36,7 +37,7 @@ func TestBuildValues(t *testing.T) { p := newPostgresqlCopy() p.Columns = make(map[string][]string) - p.buildColumns(table, m) + p.buildColumns([]telegraf.Metric{m}) values := buildValues(m, p.Columns[table]) assert.Equal(t, len(values), 4) From 18a98cfa96f6bc4d3a0835a08809dbbc65c9a74e Mon Sep 17 00:00:00 2001 From: otherpirate Date: Fri, 15 Mar 2019 15:14:43 -0300 Subject: [PATCH 12/12] Fix test --- plugins/outputs/postgresql_copy/postgresql_copy_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/outputs/postgresql_copy/postgresql_copy_test.go b/plugins/outputs/postgresql_copy/postgresql_copy_test.go index b3b695152b7bf..439abb63061b5 100644 --- a/plugins/outputs/postgresql_copy/postgresql_copy_test.go +++ b/plugins/outputs/postgresql_copy/postgresql_copy_test.go @@ -23,7 +23,7 @@ func TestBuildColumns(t *testing.T) { p.buildColumns([]telegraf.Metric{m}) assert.Equal(t, len(p.Columns[table]), 3) - assert.Equal(t, p.Columns[table][0], "cpu_perc") + assert.Contains(t, p.Columns[table], "cpu_perc") assert.Contains(t, p.Columns[table], "host") assert.Contains(t, p.Columns[table], "zone") } @@ -41,8 +41,8 @@ func TestBuildValues(t *testing.T) { values := buildValues(m, p.Columns[table]) assert.Equal(t, len(values), 4) - assert.Equal(t, values[0], 0.2) assert.Contains(t, values, "address") assert.Contains(t, values, "west") - assert.Equal(t, values[3], m.Time()) + assert.Contains(t, values, 0.2) + assert.Contains(t, values, m.Time()) }