Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Support for apply_downsample_config()
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <[email protected]>
  • Loading branch information
Harkishen-Singh committed Jan 11, 2023
1 parent 4c4ca47 commit e8c4cdd
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 210 deletions.
40 changes: 20 additions & 20 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
log.Info("msg", "Metric downsampling configurations synced", "configuration", fmt.Sprint(*c.Metrics.Downsampling))
}

log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod.Duration()))
log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout))
log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod))
log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod))

queries := map[string]interface{}{
setDefaultMetricChunkIntervalSQL: c.Metrics.ChunkInterval.Duration(),
setDefaultMetricChunkIntervalSQL: time.Duration(c.Metrics.ChunkInterval),
setDefaultMetricCompressionSQL: c.Metrics.Compression,
setDefaultMetricHAReleaseRefreshSQL: c.Metrics.HALeaseRefresh.Duration(),
setDefaultMetricHAReleaseTimeoutSQL: c.Metrics.HALeaseTimeout.Duration(),
setDefaultMetricRetentionPeriodSQL: c.Metrics.RetentionPeriod.Duration(),
setDefaultTraceRetentionPeriodSQL: c.Traces.RetentionPeriod.Duration(),
setDefaultMetricHAReleaseRefreshSQL: time.Duration(c.Metrics.HALeaseRefresh),
setDefaultMetricHAReleaseTimeoutSQL: time.Duration(c.Metrics.HALeaseTimeout),
setDefaultMetricRetentionPeriodSQL: time.Duration(c.Metrics.RetentionPeriod),
setDefaultTraceRetentionPeriodSQL: time.Duration(c.Traces.RetentionPeriod),
}

for sql, param := range queries {
Expand All @@ -111,22 +111,22 @@ func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
}

func (c *Config) applyDefaults() {
if c.Metrics.ChunkInterval.Duration() <= 0 {
c.Metrics.ChunkInterval.SetDuration(defaultMetricChunkInterval)
if c.Metrics.ChunkInterval <= 0 {
c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval)
}
if c.Metrics.Compression == nil {
c.Metrics.Compression = &defaultMetricCompressionVar
}
if c.Metrics.HALeaseRefresh.Duration() <= 0 {
c.Metrics.HALeaseRefresh.SetDuration(defaultMetricHALeaseRefresh)
if c.Metrics.HALeaseRefresh <= 0 {
c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh)
}
if c.Metrics.HALeaseTimeout.Duration() <= 0 {
c.Metrics.HALeaseTimeout.SetDuration(defaultMetricHALeaseTimeout)
if c.Metrics.HALeaseTimeout <= 0 {
c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout)
}
if c.Metrics.RetentionPeriod.Duration() <= 0 {
c.Metrics.RetentionPeriod.SetDuration(defaultMetricRetentionPeriod)
if c.Metrics.RetentionPeriod <= 0 {
c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod)
}
if c.Traces.RetentionPeriod.Duration() <= 0 {
c.Traces.RetentionPeriod.SetDuration(defaultTraceRetentionPeriod)
if c.Traces.RetentionPeriod <= 0 {
c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod)
}
}
36 changes: 16 additions & 20 deletions pkg/dataset/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNewConfig(t *testing.T) {
default_retention_period: 3d2h`,
cfg: Config{
Metrics: Metrics{
RetentionPeriod: dayDuration(((3*24)+2)*time.Hour, "3d2h"),
RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour),
},
},
},
Expand All @@ -65,14 +65,14 @@ traces:
default_retention_period: 15d`,
cfg: Config{
Metrics: Metrics{
ChunkInterval: dayDuration(3*time.Hour, "3h"),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: dayDuration(2*time.Minute, "2m"),
HALeaseTimeout: dayDuration(5*time.Second, "5s"),
RetentionPeriod: dayDuration(30*24*time.Hour, "30d"),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
},
},
Expand Down Expand Up @@ -101,29 +101,29 @@ func TestApplyDefaults(t *testing.T) {
t,
Config{
Metrics: Metrics{
ChunkInterval: dayDuration(defaultMetricChunkInterval, ""),
ChunkInterval: day.Duration(defaultMetricChunkInterval),
Compression: &defaultMetricCompressionVar,
HALeaseRefresh: dayDuration(defaultMetricHALeaseRefresh, ""),
HALeaseTimeout: dayDuration(defaultMetricHALeaseTimeout, ""),
RetentionPeriod: dayDuration(defaultMetricRetentionPeriod, ""),
HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh),
HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout),
RetentionPeriod: day.Duration(defaultMetricRetentionPeriod),
},
Traces: Traces{
RetentionPeriod: dayDuration(defaultTraceRetentionPeriod, ""),
RetentionPeriod: day.Duration(defaultTraceRetentionPeriod),
},
},
c,
)

