From a412910d37a16b10159bb9f41e0747fc04782cee Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Tue, 6 Sep 2022 17:57:25 -0700 Subject: [PATCH] [logsearch] Fix vacuuming to keep tables in sync - Ensure that child tables of each parent table corresponding to the same time period are deleted together. - Ensure that child tables corresponding to the current time period are never deleted due to disk space. - Print a log when disk space cannot be recovered (i.e. when current tables are too large) indicating to increase available storage capacity. --- logsearchapi/server/partitions.go | 155 +++++++++++++++++-------- logsearchapi/server/partitions_test.go | 19 +++ 2 files changed, 124 insertions(+), 50 deletions(-) diff --git a/logsearchapi/server/partitions.go b/logsearchapi/server/partitions.go index b21c06e4316..40293001fca 100644 --- a/logsearchapi/server/partitions.go +++ b/logsearchapi/server/partitions.go @@ -117,6 +117,32 @@ func (p *partitionTimeRange) next() partitionTimeRange { return newPartitionTimeRange(p.EndDate) } +func getPartitionTimeRangeForTable(name string) (partitionTimeRange, error) { + fmtStr := []rune("2006_01_02") + runes := []rune(name) + + errFn := func(msg string) error { + title := "invalid partition name" + s := ": " + msg + if msg == "" { + s = "" + } + return fmt.Errorf("%s%s", title, s) + } + + if len(runes) <= len(fmtStr) { + return partitionTimeRange{}, errFn("too short") + } + + // Split out the date part of the table name + partSuffix := string(runes[len(runes)-len(fmtStr):]) + startTime, err := time.Parse(string(fmtStr), partSuffix) + if err != nil { + return partitionTimeRange{}, errFn("bad time value: " + partSuffix) + } + return newPartitionTimeRange(startTime), nil +} + type childTableInfo struct { ParentSchema string Parent string @@ -158,33 +184,19 @@ func (c *DBClient) getExistingPartitions(ctx context.Context, t Table) (tableNam return tableNames, nil } -func (c *DBClient) getTablesDiskUsage(ctx context.Context) (m 
map[Table]map[string]uint64, _ error) { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) +func (c *DBClient) getTableDiskUsage(ctx context.Context, tableName string) (int64, error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() const ( tableSize QTemplate = `SELECT pg_total_relation_size('%s');` ) - m = make(map[Table]map[string]uint64, len(allTables)) - for _, table := range allTables { - parts, err := c.getExistingPartitions(ctx, table) - if err != nil { - return nil, err - } - cm := make(map[string]uint64, len(parts)) - for _, tableName := range parts { - q := tableSize.build(tableName) - row := c.QueryRowContext(ctx, q) - var size uint64 - if err := row.Scan(&size); err != nil { - return nil, fmt.Errorf("Unable to query relation size: %v", err) - } - cm[tableName] = size - } - m[table] = cm - } - return m, nil + q := tableSize.build(tableName) + row := c.QueryRowContext(ctx, q) + var size int64 + err := row.Scan(&size) + return size, err } func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) error { @@ -197,15 +209,6 @@ func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) e return nil } -func totalDiskUsage(m map[Table]map[string]uint64) (sum uint64) { - for _, cm := range m { - for _, v := range cm { - sum += v - } - } - return -} - func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) { const ( highWaterMarkPercent = 90 @@ -215,19 +218,28 @@ func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) { } func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGBs int) (err error) { - tables := make(map[Table][]string) + tables := make(map[Table][]string, len(allTables)) + du := make(map[Table]map[string]int64, len(allTables)) + var totalUsage int64 for _, table := range allTables { tables[table], err = c.getExistingPartitions(ctx, table) if err != nil { return err } - } - du, err := c.getTablesDiskUsage(ctx) - if err != nil { - return err 
+ + m := make(map[string]int64, len(tables[table])) + for _, partition := range tables[table] { + size, err := c.getTableDiskUsage(ctx, partition) + if err != nil { + return err + } + m[partition] = size + totalUsage += size + } + du[table] = m + } - totalUsage := totalDiskUsage(du) diskCap := uint64(diskCapacityGBs) * 1024 * 1024 * 1024 hi, lo := calculateHiLoWaterMarks(diskCap) @@ -237,22 +249,66 @@ func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGB // Delete oldest child tables in each parent table, until usage is below // `lo`. - var index int + // + // NOTE: Existing partitions for each parent table may not be in sync wrt + // the time periods they correspond to, due to previous errors in deleting + // from the db. So we keep track of the indices of the child tables for each + // parent table to ensure we only delete the oldest tables. + indices := make([]int, len(allTables)) for float64(totalUsage) >= lo { - var recoveredSpace uint64 - for _, table := range allTables { - if index >= len(tables[table]) { - break + var recoveredSpace int64 + + // Find the minStartTime of the first existing partition of each of the + // parent tables. + currentPartStartTime := newPartitionTimeRange(time.Now()).StartDate + var minStartTime time.Time + isSet := false + for i, table := range allTables { + pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]]) + if err != nil { + return err + } + if !isSet { + minStartTime = pt.StartDate + } + if minStartTime.After(pt.StartDate) { + minStartTime = pt.StartDate + } + } + + // Quit without deleting the current partition even if we are over the + // highwater mark! 
+ if minStartTime.Equal(currentPartStartTime) { + var candidateTables []string + for i, table := range allTables { + candidateTables = append(candidateTables, tables[table][indices[i]]) } - tableName := tables[table][index] - err = c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached") + + log.Printf("WARNING: highwater mark reached: no non-current tables exist to delete!"+ + " Please increase the value of "+DiskCapacityEnv+" and ensure disk capacity for PostgreSQL!"+ + " Candidate tables are: %v", candidateTables) + break + } + + // Delete all child tables with the same StartTime = minStartTime + for i, table := range allTables { + pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]]) if err != nil { return err } - recoveredSpace += du[table][tableName] + + if pt.StartDate.Equal(minStartTime) { + tableName := tables[table][indices[i]] + err := c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached") + if err != nil { + return err + } + indices[i] += 1 + recoveredSpace += du[table][tableName] + } } + totalUsage -= recoveredSpace - index++ } log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024)) return nil @@ -260,23 +316,22 @@ func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGB // vacuumData should be called in a new go routine. func (c *DBClient) vacuumData(ctx context.Context, diskCapacityGBs int) { - var ( - normalInterval = 1 * time.Hour - retryInterval = 2 * time.Minute - ) + normalInterval := 1 * time.Hour + retryInterval := 2 * time.Minute timer := time.NewTimer(normalInterval) defer timer.Stop() for { select { case <-timer.C: - timer.Reset(retryInterval) // timer fired, reset it right here. 
err := c.maintainLowWatermarkUsage(ctx, diskCapacityGBs) if err != nil { log.Printf("Error maintaining high-water mark disk usage: %v (retrying in %s)", err, retryInterval) + timer.Reset(retryInterval) continue } + timer.Reset(normalInterval) case <-ctx.Done(): log.Println("Vacuum thread exiting.") diff --git a/logsearchapi/server/partitions_test.go b/logsearchapi/server/partitions_test.go index cc6006f1189..b1893102aed 100644 --- a/logsearchapi/server/partitions_test.go +++ b/logsearchapi/server/partitions_test.go @@ -131,3 +131,22 @@ func TestPartitionTimeRangeNextPrev(t *testing.T) { } } } + +func TestPartitionNameParsing(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + for i := 0; i < 1000; i++ { + r := randomTime() + p1 := newPartitionTimeRange(r) + + name := "table_" + p1.getPartnameSuffix() + + res, err := getPartitionTimeRangeForTable(name) + if err != nil { + t.Errorf("Test %d: r=%v unexpected err: %v", i, r, err) + } + if !res.isSame(&p1) { + t.Errorf("Test %d: r=%v, expected: %v got %v", i, r, p1.String(), res.String()) + } + } +}