Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: example in readme #2

Open
wants to merge 125 commits into
base: postgres
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
125 commits
Select commit Hold shift + click to select a range
a1bd367
initial import of postgres output plugin
svenklemm Oct 19, 2017
0afa350
make address configurable
svenklemm Oct 20, 2017
bcca5c8
add helper functions for create table and insert
svenklemm Oct 22, 2017
a13ac44
add tests
svenklemm Oct 22, 2017
1a6b1c4
fix sql placeholder
svenklemm Oct 24, 2017
5ea8868
let pgx handle time conversion, remove old code
svenklemm Oct 24, 2017
8aa03cf
adjust test cases to datatype changes
svenklemm Oct 24, 2017
749e757
remove debug prints
svenklemm Oct 27, 2017
e157258
check if table exists before creating
svenklemm Oct 27, 2017
05df41b
allow skipping tags
svenklemm Oct 28, 2017
4c35bff
refactoring
svenklemm Nov 5, 2017
07dcf4e
implement TagsAsForeignkeys
svenklemm Nov 5, 2017
8348b7e
fix tests
svenklemm Nov 5, 2017
feee01c
add SampleConfig
svenklemm Nov 5, 2017
7b0315b
register driver
svenklemm Nov 5, 2017
cd4363f
update README
svenklemm Nov 5, 2017
8d5ad62
prepare for create table template
svenklemm Nov 6, 2017
a1e6b9a
quote identifier
svenklemm Nov 7, 2017
b87f066
refactor generateInsert
svenklemm Nov 7, 2017
22d8a1e
use timestamp for time column to allow pg10 partitioning
svenklemm Nov 7, 2017
212125f
remove nondeterministic tests
svenklemm Nov 8, 2017
c3874cf
use template for create table query generation
svenklemm Nov 8, 2017
d5f27b4
make TableTemplate configurable
svenklemm Nov 8, 2017
8ee4803
add quoteLiteral helper function
svenklemm Nov 18, 2017
7e3330c
add tests for quoting
svenklemm Nov 18, 2017
ee6fd6b
add TABLELITERAL to template variables
svenklemm Nov 18, 2017
236ddb9
fix template in doc
svenklemm Nov 18, 2017
09f311a
dont add primary key
svenklemm Nov 18, 2017
9dc0fed
allow using jsonb for fields and tags and make it default
svenklemm Nov 26, 2017
8fb25ae
document jsonb settings
svenklemm Nov 28, 2017
a0868dd
document template better
svenklemm Jan 17, 2018
05a9096
rework TagsAsForeignkeys to have produce 1 foreign key in measurement
svenklemm Jan 23, 2018
0fbbe99
make tag table suffix configurable
svenklemm Jan 23, 2018
079bf70
comment out noisy log messages when fk reference is not found
svenklemm Feb 5, 2018
b337ffd
handle missing columns
svenklemm Feb 5, 2018
4d58e93
remove dead code
svenklemm Feb 11, 2018
352a105
postgresql output: boolean columns
saaros Apr 16, 2018
9ddb5f0
postgresql output: batch inserts to the same table/column set
saaros Apr 17, 2018
7e989e0
use timestamptz for time column
svenklemm May 12, 2018
e50c752
adjust test to timestamptz change
svenklemm May 13, 2018
a7cf1c7
fix code formatting (gofmt)
svenklemm Jun 11, 2018
c33e057
include type in error message about unknown type
svenklemm Jul 5, 2018
b20dd45
handle uint64 as datatype
svenklemm Jul 5, 2018
41a5155
fix Printf call
svenklemm Jul 5, 2018
9178024
show all config parameters in readme
svenklemm Jul 17, 2018
b43071e
use CREATE TABLE IF NOT EXISTS
svenklemm Oct 16, 2018
f6fa727
remove commented out code, initialize vars with values
svenklemm Oct 25, 2018
99970bc
fix TABLELITERAL quoting
svenklemm Nov 27, 2018
150a2a7
pg output: Support defining schema for metrics tables
rikonen Nov 25, 2018
4adb096
pg output: Retry writing metrics after adding missing columns
rikonen Nov 29, 2018
3d2d42f
pg output: Don't try closing nil rows
rikonen Nov 29, 2018
bbfd604
adjust test output
svenklemm Nov 29, 2018
a76cbaa
add schema config settting to README
svenklemm Nov 29, 2018
046d52d
Fix adding tags when using tags as foreign key
svenklemm Nov 29, 2018
4c24bc3
Refactor PostgreSQL output plugin code
May 29, 2019
22aff8c
Optimize insert performance of metrics and tag_id lookup
Jul 14, 2019
e559ac5
Attempt reconnect to db if conn is lost and support PG env variables
Jul 18, 2019
44fabb8
error thrown on insufficient permissions
Dec 23, 2019
ac63696
outputs.postgresql: fix tag table creation to ignore template
phemmer Nov 11, 2020
bc10b00
outputs.postgresql: Use a deterministic tag key
phemmer Nov 11, 2020
680ea4b
outputs.postgresql: Sort columns on table creation
phemmer Nov 11, 2020
5966abd
outputs.postgresql: Use standard postgres data types
phemmer Nov 11, 2020
7a9d8d7
outputs.postgresql: use connection pool
phemmer Nov 11, 2020
a4b8a81
outputs.postgresql: add write concurrency support
phemmer Nov 12, 2020
2659ac5
outputs.postgresql: cache table structure
phemmer Nov 12, 2020
adcef5f
outputs.postgresql: refactor table structure management
phemmer Nov 14, 2020
79146a8
outputs.postgresql: drop unsupported fields when schema updates disabled
phemmer Nov 14, 2020
aff0c3e
outputs.postgresql: give more control over table templating
phemmer Nov 15, 2020
a344a3c
outputs.postgresql: Drop metrics with missing tag columns in DB
phemmer Nov 15, 2020
7f464f1
outputs.postgresql: Re-add population of the tag table
phemmer Nov 16, 2020
c6df522
outputs.postgresql: fix column sorting when creating new tables
phemmer Dec 9, 2020
e33f46f
outputs.postgresql: don't retry permanent errors when in single-conne…
phemmer Dec 9, 2020
9f206c5
identify tag columns using PG column comments
phemmer Jan 4, 2021
fec5a95
outputs/postgresql: split templating into its own package
phemmer Jan 4, 2021
9f1d8c1
outputs/postgresql: rename row_source -> table_source
phemmer Jan 4, 2021
05af57e
outputs/postgresql: use copy for loading tags
phemmer Jan 5, 2021
07d5d46
outputs/postgresql: update config documentation
phemmer Jan 5, 2021
c39a091
outputs/postgresql: update go modules
phemmer Jan 5, 2021
5759658
output/postgresql: Use a transaction for sequential writes
phemmer Jan 13, 2021
e87c9f6
outputs/postgresql: slight documentation cleanup
phemmer Jan 13, 2021
b51987f
outputs.postgresql: add tests
phemmer Apr 12, 2021
1a507bc
outputs.postgresql: add template documentation
phemmer Apr 12, 2021
eb03fda
outputs.postgresql: Reduce template type stutter
phemmer Apr 12, 2021
cd49b3e
outputs.postgresql: Add benchmark
phemmer Apr 18, 2021
ceb6d35
outputs.postgresql: add tag_id insert caching
phemmer Apr 19, 2021
5603641
outputs.postgresql: remove dbConnectedHook for clearing caches
phemmer Apr 19, 2021
517f02c
outputs.postgresql: simplify indexing of table columns
phemmer Apr 19, 2021
4fed553
outputs.postgresql: minor clean up tests
phemmer Apr 20, 2021
82c3170
update modules after rebase
phemmer Apr 20, 2021
a7e2b51
outputs.postgresql: fix index error on inconsistent tags
phemmer Jul 12, 2021
46f31cd
address 'go vet' unkeyed field complaints.
phemmer Jul 13, 2021
cd6203d
outputs.postgresql: remove panics on dropTagColumn & dropFieldColumn
phemmer Jul 15, 2021
2bd9699
outputs.postgresql: rename template package to sqltemplate
phemmer Jul 15, 2021
21bdd23
outputs.postgresql: use testutil.MustMetric
phemmer Jul 15, 2021
9a0ed7c
outputs.postgresql: remove postgres test server auto-detection
phemmer Jul 15, 2021
2d553db
outputs.postgresql: use struct for args to test batchGenerator
phemmer Jul 15, 2021
08ed5e7
outputs.postgresql: unexport WriteTagTable
phemmer Jul 15, 2021
93aec6f
outputs/postgresql: address PR comments
phemmer Aug 13, 2021
ff644a9
outputs/postgresql: Refactor initialization to use telegraf convention
phemmer Aug 13, 2021
fa5f937
Merge remote-tracking branch 'origin/master' into postgres
phemmer Aug 13, 2021
04d2764
outputs/postgresql: address linter complaints
phemmer Aug 14, 2021
a1316ae
outputs/postgresql: update licenses
phemmer Aug 16, 2021
55f071b
outputs/postgresql: remove maybeTempError
phemmer Aug 16, 2021
ed55f2f
outputs/postgresql: configure integration testing
phemmer Aug 16, 2021
50c8b39
outputs.postgresql: change default log level to warn
phemmer Aug 27, 2021
3f946a2
outputs.postgresql: fix error handling on sequential writes
phemmer Aug 27, 2021
48315bb
outputs.postgresql: don't try to detect mismatched data types
phemmer Aug 27, 2021
1936779
outputs.postgresql: add support for pguint
phemmer Aug 27, 2021
0038e57
outputs.postgresql: rewrite schema updates to address concurrency issues
phemmer Aug 31, 2021
1bcc5c2
outputs.postgresql: update README & add test to keep updated
phemmer Sep 1, 2021
94f0b74
outputs.postgresql: fix missing .allColumns template values
phemmer Sep 8, 2021
40ef1f9
outputs.postgresql: use timestamp without time zone
phemmer Sep 17, 2021
2d84174
outputs.postgresql: set pg connection application name
phemmer Sep 22, 2021
a00e292
outputs.postgresql: fixes for cache
phemmer Sep 22, 2021
10c0810
outputs.postgresql: add "connection" parameter to sample config
phemmer Oct 11, 2021
96a1b41
outputs.postgresql: better handling of DDL errors
phemmer Oct 13, 2021
04cfa33
outputs.postgresql: fix tests error logging & handling of %w
phemmer Oct 13, 2021
402b4ec
outputs.postgresql: fix column removal
phemmer Oct 26, 2021
02ee794
Merge remote-tracking branch 'origin/master' into postgres
phemmer Oct 26, 2021
0665544
outputs.postgresql: fix tags_as_foreign_keys typo in README
phemmer Feb 1, 2022
ddb5575
Merge remote-tracking branch 'origin/master' into postgres
phemmer Feb 18, 2022
0e511d6
outputs/postgresql: fix go test detection for pguint extension
phemmer Feb 18, 2022
e87744d
Merge remote-tracking branch 'origin/master' into postgres
phemmer Feb 18, 2022
e27281f
outputs/postgresql: update README for linter
phemmer Feb 18, 2022
efaecae
fix: example in readme
jsbrain Feb 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
outputs.postgresql: Reduce template type stutter
  • Loading branch information