untouched := Config{
Metrics: Metrics{
ChunkInterval: dayDuration(3*time.Hour, "3h"),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: dayDuration(2*time.Minute, "2m"),
HALeaseTimeout: dayDuration(5*time.Second, "5s"),
RetentionPeriod: dayDuration(30*24*time.Hour, "30d"),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
}

Expand All @@ -132,7 +132,3 @@ func TestApplyDefaults(t *testing.T) {

require.Equal(t, untouched, copyConfig)
}

func dayDuration(d time.Duration, txt string) day.Duration {
return day.Duration{T: d, Txt: txt}
}
139 changes: 28 additions & 111 deletions pkg/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ package downsample

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/jackc/pgx/v4"

"github.com/timescale/promscale/pkg/internal/day"
Expand All @@ -17,36 +16,36 @@ import (
)

const (
setDownsamplingStateSQL = "SELECT prom_api.set_downsampling_state($1)"
createOrUpdateDownsamplingSQL = "CALL _prom_catalog.create_or_update_downsampling($1, $2, $3)"
updateDownsamplingStateForSQL = "SELECT _prom_catalog.update_downsampling_state($1, $2)"
downsamplePrefix = "ds_" // Stands of downsample_
lockID = 55851985173278 // Choosen randomly
setGlobalDownsamplingStateSQL = "SELECT prom_api.set_global_downsampling_state($1)"
applyDownsampleConfigSQL = "SELECT _prom_catalog.apply_downsample_config($1::jsonb)"
downsamplePrefix = "ds_" // Stands for downsample_
lockID = 55985173312278 // Choosen randomly
)

type Config struct {
Interval day.Duration `yaml:"interval"`
Retention day.Duration `yaml:"retention"`
shouldRefresh bool
Interval day.Duration `yaml:"interval"`
Retention day.Duration `yaml:"retention"`
}

func (c Config) Name() string {
return downsamplePrefix + c.Interval.Text()
return downsamplePrefix + day.String(c.Interval)
}

func SetState(ctx context.Context, conn *pgx.Conn, state bool) error {
_, err := conn.Exec(ctx, setDownsamplingStateSQL, state)
_, err := conn.Exec(ctx, setGlobalDownsamplingStateSQL, state)
if err != nil {
return fmt.Errorf("error setting downsampling state: %w", err)
}
return nil
}

// Sync updates the downsampling cfgs in the DB in accordance with the given new cfgs. It:
// 1. Creates of new downsampling cfgs that are not in the database
// 2. Updates retention duration of downsampling cfgs that are present in the database but with a different retention duration
// 3. Enables refreshing of downsampling cfgs in the database that are in the new cfgs but were previously disabled
// 4. Disables refreshing of downsampling cfgs in the database that were not found in the new cfgs
type cfgWithName struct {
Name string `json:"schema_name"`
Interval string `json:"ds_interval"`
Retention string `json:"retention"`
}

// Sync the given downsampling configurations with the database.
func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
pgLock, err := util.NewPgAdvisoryLock(lockID, conn.Config().ConnString())
if err != nil {
Expand All @@ -67,102 +66,20 @@ func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
log.Error("msg", "error unlocking downsampling.Sync advisory_lock", "err", err.Error())
}
}()

newCfgs := make(map[string]Config) // These are the new downsampling cfgs that the user provided. Relation => schema_name: definition{}
for _, c := range cfgs {
newCfgs[c.Name()] = Config{Interval: c.Interval, Retention: c.Retention}
}

