Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Adds support for Prometheus metric rollups #1799

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion EXTENSION_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
master
feature_metric_rollup
47 changes: 34 additions & 13 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.
// +k8s:deepcopy-gen=package

package dataset

import (
Expand All @@ -10,8 +11,11 @@ import (
"time"

"github.com/jackc/pgx/v4"
"github.com/timescale/promscale/pkg/log"
"gopkg.in/yaml.v2"

"github.com/timescale/promscale/pkg/downsample"
"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
)

const (
Expand Down Expand Up @@ -42,16 +46,17 @@ type Config struct {

// Metrics contains dataset configuration options for metrics data.
type Metrics struct {
ChunkInterval DayDuration `mapstructure:"default_chunk_interval" yaml:"default_chunk_interval"`
Compression *bool `mapstructure:"compress_data" yaml:"compress_data"` // Using pointer to check if the the value was set.
HALeaseRefresh DayDuration `mapstructure:"ha_lease_refresh" yaml:"ha_lease_refresh"`
HALeaseTimeout DayDuration `mapstructure:"ha_lease_timeout" yaml:"ha_lease_timeout"`
RetentionPeriod DayDuration `mapstructure:"default_retention_period" yaml:"default_retention_period"`
ChunkInterval day.Duration `mapstructure:"default_chunk_interval" yaml:"default_chunk_interval"`
Compression *bool `mapstructure:"compress_data" yaml:"compress_data"` // Using pointer to check if the the value was set.
HALeaseRefresh day.Duration `mapstructure:"ha_lease_refresh" yaml:"ha_lease_refresh"`
HALeaseTimeout day.Duration `mapstructure:"ha_lease_timeout" yaml:"ha_lease_timeout"`
RetentionPeriod day.Duration `mapstructure:"default_retention_period" yaml:"default_retention_period"`
Downsampling *[]downsample.Config `mapstructure:"downsampling" yaml:"downsampling,omitempty"`
}

// Traces contains dataset configuration options for traces data.
type Traces struct {
RetentionPeriod DayDuration `mapstructure:"default_retention_period" yaml:"default_retention_period"`
RetentionPeriod day.Duration `mapstructure:"default_retention_period" yaml:"default_retention_period"`
}

// NewConfig creates a new dataset config based on the configuration YAML contents.
Expand All @@ -61,9 +66,25 @@ func NewConfig(contents string) (cfg Config, err error) {
}

// Apply applies the configuration to the database via the supplied DB connection.
func (c *Config) Apply(conn *pgx.Conn) error {
func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
c.applyDefaults()

if c.Metrics.Downsampling == nil {
if err := downsample.SetState(ctx, conn, false); err != nil {
return fmt.Errorf("error setting state for automatic-downsampling: %w", err)
}
log.Info("msg", "Metric downsampling is disabled")
} else {
if err := downsample.SetState(ctx, conn, true); err != nil {
return fmt.Errorf("error setting state for automatic-downsampling: %w", err)
}
log.Info("msg", "Metric downsampling is enabled")
if err := downsample.Sync(ctx, conn, *c.Metrics.Downsampling); err != nil {
return fmt.Errorf("error syncing downsampling configurations: %w", err)
}
log.Info("msg", "Metric downsampling configurations synced", "configuration", fmt.Sprint(*c.Metrics.Downsampling))
}

log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
Expand Down Expand Up @@ -91,21 +112,21 @@ func (c *Config) Apply(conn *pgx.Conn) error {

func (c *Config) applyDefaults() {
if c.Metrics.ChunkInterval <= 0 {
c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval)
c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval)
}
if c.Metrics.Compression == nil {
c.Metrics.Compression = &defaultMetricCompressionVar
}
if c.Metrics.HALeaseRefresh <= 0 {
c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh)
c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh)
}
if c.Metrics.HALeaseTimeout <= 0 {
c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout)
c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout)
}
if c.Metrics.RetentionPeriod <= 0 {
c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod)
c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod)
}
if c.Traces.RetentionPeriod <= 0 {
c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod)
c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod)
}
}
33 changes: 17 additions & 16 deletions pkg/dataset/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/internal/day"
)

var testCompressionSetting = true
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestNewConfig(t *testing.T) {
default_retention_period: 3d2h`,
cfg: Config{
Metrics: Metrics{
RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour),
RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour),
},
},
},
Expand All @@ -64,14 +65,14 @@ traces:
default_retention_period: 15d`,
cfg: Config{
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
},
},
Expand Down Expand Up @@ -100,29 +101,29 @@ func TestApplyDefaults(t *testing.T) {
t,
Config{
Metrics: Metrics{
ChunkInterval: DayDuration(defaultMetricChunkInterval),
ChunkInterval: day.Duration(defaultMetricChunkInterval),
Compression: &defaultMetricCompressionVar,
HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh),
HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout),
RetentionPeriod: DayDuration(defaultMetricRetentionPeriod),
HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh),
HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout),
RetentionPeriod: day.Duration(defaultMetricRetentionPeriod),
},
Traces: Traces{
RetentionPeriod: DayDuration(defaultTraceRetentionPeriod),
RetentionPeriod: day.Duration(defaultTraceRetentionPeriod),
},
},
c,
)

