Skip to content

Commit

Permalink
outputs.postgresql: cache table structure
Browse files Browse the repository at this point in the history
Previous code was querying the table structure prior to every insert. We should instead cache the table structure, and just assume the admin isn't going to do something silly like dropping columns or deleting the table while we're using it.
  • Loading branch information
phemmer committed Jul 13, 2021
1 parent a4b8a81 commit 2659ac5
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 187 deletions.
87 changes: 38 additions & 49 deletions plugins/outputs/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Postgresql struct {
dbContext context.Context
dbContextCancel func()
db *pgxpool.Pool
tables tables.Manager
tagTables tables.Manager
tables *tables.TableManager
tagTables *tables.TableManager

writeChan chan []telegraf.Metric

Expand Down Expand Up @@ -181,7 +181,10 @@ func (p *Postgresql) writeSequential(metricsByMeasurement map[string][]telegraf.
for _, metrics := range metricsByMeasurement {
err := p.writeMetricsFromMeasure(p.dbContext, metrics)
if err != nil {
log.Printf("copy error: %v", err)
if !isTempError(err) {
log.Printf("write error (permanent): %v", err)
}
//TODO use a transaction so that we don't end up with a partial write, and end up retrying metrics we've already written
return err
}
}
Expand All @@ -199,35 +202,12 @@ func (p *Postgresql) writeConcurrent(metricsByMeasurement map[string][]telegraf.
return nil
}

var backoffInit = time.Millisecond * 250
var backoffMax = time.Second * 15

func (p *Postgresql) writeWorker(ctx context.Context) {
for {
select {
case metrics := <-p.writeChan:
backoff := time.Duration(0)
for {
err := p.writeMetricsFromMeasure(ctx, metrics)
if err == nil {
break
}

if !isTempError(err) {
log.Printf("write error (permanent): %v", err)
break
}
log.Printf("write error (retry in %s): %v", backoff, err)
time.Sleep(backoff)

if backoff == 0 {
backoff = backoffInit
} else {
backoff *= 2
if backoff > backoffMax {
backoff = backoffMax
}
}
if err := p.writeRetry(ctx, metrics); err != nil {
log.Printf("write error (permanent): %v", err)
}
case <-p.dbContext.Done():
return
Expand All @@ -239,18 +219,46 @@ func isTempError(err error) bool {
return false
}

var backoffInit = time.Millisecond * 250
var backoffMax = time.Second * 15

func (p *Postgresql) writeRetry(ctx context.Context, metrics []telegraf.Metric) error {
backoff := time.Duration(0)
for {
err := p.writeMetricsFromMeasure(ctx, metrics)
if err == nil {
return nil
}

if !isTempError(err) {
return err
}
log.Printf("write error (retry in %s): %v", backoff, err)
time.Sleep(backoff)

if backoff == 0 {
backoff = backoffInit
} else {
backoff *= 2
if backoff > backoffMax {
backoff = backoffMax
}
}
}
}

// Writes the metrics from a specified measure. All the provided metrics must belong to the same measurement.
func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, metrics []telegraf.Metric) error {
targetColumns, targetTagColumns := p.columns.Target(metrics)
measureName := metrics[0].Name()

if p.DoSchemaUpdates {
if err := p.prepareTable(ctx, p.tables, measureName, targetColumns); err != nil {
if err := p.tables.EnsureStructure(ctx, measureName, targetColumns); err != nil {
return err
}
if p.TagsAsForeignkeys {
tagTableName := measureName + p.TagTableSuffix
if err := p.prepareTable(ctx, p.tagTables, tagTableName, targetTagColumns); err != nil {
if err := p.tagTables.EnsureStructure(ctx, tagTableName, targetTagColumns); err != nil {
return err
}
}
Expand All @@ -270,22 +278,3 @@ func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, metrics []tele
_, err := p.db.CopyFrom(ctx, fullTableName, targetColumns.Names, pgx.CopyFromRows(values))
return err
}

// Checks if a table exists in the db, and then validates if all the required columns
// are present or some are missing (if metrics changed their field or tag sets).
func (p *Postgresql) prepareTable(ctx context.Context, tableManager tables.Manager, tableName string, details *utils.TargetColumns) error {
tableExists := tableManager.Exists(ctx, tableName)

if !tableExists {
return tableManager.CreateTable(ctx, tableName, details)
}

missingColumns, err := tableManager.FindColumnMismatch(ctx, tableName, details)
if err != nil {
return err
}
if len(missingColumns) == 0 {
return nil
}
return tableManager.AddColumnsToTable(ctx, tableName, missingColumns, details)
}
6 changes: 3 additions & 3 deletions plugins/outputs/postgresql/tables/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m *mockDb) IsAlive() bool { return true }

func TestNewManager(t *testing.T) {
db := &mockDb{}
res := NewManager(db, "schema", "table template").(*defTableManager)
res := NewManager(db, "schema", "table template").(*TableManager)
assert.Equal(t, "table template", res.tableTemplate)
assert.Equal(t, "schema", res.schema)
assert.Equal(t, db, res.db)
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestExists(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
manager := &defTableManager{
manager := &TableManager{
Tables: tc.cache,
db: tc.db,
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestCreateTable(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
manager := &defTableManager{
manager := &TableManager{
Tables: map[string]bool{},
db: tc.db,
tableTemplate: tc.template,
Expand Down
Loading

0 comments on commit 2659ac5

Please sign in to comment.