existingCfgs, err := getExistingCfgs(ctx, conn)
if err != nil {
return fmt.Errorf("error fetching existing downsampling cfgs: %w", err)
}

createOrUpdate := make(map[string]Config)
for newLabel, newCfg := range newCfgs {
if existingCfg, found := existingCfgs[newLabel]; found {
if !existingCfg.shouldRefresh || existingCfg.Retention.Duration() != newCfg.Retention.Duration() {
createOrUpdate[newLabel] = newCfg
}
if existingCfg.Interval.Duration() != newCfg.Interval.Duration() {
// This should never be the case since newlabel is schema specific. But we still check for safety purpose.
return fmt.Errorf("interval mismatch: existing interval %v, new interval %v", existingCfg.Interval.Duration(), newCfg.Interval.Duration())
}
} else {
createOrUpdate[newLabel] = newCfg
var applyCfgs []cfgWithName
for i := range cfgs {
c := cfgs[i]
applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: day.String(c.Interval), Retention: day.String(c.Retention)})
}
if len(applyCfgs) > 0 {
str, err := json.Marshal(applyCfgs)
if err != nil {
return fmt.Errorf("error marshalling configs: %w", err)
}
}

if len(createOrUpdate) > 0 {
if err = createOrUpdateDownsampling(ctx, conn, createOrUpdate); err != nil {
return fmt.Errorf("error creating or updating given downsampling configs: %w", err)
fmt.Println("str", string(str))
if _, err = conn.Exec(ctx, applyDownsampleConfigSQL, str); err != nil {
return fmt.Errorf("error applying downsample config: %w", err)
}
}

if err = disable(ctx, conn, newCfgs, existingCfgs); err != nil {
return fmt.Errorf("error disabling downsampling configs: %w", err)
}
return nil
}

func getExistingCfgs(ctx context.Context, conn *pgx.Conn) (map[string]Config, error) {
rows, err := conn.Query(ctx, "SELECT schema_name, resolution, retention, should_refresh FROM _prom_catalog.downsample")
if err != nil {
return nil, fmt.Errorf("querying existing resolutions: %w", err)
}
defer rows.Close()

existingCfgs := make(map[string]Config) // These are the existing downsampling cfgs in the database.
for rows.Next() {
var (
schemaName string
shouldRefresh bool
interval, retention time.Duration
)
if err := rows.Scan(&schemaName, &interval, &retention, &shouldRefresh); err != nil {
return nil, fmt.Errorf("error scanning output rows for existing resolutions: %w", err)
}
existingCfgs[schemaName] = Config{Interval: day.Duration{T: interval}, Retention: day.Duration{T: retention}, shouldRefresh: shouldRefresh}
}
return existingCfgs, nil
}

// createOrUpdateDownsampling does 3 things:
// 1. It creates new downsampling configurations that are given in 'cfgs'
// 2. It updates the retention of a downsampling configuration if it is present in the database with the same lName
// 3. It enables a downsampling if it was previously disabled
// Refer to _prom_catalog.create_or_update_downsampling($1, $2, $3) to learn more.
func createOrUpdateDownsampling(ctx context.Context, conn *pgx.Conn, cfgs map[string]Config) error {
var batch pgx.Batch
for lName, def := range cfgs {
batch.Queue(createOrUpdateDownsamplingSQL, lName, def.Interval.Duration(), def.Retention.Duration())
}
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
return fmt.Errorf("error closing batch: %w", err)
}
return nil
}

// disable downsampling cfgs.
func disable(ctx context.Context, conn *pgx.Conn, newCfgs, existingCfgs map[string]Config) error {
disable := []string{}
for existingName := range existingCfgs {
if _, found := newCfgs[existingName]; !found {
disable = append(disable, existingName)
}
}
if len(disable) == 0 {
return nil
}

var batch pgx.Batch
for _, n := range disable {
batch.Queue(updateDownsamplingStateForSQL, n, false)
}
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
return fmt.Errorf("error closing batch: %w", err)
}
return nil
}
Loading

0 comments on commit e8c4cdd

Please sign in to comment.