Skip to content

Commit

Permalink
ttl: only gc in leader to save performance
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Feb 10, 2025
1 parent d7a8b67 commit b4b17dd
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -22885,7 +22885,7 @@
"targets": [
{
"exemplar": true,
"expr": "avg(tidb_server_ttl_watermark_delay{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", type=\"schedule\"}) by (type, name)",
"expr": "max(tidb_server_ttl_watermark_delay{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", type=\"schedule\"}) by (type, name)",
"interval": "",
"legendFormat": "{{ name }}",
"queryType": "randomWalk",
Expand Down
5 changes: 5 additions & 0 deletions pkg/ttl/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,8 @@ func UpdateDelayMetrics(records map[int64]*DelayMetricsRecord) {
metrics.TTLWatermarkDelay.With(prometheus.Labels{metrics.LblType: "schedule", metrics.LblName: delay}).Set(v)
}
}

// ClearDelayMetrics clears the metrics of TTL delay
func ClearDelayMetrics() {
metrics.TTLWatermarkDelay.Reset()
}
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ go_test(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//mock",
Expand Down
13 changes: 13 additions & 0 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,15 @@ func (m *JobManager) reportMetrics(se session.Session) {
metrics.RunningJobsCnt.Set(runningJobs)
metrics.CancellingJobsCnt.Set(cancellingJobs)

if !m.isLeader() {
// only the leader can do collect delay metrics to reduce the performance overhead
metrics.ClearDelayMetrics()
return
}

if time.Since(m.lastReportDelayMetricsTime) > 10*time.Minute {
m.lastReportDelayMetricsTime = time.Now()
logutil.Logger(m.ctx).Info("TTL leader to collect delay metrics")
records, err := GetDelayMetricRecords(m.ctx, se, time.Now())
if err != nil {
logutil.Logger(m.ctx).Info("failed to get TTL delay metrics", zap.Error(err))
Expand Down Expand Up @@ -1093,6 +1100,12 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {

// DoGC deletes some old TTL job histories and redundant scan tasks
func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time) {
if !m.isLeader() {
// only the leader can do the GC to reduce the performance impact
return
}

logutil.Logger(m.ctx).Info("TTL leader to DoGC")
// Remove the table not exist in info schema cache.
// Delete the table status before deleting the tasks. Therefore the related tasks
if err := m.updateInfoSchemaCache(se); err == nil {
Expand Down
58 changes: 54 additions & 4 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
metrics2 "github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/statistics"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/skip"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
Expand Down Expand Up @@ -797,10 +799,15 @@ func TestGCScanTasks(t *testing.T) {
addScanTaskRecord(3, 2, 1)
addScanTaskRecord(3, 2, 2)

isLeader := false
m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
return isLeader
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
// only leader can do GC
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select count(1) from mysql.tidb_ttl_task").Check(testkit.Rows("6"))
isLeader = true
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2"))
}
Expand All @@ -816,10 +823,15 @@ func TestGCTableStatus(t *testing.T) {
// insert table status without corresponding table
tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", 2024, 2024)

isLeader := false
m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
return isLeader
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
// only leader can do GC
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select count(1) from mysql.tidb_ttl_table_status").Check(testkit.Rows("1"))
isLeader = true
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select * from mysql.tidb_ttl_table_status").Check(nil)

Expand Down Expand Up @@ -877,11 +889,16 @@ func TestGCTTLHistory(t *testing.T) {
addHistory(6, 91)
addHistory(7, 100)

isLeader := false
m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
return isLeader
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
m.DoGC(context.TODO(), se, se.Now())
// only leader can go GC
tk.MustQuery("select count(1) from mysql.tidb_ttl_job_history").Check(testkit.Rows("7"))
isLeader = true
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
}

Expand Down Expand Up @@ -1047,6 +1064,37 @@ func TestDelayMetrics(t *testing.T) {
checkRecord(records, "t3", now.Add(-3*time.Hour))
checkRecord(records, "t4", now.Add(-3*time.Hour))
checkRecord(records, "t5", emptyTime)

metrics.ClearDelayMetrics()
getMetricCnt := func() int {
ch := make(chan prometheus.Metric)
go func() {
metrics2.TTLWatermarkDelay.Collect(ch)
close(ch)
}()

cnt := 0
for range ch {
cnt++
}
return cnt
}

isLeader := false
m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, func() bool {
return isLeader
})
// If the manager is not leader, the metrics will be empty.
m.ReportMetrics(se)
require.Zero(t, getMetricCnt())
// leader will collect metrics
isLeader = true
m.ReportMetrics(se)
require.Equal(t, len(metrics.WaterMarkScheduleDelayNames), getMetricCnt())
// when back to non-leader, the metrics will be empty.
isLeader = false
m.ReportMetrics(se)
require.Zero(t, getMetricCnt())
}

type poolTestWrapper struct {
Expand Down Expand Up @@ -1503,7 +1551,9 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {

ctx := context.Background()
m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, func() bool {
return true
})

now := se.Now()

Expand Down

0 comments on commit b4b17dd

Please sign in to comment.