phemmer committed Jul 13, 2021
commit eb03fdab0b5e7bc5347846751a31e9517439d836
68 changes: 45 additions & 23 deletions plugins/outputs/postgresql/postgresql.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/jackc/pgconn"
@@ -48,6 +49,7 @@ type Postgresql struct {
tableManager *TableManager

writeChan chan *TableSource
writeWaitGroup *utils.WaitGroup

Logger telegraf.Logger
}
@@ -123,7 +125,9 @@ func (p *Postgresql) Connect() error {
maxConns := int(p.db.Stat().MaxConns())
if maxConns > 1 {
p.writeChan = make(chan *TableSource)
p.writeWaitGroup = utils.NewWaitGroup()
for i := 0; i < maxConns; i++ {
p.writeWaitGroup.Add(1)
go p.writeWorker(p.dbContext)
}
}
@@ -152,6 +156,16 @@ func (p *Postgresql) dbConnectedHook(ctx context.Context, conn *pgx.Conn) error

// Close closes the connection to the database
func (p *Postgresql) Close() error {
if p.writeChan != nil {
// We're using async mode. Gracefully close with timeout.
close(p.writeChan)
select {
case <-p.writeWaitGroup.C():
case <-time.NewTimer(time.Second * 5).C:
}
}

// Die!
p.dbContextCancel()
p.db.Close()
p.tableManager = nil
@@ -259,9 +273,13 @@ func (p *Postgresql) writeConcurrent(tableSources map[string]*TableSource) error
}

func (p *Postgresql) writeWorker(ctx context.Context) {
defer p.writeWaitGroup.Done()
for {
select {
case tableSource := <-p.writeChan:
case tableSource, ok := <-p.writeChan:
if !ok {
return
}
if err := p.writeRetry(ctx, tableSource); err != nil {
p.Logger.Errorf("write error (permanent, dropping sub-batch): %v", err)
}
@@ -301,6 +319,14 @@ func isTempError(err error) bool {
return true
case "57": // Operator Intervention
return true
case "23": // Integrity Constraint Violation
switch pgErr.Code {
case "23505": // unique_violation
if strings.Contains(err.Error(), "pg_type_typname_nsp_index") {
// Happens when you try to create 2 tables simultaneously.
return true
}
}
}
// Assume that any other error that comes from postgres is a permanent error
return false
@@ -319,7 +345,7 @@ func isTempError(err error) bool {
func (p *Postgresql) writeRetry(ctx context.Context, tableSource *TableSource) error {
backoff := time.Duration(0)
for {
err := p.writeMetricsFromMeasureTx(ctx, tableSource)
err := p.writeMetricsFromMeasure(ctx, p.db, tableSource)
if !isTempError(err) {
return err
}
@@ -338,48 +364,45 @@ func (p *Postgresql) writeRetry(ctx context.Context, tableSource *TableSource) e
}
}

func (p *Postgresql) writeMetricsFromMeasureTx(ctx context.Context, tableSource *TableSource) error {
tx, err := p.db.Begin(ctx)
// Writes the metrics from a specified measure. All the provided metrics must belong to the same measurement.
func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, db dbh, tableSource *TableSource) error {
err := p.tableManager.MatchSource(ctx, db, tableSource)
if err != nil {
return err
}
defer tx.Rollback(ctx)

if err := p.writeMetricsFromMeasure(ctx, tx, tableSource); err != nil {
return err
}

return tx.Commit(ctx)
}

// Writes the metrics from a specified measure. All the provided metrics must belong to the same measurement.
func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, db dbh, tableSource *TableSource) error {
err := p.tableManager.MatchSource(ctx, db, tableSource)
tx, err := db.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)

if p.TagsAsForeignKeys {
if err := p.WriteTagTable(ctx, db, tableSource); err != nil {
// log and continue. As the admin can correct the issue, and tags don't change over time, they can be added from
// future metrics after issue is corrected.
if err := p.WriteTagTable(ctx, tx, tableSource); err != nil {
if p.ForignTagConstraint {
return fmt.Errorf("writing to tag table '%s': %s", tableSource.Name()+p.TagTableSuffix, err)
} else {
// log and continue. As the admin can correct the issue, and tags don't change over time, they can be
// added from future metrics after issue is corrected.
p.Logger.Errorf("writing to tag table '%s': %s", tableSource.Name()+p.TagTableSuffix, err)
}
}
}

fullTableName := utils.FullTableName(p.Schema, tableSource.Name())
_, err = db.CopyFrom(ctx, fullTableName, tableSource.ColumnNames(), tableSource)
return err
if _, err := tx.CopyFrom(ctx, fullTableName, tableSource.ColumnNames(), tableSource); err != nil {
return err
}

tx.Commit(ctx)
return nil
}

func (p *Postgresql) WriteTagTable(ctx context.Context, db dbh, tableSource *TableSource) error {
//TODO cache which tagSets we've already inserted and skip them.
ttsrc := NewTagTableSource(tableSource)

// need a transaction so that if it errors, we don't roll back the parent transaction, just the tags
tx, err := db.Begin(ctx)
if err != nil {
return err
@@ -389,16 +412,15 @@ func (p *Postgresql) WriteTagTable(ctx context.Context, db dbh, tableSource *Tab
ident := pgx.Identifier{ttsrc.postgresql.Schema, ttsrc.Name()}
identTemp := pgx.Identifier{ttsrc.Name() + "_temp"}
sql := fmt.Sprintf("CREATE TEMP TABLE %s (LIKE %s) ON COMMIT DROP", identTemp.Sanitize(), ident.Sanitize())
_, err = tx.Exec(ctx, sql)
if err != nil {
if _, err := tx.Exec(ctx, sql); err != nil {
return fmt.Errorf("creating tags temp table: %w", err)
}

if _, err := tx.CopyFrom(ctx, identTemp, ttsrc.ColumnNames(), ttsrc); err != nil {
return fmt.Errorf("copying into tags temp table: %w", err)
}

if _, err := tx.Exec(ctx, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s ON CONFLICT (tag_id) DO NOTHING", ident.Sanitize(), identTemp.Sanitize())); err != nil {
if _, err := tx.Exec(ctx, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s ORDER BY tag_id ON CONFLICT (tag_id) DO NOTHING", ident.Sanitize(), identTemp.Sanitize())); err != nil {
return fmt.Errorf("inserting into tags table: %w", err)
}

43 changes: 21 additions & 22 deletions plugins/outputs/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"reflect"
"strings"
"sync"
"testing"
@@ -42,13 +41,13 @@ func (l Log) String() string {
type LogAccumulator struct {
logs []Log
cond *sync.Cond
t *testing.T
tb testing.TB
}

func NewLogAccumulator(t *testing.T) *LogAccumulator {
func NewLogAccumulator(tb testing.TB) *LogAccumulator {
return &LogAccumulator{
cond: sync.NewCond(&sync.Mutex{}),
t: t,
tb: tb,
}
}

@@ -57,8 +56,8 @@ func (la *LogAccumulator) append(level pgx.LogLevel, format string, args []inter
log := Log{level, format, args}
la.logs = append(la.logs, log)
s := log.String()
la.t.Helper()
la.t.Log(s)
la.tb.Helper()
la.tb.Log(s)
la.cond.Broadcast()
la.cond.L.Unlock()
}
@@ -177,42 +176,42 @@ func (la *LogAccumulator) Logs() []Log {
}

func (la *LogAccumulator) Errorf(format string, args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelError, format, args)
}

func (la *LogAccumulator) Error(args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelError, "%v", args)
}

func (la *LogAccumulator) Debugf(format string, args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelDebug, format, args)
}

func (la *LogAccumulator) Debug(args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelDebug, "%v", args)
}

func (la *LogAccumulator) Warnf(format string, args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelWarn, format, args)
}

func (la *LogAccumulator) Warn(args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelWarn, "%v", args)
}

func (la *LogAccumulator) Infof(format string, args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelInfo, format, args)
}

func (la *LogAccumulator) Info(args ...interface{}) {
la.t.Helper()
la.tb.Helper()
la.append(pgx.LogLevelInfo, "%v", args)
}

@@ -252,9 +251,9 @@ type PostgresqlTest struct {
Logger *LogAccumulator
}

func newPostgresqlTest(t *testing.T) *PostgresqlTest {
func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
p := newPostgresql()
logger := NewLogAccumulator(t)
logger := NewLogAccumulator(tb)
p.Logger = logger
pt := &PostgresqlTest{Postgresql: *p}
pt.Logger = logger
@@ -280,19 +279,19 @@ func TestDBConnectedHook(t *testing.T) {
p := newPostgresqlTest(t)
require.NoError(t, p.Connect())

_, err := p.db.Exec(ctx, "SELECT 1")
require.NoError(t, err)
tmTables := p.tableManager.Tables
metrics := []telegraf.Metric{
newMetric(t, "", MSS{}, MSI{"v": 1}),
}
require.NoError(t, p.Write(metrics))

c, _ := p.db.Acquire(ctx)
c.Conn().Close(ctx)
c.Release()

_, err = p.db.Exec(ctx, "SELECT 1")
_, err := p.db.Exec(ctx, "SELECT 1")
require.NoError(t, err)
tmTables2 := p.tableManager.Tables

assert.NotEqual(t, reflect.ValueOf(tmTables).Pointer(), reflect.ValueOf(tmTables2).Pointer())
assert.Empty(t, p.tableManager.table(t.Name()).Columns())
}

func newMetric(
Loading