untouched := Config{
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/dataset/deepcopy_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions pkg/downsample/downsample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package downsample

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/jackc/pgx/v4"
"github.com/pkg/errors"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/util"
)

const (
setGlobalDownsamplingStateSQL = "SELECT prom_api.set_global_downsampling_state($1)"
applyDownsampleConfigSQL = "SELECT _prom_catalog.apply_downsample_config($1::jsonb)"
downsamplePrefix = "ds_" // Stands for downsample_
lockID = 55985173312278 // Choosen randomly
)

type Config struct {
Interval day.Duration `yaml:"interval"`
Retention day.Duration `yaml:"retention"`
}

func (c Config) Name() string {
return downsamplePrefix + c.Interval.String()
}

func SetState(ctx context.Context, conn *pgx.Conn, state bool) error {
_, err := conn.Exec(ctx, setGlobalDownsamplingStateSQL, state)
return errors.WithMessage(err, "error setting downsampling state")
}

type cfgWithName struct {
Name string `json:"schema_name"`
Interval string `json:"ds_interval"`
Retention string `json:"retention"`
}

// Sync the given downsampling configurations with the database.
func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
pgLock, err := util.NewPgAdvisoryLock(lockID, conn.Config().ConnString())
if err != nil {
return fmt.Errorf("error getting lock for syncing downsampling config")
}
defer pgLock.Close()

try := func() (bool, error) {
got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time.
return got, errors.WithMessage(err, "error trying pg advisory_lock")
}

got, err := try()
if err != nil {
return err
}
if !got {
// Wait for sometime and try again. If we still did not get the lock, throw an error.
time.Sleep(time.Second * 5)
got, err = try()
if err != nil {
return err
}
if !got {
return fmt.Errorf("timeout: unable to take the advisory lock for syncing downsampling state")
}
}

var applyCfgs []cfgWithName
for i := range cfgs {
c := cfgs[i]
applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: c.Interval.String(), Retention: c.Retention.String()})
}
if len(applyCfgs) > 0 {
str, err := json.Marshal(applyCfgs)
if err != nil {
return fmt.Errorf("error marshalling configs: %w", err)
}
if _, err = conn.Exec(ctx, applyDownsampleConfigSQL, str); err != nil {
return fmt.Errorf("error applying downsample config: %w", err)
}
}
return nil
}
42 changes: 33 additions & 9 deletions pkg/dataset/duration.go → pkg/internal/day/duration.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.
package dataset

package day

import (
"fmt"
Expand All @@ -15,15 +16,16 @@ import (
const (
dayUnit = 'd'
unknownUnitDErrorPrefix = `time: unknown unit "d"`
day = int64(time.Hour * 24)
)

// DayDuration acts like a time.Duration with support for "d" unit
// Duration acts like a time.Duration with support for "d" unit
// which is used for specifying number of days in duration.
type DayDuration time.Duration
type Duration time.Duration

// UnmarshalText unmarshals strings into DayDuration values while
// handling the day unit. It leans heavily into time.ParseDuration.
func (d *DayDuration) UnmarshalText(s []byte) error {
func (d *Duration) UnmarshalText(s []byte) error {
val, err := time.ParseDuration(string(s))
if err != nil {
// Check for specific error indicating we are using days unit.
Expand All @@ -36,7 +38,7 @@ func (d *DayDuration) UnmarshalText(s []byte) error {
return err
}
}
*d = DayDuration(val)
*d = Duration(val)
return nil
}

Expand Down Expand Up @@ -67,8 +69,30 @@ func handleDays(s []byte) (time.Duration, error) {
}

// String returns a string value of DayDuration.
func (d DayDuration) String() string {
return time.Duration(d).String()
func (d Duration) String() string {
remainder := int64(d)
days := remainder / day
remainder = remainder % day
hours := remainder / int64(time.Hour)
remainder = remainder % int64(time.Hour)
mins := remainder / int64(time.Minute)
remainder = remainder % int64(time.Minute)
secs := remainder / int64(time.Second)

display := ""
if days != 0 {
display = fmt.Sprintf("%dd", days)
}
if hours != 0 {
display = fmt.Sprintf("%s%dh", display, hours)
}
if mins != 0 {
display = fmt.Sprintf("%s%dm", display, mins)
}
if secs != 0 {
display = fmt.Sprintf("%s%ds", display, secs)
}
return display
}

// StringToDayDurationHookFunc returns a mapstructure.DecodeHookFunc that
Expand All @@ -82,7 +106,7 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc {
return data, nil
}

var d DayDuration
var d Duration

if t != reflect.TypeOf(d) {
return data, nil
Expand All @@ -92,6 +116,6 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc {
if err != nil {
return nil, err
}
return DayDuration(d), nil
return d, nil
}
}
Loading