diff --git a/EXTENSION_VERSION b/EXTENSION_VERSION index 1e5c905f50..318b82ecbb 100644 --- a/EXTENSION_VERSION +++ b/EXTENSION_VERSION @@ -1 +1 @@ -downsampling_new_config +feature_metric_rollup \ No newline at end of file diff --git a/pkg/dataset/deepcopy_generated.go b/pkg/dataset/deepcopy_generated.go index 8fde2e35a9..c9041f6857 100644 --- a/pkg/dataset/deepcopy_generated.go +++ b/pkg/dataset/deepcopy_generated.go @@ -34,15 +34,11 @@ func (in *Config) DeepCopy() *Config { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Metrics) DeepCopyInto(out *Metrics) { *out = *in - out.ChunkInterval = in.ChunkInterval if in.Compression != nil { in, out := &in.Compression, &out.Compression *out = new(bool) **out = **in } - out.HALeaseRefresh = in.HALeaseRefresh - out.HALeaseTimeout = in.HALeaseTimeout - out.RetentionPeriod = in.RetentionPeriod if in.Downsampling != nil { in, out := &in.Downsampling, &out.Downsampling *out = new([]downsample.Config) @@ -68,7 +64,6 @@ func (in *Metrics) DeepCopy() *Metrics { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Traces) DeepCopyInto(out *Traces) { *out = *in - out.RetentionPeriod = in.RetentionPeriod return } diff --git a/pkg/downsample/downsample.go b/pkg/downsample/downsample.go index 3ea640e72e..a481e971e9 100644 --- a/pkg/downsample/downsample.go +++ b/pkg/downsample/downsample.go @@ -8,10 +8,11 @@ import ( "context" "encoding/json" "fmt" + "time" + "github.com/jackc/pgx/v4" "github.com/timescale/promscale/pkg/internal/day" - "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/util" ) @@ -28,7 +29,7 @@ type Config struct { } func (c Config) Name() string { - return downsamplePrefix + day.String(c.Interval) + return downsamplePrefix + c.Interval.String() } func SetState(ctx context.Context, conn *pgx.Conn, state bool) error { @@ -52,24 +53,35 @@ func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error { return fmt.Errorf("error getting lock for syncing downsampling config") } defer pgLock.Close() - got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time. + + try := func() (bool, error) { + got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time. + if err != nil { + return false, fmt.Errorf("error trying pg advisory_lock") + } + return got, nil + } + + got, err := try() if err != nil { - return fmt.Errorf("error trying pg advisory_lock") + return err } if !got { - // Some other Promscale instance is already working on the downsampling.Sync() - // Hence, we should skip. - return nil - } - defer func() { - if _, err = pgLock.Unlock(); err != nil { - log.Error("msg", "error unlocking downsampling.Sync advisory_lock", "err", err.Error()) + // Wait for sometime and try again. If we still did not get the lock, throw an error. + time.Sleep(time.Second * 5) + got, err = try() + if err != nil { + return err + } + if !got { + return fmt.Errorf("timeout: unable to take the advisory lock for syncing downsampling state") } - }() + } + var applyCfgs []cfgWithName for i := range cfgs { c := cfgs[i] - applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: day.String(c.Interval), Retention: day.String(c.Retention)}) + applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: c.Interval.String(), Retention: c.Retention.String()}) } if len(applyCfgs) > 0 { str, err := json.Marshal(applyCfgs) diff --git a/pkg/internal/day/duration.go b/pkg/internal/day/duration.go index 0611d18939..88d043da29 100644 --- a/pkg/internal/day/duration.go +++ b/pkg/internal/day/duration.go @@ -16,6 +16,7 @@ import ( const ( dayUnit = 'd' unknownUnitDErrorPrefix = `time: unknown unit "d"` + day = int64(time.Hour * 24) ) // Duration acts like a time.Duration with support for "d" unit @@ -69,7 +70,29 @@ func handleDays(s []byte) (time.Duration, error) { // String returns a string value of DayDuration. func (d Duration) String() string { - return time.Duration(d).String() + remainder := int64(d) + days := remainder / day + remainder = remainder % day + hours := remainder / int64(time.Hour) + remainder = remainder % int64(time.Hour) + mins := remainder / int64(time.Minute) + remainder = remainder % int64(time.Minute) + secs := remainder / int64(time.Second) + + display := "" + if days != 0 { + display = fmt.Sprintf("%dd", days) + } + if hours != 0 { + display = fmt.Sprintf("%s%dh", display, hours) + } + if mins != 0 { + display = fmt.Sprintf("%s%dm", display, mins) + } + if secs != 0 { + display = fmt.Sprintf("%s%ds", display, secs) + } + return display } // StringToDayDurationHookFunc returns a mapstructure.DecodeHookFunc that @@ -96,32 +119,3 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc { return d, nil } } - -// String returns the output in form of days:hours:mins:secs -func String(d Duration) string { - const day = int64(time.Hour * 24) - - remainder := int64(d) - days := remainder / day - remainder = remainder % day - hours := remainder / int64(time.Hour) - remainder = remainder % int64(time.Hour) - mins := remainder / int64(time.Minute) - remainder = remainder % int64(time.Minute) - secs := remainder / int64(time.Second) - - display := "" - if days != 0 { - display = fmt.Sprintf("%dd", days) - } - if hours != 0 { - display = fmt.Sprintf("%s%dh", display, hours) - } - if mins != 0 { - display = fmt.Sprintf("%s%dm", display, mins) - } - if secs != 0 { - display = fmt.Sprintf("%s%ds", display, secs) - } - return display -} diff --git a/pkg/internal/day/duration_test.go b/pkg/internal/day/duration_test.go index b307fc39c1..c19b95602b 100644 --- a/pkg/internal/day/duration_test.go +++ b/pkg/internal/day/duration_test.go @@ -45,7 +45,7 @@ func TestString(t *testing.T) { t.Run(tc.in, func(t *testing.T) { var d Duration require.NoError(t, d.UnmarshalText([]byte(tc.in))) - require.Equal(t, tc.out, String(d)) + require.Equal(t, tc.out, d.String()) }) } } diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go index 2de9dfdc02..b8e5e49819 100644 --- a/pkg/tests/constants.go +++ b/pkg/tests/constants.go @@ -18,5 +18,5 @@ func init() { PromscaleExtensionVersion = strings.TrimSpace(string(content)) PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" - PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made. + //PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made. } diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 56883b6d29..e82204f69c 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -43,7 +43,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }{ { name: "Query non-existant column, empty result", - query: `cagg{__column__="nonexistant"}`, + query: `cagg{__schema__="cagg_schema",__column__="nonexistant"}`, startMs: startTime, endMs: endTime, stepMs: 360 * 1000, @@ -78,7 +78,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }, { name: "Query max column", - query: `cagg{__column__="max",instance="1"}`, + query: `cagg{__schema__="cagg_schema",__column__="max",instance="1"}`, startMs: startTime, endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value stepMs: 3600 * 1000, @@ -104,7 +104,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }, { name: "Query min column", - query: `cagg{__column__="min",instance="1"}`, + query: `cagg{__schema__="cagg_schema",__column__="min",instance="1"}`, startMs: startTime, endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value stepMs: 3600 * 1000, @@ -130,7 +130,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }, { name: "Query avg column", - query: `cagg{__column__="avg",instance="1"}`, + query: `cagg{__schema__="cagg_schema",__column__="avg",instance="1"}`, startMs: startTime, endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value stepMs: 3600 * 1000, diff --git a/pkg/tests/end_to_end_tests/telemetry_test.go b/pkg/tests/end_to_end_tests/telemetry_test.go index f4c531b01a..141a88d28e 100644 --- a/pkg/tests/end_to_end_tests/telemetry_test.go +++ b/pkg/tests/end_to_end_tests/telemetry_test.go @@ -288,8 +288,8 @@ func TestTelemetrySQLStats(t *testing.T) { require.NoError(t, engine.Sync()) err = conn.QueryRow(context.Background(), "SELECT value FROM _timescaledb_catalog.metadata WHERE key = 'promscale_metrics_total' AND value IS NOT NULL").Scan(&metrics) - require.NoError(t, err) - require.Equal(t, "0", metrics) // Without promscale_extension, this will give error saying "no rows in result set". + require.NoError(t, err) // Without promscale_extension, this will give error saying "no rows in result set". + require.Equal(t, "0", metrics) // Add dummy metric. _, err = conn.Exec(context.Background(), "SELECT _prom_catalog.create_metric_table('test_metric')")