Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Adds support to query default downsampling views in absence of __column__.
Browse files Browse the repository at this point in the history

Signed-off-by: Harkishen-Singh <[email protected]>
  • Loading branch information
Harkishen-Singh committed Jan 11, 2023
1 parent 4c23c32 commit 4c4ca47
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 183 deletions.
10 changes: 5 additions & 5 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
log.Info("msg", "Metric downsampling configurations synced", "configuration", fmt.Sprint(*c.Metrics.Downsampling))
}

log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout))
log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod))
log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout.Duration()))
log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod.Duration()))
log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod.Duration()))

queries := map[string]interface{}{
setDefaultMetricChunkIntervalSQL: c.Metrics.ChunkInterval.Duration(),
Expand Down
36 changes: 20 additions & 16 deletions pkg/dataset/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNewConfig(t *testing.T) {
default_retention_period: 3d2h`,
cfg: Config{
Metrics: Metrics{
RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour),
RetentionPeriod: dayDuration(((3*24)+2)*time.Hour, "3d2h"),
},
},
},
Expand All @@ -65,14 +65,14 @@ traces:
default_retention_period: 15d`,
cfg: Config{
Metrics: Metrics{
ChunkInterval: day.Duration(3 * time.Hour),
ChunkInterval: dayDuration(3*time.Hour, "3h"),
Compression: &testCompressionSetting,
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
HALeaseRefresh: dayDuration(2*time.Minute, "2m"),
HALeaseTimeout: dayDuration(5*time.Second, "5s"),
RetentionPeriod: dayDuration(30*24*time.Hour, "30d"),
},
Traces: Traces{
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
},
},
},
Expand Down Expand Up @@ -101,29 +101,29 @@ func TestApplyDefaults(t *testing.T) {
t,
Config{
Metrics: Metrics{
ChunkInterval: day.Duration(defaultMetricChunkInterval),
ChunkInterval: dayDuration(defaultMetricChunkInterval, ""),
Compression: &defaultMetricCompressionVar,
HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh),
HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout),
RetentionPeriod: day.Duration(defaultMetricRetentionPeriod),
HALeaseRefresh: dayDuration(defaultMetricHALeaseRefresh, ""),
HALeaseTimeout: dayDuration(defaultMetricHALeaseTimeout, ""),
RetentionPeriod: dayDuration(defaultMetricRetentionPeriod, ""),
},
Traces: Traces{
RetentionPeriod: day.Duration(defaultTraceRetentionPeriod),
RetentionPeriod: dayDuration(defaultTraceRetentionPeriod, ""),
},
},
c,
)

untouched := Config{
Metrics: Metrics{
ChunkInterval: day.Duration(3 * time.Hour),
ChunkInterval: dayDuration(3*time.Hour, "3h"),
Compression: &testCompressionSetting,
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
HALeaseRefresh: dayDuration(2*time.Minute, "2m"),
HALeaseTimeout: dayDuration(5*time.Second, "5s"),
RetentionPeriod: dayDuration(30*24*time.Hour, "30d"),
},
Traces: Traces{
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
},
}

Expand All @@ -132,3 +132,7 @@ func TestApplyDefaults(t *testing.T) {

require.Equal(t, untouched, copyConfig)
}

// dayDuration is a test helper that assembles a day.Duration carrying both
// the parsed duration value and its original textual representation.
func dayDuration(d time.Duration, txt string) day.Duration {
	var out day.Duration
	out.T = d
	out.Txt = txt
	return out
}
182 changes: 88 additions & 94 deletions pkg/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@ import (
"github.com/jackc/pgx/v4"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/util"
)

const (
setDownsamplingStateSQL = "SELECT prom_api.set_downsampling_state($1)"
createDownsamplingSQL = "CALL _prom_catalog.create_downsampling($1, $2, $3)"
createOrUpdateDownsamplingSQL = "CALL _prom_catalog.create_or_update_downsampling($1, $2, $3)"
updateDownsamplingStateForSQL = "SELECT _prom_catalog.update_downsampling_state($1, $2)"
downsamplePrefix = "ds_" // Stands of downsample_
downsamplePrefix = "ds_" // Stands for downsample_
lockID = 55851985173278 // Chosen randomly
)

