Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Throttle vacuum workers based on chunk age
Browse files Browse the repository at this point in the history
Throttle vacuum workers based on chunk age. Figure out how many workers
to used based on the oldest transaction id age of the chunks in our
batch. The age should be between vacuumFreezeMinAge and
autovacuumFreezeMaxAge. The closer the maxChunkAge is to the
autovacuumFreezeMaxAge, the more workers we assign. If maxChunkAge is
== vacuumFreezeMinAge, then we just use 1 worker.

If the autovacuum engine beats us to a chunk, skip that chunk.
  • Loading branch information
jgpruitt committed Nov 20, 2022
1 parent 6b9a20b commit bbbe179
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 87 deletions.
4 changes: 2 additions & 2 deletions pkg/runner/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestParseFlags(t *testing.T) {
result: func(c Config) Config {
c.VacuumCfg.Disable = true
c.VacuumCfg.RunFrequency = 10 * time.Minute
c.VacuumCfg.Parallelism = 4
c.VacuumCfg.MaxParallelism = 4
return c
},
},
Expand All @@ -192,7 +192,7 @@ func TestParseFlags(t *testing.T) {
shouldError: false,
result: func(c Config) Config {
c.VacuumCfg.RunFrequency = 30 * time.Minute
c.VacuumCfg.Parallelism = 5
c.VacuumCfg.MaxParallelism = 5
return c
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func Run(cfg *Config) error {
)

if !cfg.VacuumCfg.Disable {
ve := vacuum.NewEngine(client.MaintenanceConnection(), cfg.VacuumCfg.RunFrequency, cfg.VacuumCfg.Parallelism)
ve := vacuum.NewEngine(client.MaintenanceConnection(), cfg.VacuumCfg.RunFrequency, cfg.VacuumCfg.MaxParallelism)
group.Add(
func() error {
log.Info("msg", "Starting vacuum engine")
Expand Down
39 changes: 29 additions & 10 deletions pkg/tests/end_to_end_tests/vacuum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ import (
"github.com/timescale/promscale/pkg/vacuum"
)

const (
sqlChunksToFreeze = `
SELECT %s
FROM _ps_catalog.chunks_to_freeze
WHERE coalesce(last_vacuum, '-infinity'::timestamptz) < now() - interval '15 minutes'
AND pg_catalog.age(relfrozenxid) > (SELECT setting::bigint FROM pg_settings WHERE name = 'vacuum_freeze_min_age')
`
)

type uncompressedChunk struct {
id int
schema string
Expand All @@ -34,18 +43,18 @@ func TestVacuumNonBlocking(t *testing.T) {
}
testVacuum(t, func(e *vacuum.Engine, db *pgxpool.Pool) {
go e.Start()
// wait until view returns 0 or 20 second elapse which ever comes first
// wait until view returns 0 or a timeout elapses, whichever comes first
wait(db)
e.Stop()
})
}

func wait(db *pgxpool.Pool) {
ctx := context.Background()
for i := 0; i < 20; i++ {
for i := 0; i < 60; i++ {
time.Sleep(time.Second)
var nbr int64
err := db.QueryRow(ctx, "select count(*) from _ps_catalog.chunks_to_freeze").Scan(&nbr)
err := db.QueryRow(ctx, fmt.Sprintf(sqlChunksToFreeze, "count(*)")).Scan(&nbr)
if err != nil {
continue
}
Expand All @@ -66,23 +75,33 @@ func testVacuum(t *testing.T, do func(e *vacuum.Engine, db *pgxpool.Pool)) {
var ctx = context.Background()
databaseName := fmt.Sprintf("%s_vacuum", *testDatabase)
withDB(t, databaseName, func(db *pgxpool.Pool, tb testing.TB) {
setVacuumFreezeMinAge(t, ctx, db)
createMetric(t, ctx, db, "metric1")
createMetric(t, ctx, db, "metric2")
loadSamples(t, ctx, db, "metric1")
loadSamples(t, ctx, db, "metric2")
chunksExist(t, ctx, db)
viewEmpty(t, ctx, db)
nbr := countChunksToFreeze(t, ctx, db)
require.Equal(t, int64(0), nbr, "Expected view to return zero before compressing chunks results but got %d", nbr)
const count = 5
uncompressed := chooseUncompressedChunks(t, ctx, db, count)
compressChunks(t, ctx, db, uncompressed)
compressed := view(t, ctx, db)
require.Equal(t, count, len(compressed), "Expected to find %d compressed chunks but got %d", count, len(compressed))
require.Equal(t, count, len(compressed), "Expected view to return %d compressed chunks before running engine but got %d", count, len(compressed))
engine := vacuum.NewEngine(pgxconn.NewPgxConn(db), time.Second, count)
do(engine, db)
viewEmpty(t, ctx, db)
nbr = countChunksToFreeze(t, ctx, db)
require.Equal(t, int64(0), nbr, "Expected view to return zero after running engine results but got %d", nbr)
})
}

func setVacuumFreezeMinAge(t *testing.T, ctx context.Context, db *pgxpool.Pool) {
_, err := db.Exec(ctx, "set vacuum_freeze_min_age to 0")
if err != nil {
t.Fatalf("Failed to set vacuum_freeze_min_age to 0: %v", err)
}
}

func createMetric(t *testing.T, ctx context.Context, db *pgxpool.Pool, name string) {
_, err := db.Exec(ctx, fmt.Sprintf("create table %s(id int, t timestamptz not null, val double precision)", name))
if err != nil {
Expand Down Expand Up @@ -126,13 +145,13 @@ func chunksExist(t *testing.T, ctx context.Context, db *pgxpool.Pool) {
require.GreaterOrEqual(t, nbr, int64(10), "Expected a bunch of chunks but got %d", nbr)
}

func viewEmpty(t *testing.T, ctx context.Context, db *pgxpool.Pool) {
func countChunksToFreeze(t *testing.T, ctx context.Context, db *pgxpool.Pool) int64 {
var nbr int64
err := db.QueryRow(ctx, "select count(*) from _ps_catalog.chunks_to_freeze").Scan(&nbr)
err := db.QueryRow(ctx, fmt.Sprintf(sqlChunksToFreeze, "count(*)")).Scan(&nbr)
if err != nil {
t.Fatalf("Failed to count chunks: %v", err)
}
require.Equal(t, int64(0), nbr, "Expected view to return zero results but got %d", nbr)
return nbr
}

func chooseUncompressedChunks(t *testing.T, ctx context.Context, db *pgxpool.Pool, count int) []uncompressedChunk {
Expand Down Expand Up @@ -171,7 +190,7 @@ func compressChunks(t *testing.T, ctx context.Context, db *pgxpool.Pool, chunks
}

func view(t *testing.T, ctx context.Context, db *pgxpool.Pool) []compressedChunk {
rows, err := db.Query(ctx, "select id from _ps_catalog.chunks_to_freeze")
rows, err := db.Query(ctx, fmt.Sprintf(sqlChunksToFreeze, "id"))
if err != nil {
t.Fatalf("Failed to select from _ps_catalog.chunks_to_freeze: %v", err)
}
Expand Down
Loading

0 comments on commit bbbe179

Please sign in to comment.