type Config struct {
Interval day.Duration `yaml:"interval"`
Retention day.Duration `yaml:"retention"`
disabled bool
Interval day.Duration `yaml:"interval"`
Retention day.Duration `yaml:"retention"`
shouldRefresh bool
}

func (c Config) Name() string {
Expand All @@ -42,116 +45,97 @@ func SetState(ctx context.Context, conn *pgx.Conn, state bool) error {
// Sync updates the downsampling cfgs in the DB in accordance with the given new cfgs. It:
// 1. Creates of new downsampling cfgs that are not in the database
// 2. Updates retention duration of downsampling cfgs that are present in the database but with a different retention duration
// 3. Disables refreshing of downsampling cfgs in the database that were not found in the new cfgs
// 4. Enables refreshing of downsampling cfgs in the database that are in the new cfgs but were previously disabled
// 3. Enables refreshing of downsampling cfgs in the database that are in the new cfgs but were previously disabled
// 4. Disables refreshing of downsampling cfgs in the database that were not found in the new cfgs
func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
newCfgs := make(map[string]Config) // These are the new downsampling cfgs that the user provided. Relation => schema_name: definition{}
for _, c := range cfgs {
newCfgs[c.Name()] = Config{Interval: c.Interval, Retention: c.Retention}
}

rows, err := conn.Query(ctx, "SELECT schema_name, resolution, retention, should_refresh FROM _prom_catalog.downsample")
pgLock, err := util.NewPgAdvisoryLock(lockID, conn.Config().ConnString())
if err != nil {
return fmt.Errorf("querying existing resolutions: %w", err)
return fmt.Errorf("error getting lock for syncing downsampling config")
}
defer rows.Close()

existingCfgs := make(map[string]Config) // These are the existing downsampling cfgs in the database.
for rows.Next() {
var (
schemaName string
shouldRefresh bool
interval, retention time.Duration
)
if err := rows.Scan(&schemaName, &interval, &retention, &shouldRefresh); err != nil {
return fmt.Errorf("error scanning output rows for existing resolutions: %w", err)
}
existingCfgs[schemaName] = Config{Interval: day.Duration{T: interval}, Retention: day.Duration{T: retention}, disabled: !shouldRefresh}
defer pgLock.Close()
got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time.
if err != nil {
return fmt.Errorf("error trying pg advisory_lock")
}

// Update cfgs that have a different retention duration than the new cfgs.
update := make(map[string]Config)
for name, newDef := range newCfgs {
existingDef, found := existingCfgs[name]
if found && newDef.Retention.Duration() != existingDef.Retention.Duration() {
update[name] = newDef
}
if !got {
// Some other Promscale instance is already working on the downsampling.Sync()
// Hence, we should skip.
return nil
}
if len(update) > 0 {
if err = updateRetention(ctx, conn, update); err != nil {
return fmt.Errorf("updating retention of downsampling cfg: %w", err)
defer func() {
if _, err = pgLock.Unlock(); err != nil {
log.Error("msg", "error unlocking downsampling.Sync advisory_lock", "err", err.Error())
}
}
}()

// Enable downsampling cfgs that were previously disabled.
disabled := []string{}
if err := conn.QueryRow(ctx, "SELECT array_agg(schema_name) FROM _prom_catalog.downsample WHERE NOT should_refresh").Scan(&disabled); err != nil {
return fmt.Errorf("error fetching downsampling configs that are disabled: %w", err)
}
disabledDownsampleConfig := map[string]struct{}{}
for _, n := range disabled {
disabledDownsampleConfig[n] = struct{}{}
}
enable := []string{}
for name := range newCfgs {
if _, found := disabledDownsampleConfig[name]; found {
enable = append(enable, name)
}
}
if len(enable) > 0 {
if err = updateState(ctx, conn, enable, true); err != nil {
return fmt.Errorf("error enabling downsampling cfgs: %w", err)
}
newCfgs := make(map[string]Config) // These are the new downsampling cfgs that the user provided. Relation => schema_name: definition{}
for _, c := range cfgs {
newCfgs[c.Name()] = Config{Interval: c.Interval, Retention: c.Retention}
}

// Disable downsampling cfgs that are applied in the database but are not present in the new downsampling cfgs.
disable := []string{}
for existingName := range existingCfgs {
if _, found := newCfgs[existingName]; !found {
disable = append(disable, existingName)
}
}
if len(disable) > 0 {
if err = updateState(ctx, conn, disable, false); err != nil {
return fmt.Errorf("error disabling downsampling cfgs: %w", err)
existingCfgs, err := getExistingCfgs(ctx, conn)
if err != nil {
return fmt.Errorf("error fetching existing downsampling cfgs: %w", err)
}

createOrUpdate := make(map[string]Config)
for newLabel, newCfg := range newCfgs {
if existingCfg, found := existingCfgs[newLabel]; found {
if !existingCfg.shouldRefresh || existingCfg.Retention.Duration() != newCfg.Retention.Duration() {
createOrUpdate[newLabel] = newCfg
}
if existingCfg.Interval.Duration() != newCfg.Interval.Duration() {
// This should never happen, since newLabel is schema-specific. We still check as a safety measure.
return fmt.Errorf("interval mismatch: existing interval %v, new interval %v", existingCfg.Interval.Duration(), newCfg.Interval.Duration())
}
} else {
createOrUpdate[newLabel] = newCfg
}
}

// Create new downsampling cfgs that are not present in the database.
createDownsamplingCfgs := make(map[string]Config)
for newName, newDef := range newCfgs {
if _, found := existingCfgs[newName]; !found {
createDownsamplingCfgs[newName] = newDef
}
}
if len(createDownsamplingCfgs) > 0 {
if err = create(ctx, conn, createDownsamplingCfgs); err != nil {
return fmt.Errorf("creating new downsampling configurations: %w", err)
if len(createOrUpdate) > 0 {
if err = createOrUpdateDownsampling(ctx, conn, createOrUpdate); err != nil {
return fmt.Errorf("error creating or updating given downsampling configs: %w", err)
}
}

if err = disable(ctx, conn, newCfgs, existingCfgs); err != nil {
return fmt.Errorf("error disabling downsampling configs: %w", err)
}
return nil
}

// updateRetention of existing downsampled cfgs.
func updateRetention(ctx context.Context, conn *pgx.Conn, cfgs map[string]Config) error {
var batch pgx.Batch
for schemaName, def := range cfgs {
batch.Queue("UPDATE _prom_catalog.downsample SET retention = $1 WHERE schema_name = $2", def.Retention.Duration(), schemaName)
func getExistingCfgs(ctx context.Context, conn *pgx.Conn) (map[string]Config, error) {
rows, err := conn.Query(ctx, "SELECT schema_name, resolution, retention, should_refresh FROM _prom_catalog.downsample")
if err != nil {
return nil, fmt.Errorf("querying existing resolutions: %w", err)
}
if batch.Len() > 0 {
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
return fmt.Errorf("error closing batch: %w", err)
defer rows.Close()

existingCfgs := make(map[string]Config) // These are the existing downsampling cfgs in the database.
for rows.Next() {
var (
schemaName string
shouldRefresh bool
interval, retention time.Duration
)
if err := rows.Scan(&schemaName, &interval, &retention, &shouldRefresh); err != nil {
return nil, fmt.Errorf("error scanning output rows for existing resolutions: %w", err)
}
existingCfgs[schemaName] = Config{Interval: day.Duration{T: interval}, Retention: day.Duration{T: retention}, shouldRefresh: shouldRefresh}
}
return nil
return existingCfgs, nil
}

func create(ctx context.Context, conn *pgx.Conn, cfgs map[string]Config) error {
// createOrUpdateDownsampling does 3 things:
// 1. It creates new downsampling configurations that are given in 'cfgs'
// 2. It updates the retention of a downsampling configuration if it is present in the database with the same lName
// 3. It enables a downsampling if it was previously disabled
// Refer to _prom_catalog.create_or_update_downsampling($1, $2, $3) to learn more.
func createOrUpdateDownsampling(ctx context.Context, conn *pgx.Conn, cfgs map[string]Config) error {
var batch pgx.Batch
for lName, def := range cfgs {
batch.Queue(createDownsamplingSQL, lName, def.Interval.Duration(), def.Retention.Duration())
batch.Queue(createOrUpdateDownsamplingSQL, lName, def.Interval.Duration(), def.Retention.Duration())
}
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
Expand All @@ -160,11 +144,21 @@ func create(ctx context.Context, conn *pgx.Conn, cfgs map[string]Config) error {
return nil
}

// updateState enables or disables a particular downsampling cfg based on new state.
func updateState(ctx context.Context, conn *pgx.Conn, name []string, newState bool) error {
// disable downsampling cfgs.
func disable(ctx context.Context, conn *pgx.Conn, newCfgs, existingCfgs map[string]Config) error {
disable := []string{}
for existingName := range existingCfgs {
if _, found := newCfgs[existingName]; !found {
disable = append(disable, existingName)
}
}
if len(disable) == 0 {
return nil
}

var batch pgx.Batch
for _, n := range name {
batch.Queue(updateDownsamplingStateForSQL, n, newState)
for _, n := range disable {
batch.Queue(updateDownsamplingStateForSQL, n, false)
}
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/day/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const (
// This can be useful when we need to know the num of days user wanted, since
// this information is lost after parsing.
type Duration struct {
text string // Holds the original duration text.
T time.Duration
Txt string // Holds the original duration text.
T time.Duration
}

// UnmarshalText unmarshals strings into DayDuration values while
Expand All @@ -44,7 +44,7 @@ func (d *Duration) UnmarshalText(s []byte) error {
}
}
d.T = val
d.text = string(s)
d.Txt = string(s)
return nil
}

Expand Down Expand Up @@ -81,7 +81,7 @@ func (d *Duration) String() string {

// Text returns the original text received while parsing.
func (d *Duration) Text() string {
return d.text
return d.Txt
}

// Duration returns the parsed duration.
Expand Down
25 changes: 19 additions & 6 deletions pkg/pgmodel/querier/clauses.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ func setParameterNumbers(clause string, existingArgs []interface{}, newArgs ...i
}

type clauseBuilder struct {
schemaName string
metricName string
columnName string
contradiction bool
clauses []string
args []interface{}
schemaName string
metricName string
columnName string
contradiction bool
downsamplingView bool
clauses []string
args []interface{}
}

func (c *clauseBuilder) SetMetricName(name string) {
Expand Down Expand Up @@ -92,6 +93,18 @@ func (c *clauseBuilder) GetColumnName() string {
return c.columnName
}

// UseDefaultDownsamplingView records whether querying should go through the
// default downsampling views (q_<metric_name>). These views are used when the
// user applies __schema__ only (and not __column__): they expose a 'value'
// column, which the connector's SQL query requires but raw downsampled data
// lacks, hence the default downsampling views are created in the database for
// querying.
func (c *clauseBuilder) UseDefaultDownsamplingView(enabled bool) {
	c.downsamplingView = enabled
}

// DefaultDownsamplingView reports whether the default downsampling view
// should be used for this query, as recorded by UseDefaultDownsamplingView.
func (c *clauseBuilder) DefaultDownsamplingView() bool {
	return c.downsamplingView
}

func (c *clauseBuilder) addClause(clause string, args ...interface{}) error {
if len(args) > 0 {
switch args[0] {
Expand Down
Loading

0 comments on commit 4c4ca47

Please sign in to comment.