diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel
index 5358c4a6172c6..58a4493429869 100644
--- a/pkg/ttl/ttlworker/BUILD.bazel
+++ b/pkg/ttl/ttlworker/BUILD.bazel
@@ -87,6 +87,7 @@ go_test(
         "//pkg/statistics",
         "//pkg/store/mockstore",
         "//pkg/testkit",
+        "//pkg/testkit/testfailpoint",
         "//pkg/timer/api",
         "//pkg/timer/tablestore",
         "//pkg/ttl/cache",
diff --git a/pkg/ttl/ttlworker/job.go b/pkg/ttl/ttlworker/job.go
index 44b154bdcf8e5..c3acfd4d4685b 100644
--- a/pkg/ttl/ttlworker/job.go
+++ b/pkg/ttl/ttlworker/job.go
@@ -20,11 +20,10 @@ import (
 	"time"

 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/ttl/cache"
 	"github.com/pingcap/tidb/pkg/ttl/session"
 	"github.com/pingcap/tidb/pkg/util/intest"
-	"github.com/pingcap/tidb/pkg/util/logutil"
-	"go.uber.org/zap"
 )

 const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
@@ -122,8 +121,9 @@ type ttlJob struct {
 }

 // finish turns current job into last job, and update the error message and statistics summary
-func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
+func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) error {
 	intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
+
 	// at this time, the job.ctx may have been canceled (to cancel this job)
 	// even when it's canceled, we'll need to update the states, so use another context
 	err := se.RunInTxn(context.TODO(), func() error {
@@ -145,10 +145,9 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary
 			return errors.Wrapf(err, "execute sql: %s", sql)
 		}

-		return nil
-	}, session.TxnModeOptimistic)
+		failpoint.InjectCall("ttl-finish", &err)
+		return err
+	}, session.TxnModePessimistic)

-	if err != nil {
-		logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
-	}
+	return err
 }
diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go
index 81b4f3a52e625..b589f34ca1123 100644
--- a/pkg/ttl/ttlworker/job_manager.go
+++ b/pkg/ttl/ttlworker/job_manager.go
@@ -558,8 +558,12 @@ j:
 		if err != nil {
 			logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
 		}
+		err = job.finish(se, se.Now(), summary)
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+			continue
+		}
 		m.removeJob(job)
-		job.finish(se, se.Now(), summary)
 	}
 	cancel()
 }
@@ -591,10 +595,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {

 				summary, err := summarizeErr(errors.New(cancelReason))
 				if err != nil {
-					logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+					logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
+				}
+				err = job.finish(se, now, summary)
+				if err != nil {
+					logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+					continue
 				}
 				m.removeJob(job)
-				job.finish(se, now, summary)
 			}
 		}
 		return
@@ -611,10 +619,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 		logutil.Logger(m.ctx).Info("cancel job because the table has been dropped or it's no longer TTL table", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
 		summary, err := summarizeErr(errors.New("TTL table has been removed or the TTL on this table has been stopped"))
 		if err != nil {
-			logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+			logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
+		}
+		err = job.finish(se, now, summary)
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+			continue
 		}
 		m.removeJob(job)
-		job.finish(se, now, summary)
 	}

 	jobTables := m.readyForLockHBTimeoutJobTables(now)
@@ -882,11 +894,14 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, no
 			logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
 			summary, err := summarizeErr(errors.New("job is timeout"))
 			if err != nil {
-				logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+				logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
+			}
+			err = job.finish(se, now, summary)
+			if err != nil {
+				logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+				continue
 			}
 			m.removeJob(job)
-			job.finish(se, now, summary)
-			continue
 		}

 		intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go
index 685f5afc4be45..e6ea854648bd7 100644
--- a/pkg/ttl/ttlworker/job_manager_integration_test.go
+++ b/pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -34,8 +34,10 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	pmodel "github.com/pingcap/tidb/pkg/parser/model"
 	dbsession "github.com/pingcap/tidb/pkg/session"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/statistics"
 	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	timerapi "github.com/pingcap/tidb/pkg/timer/api"
 	timertable "github.com/pingcap/tidb/pkg/timer/tablestore"
 	"github.com/pingcap/tidb/pkg/ttl/cache"
@@ -1304,3 +1306,151 @@ func TestManagerJobAdapterNow(t *testing.T) {
 	require.Equal(t, "Europe/Berlin", now.Location().String())
 	require.InDelta(t, now.Unix(), localNow.Unix(), 10)
 }
+
+func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
+	// Finishing a `TTLJob` removes all `TTLTask`s of the job, and at the same time
+	// the `task_manager` may update the owner of a task, which may cause a write
+	// conflict. This test simulates that scenario.
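+	//
+	// Since `job.finish` now runs in a pessimistic transaction (see the job.go
+	// change above), the two concurrent writes should block on row locks instead
+	// of failing with a write conflict.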
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+
+	sessionFactory := sessionFactory(t, store)
+	se := sessionFactory()
+
+	tk.MustExec("use test")
+	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+	testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+	require.NoError(t, err)
+
+	testTimes := 1000
+	now := se.Now()
+	for i := 0; i < testTimes; i++ {
+		now = now.Add(time.Hour * 48)
+
+		m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
+		require.NoError(t, m.InfoSchemaCache().Update(se))
+
+		se := sessionFactory()
+		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
+		require.NoError(t, err)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			se := sessionFactory()
+
+			m.TaskManager().LockScanTask(se, &cache.TTLTask{
+				ScanID:  0,
+				JobID:   job.ID(),
+				TableID: testTable.Meta().ID,
+			}, now)
+		}()
+
+		go func() {
+			defer wg.Done()
+			se := sessionFactory()
+
+			job.Finish(se, se.Now(), &ttlworker.TTLSummary{})
+		}()
+
+		wg.Wait()
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+	}
+}
+
+func TestFinishError(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+
+	sessionFactory := sessionFactory(t, store)
+	se := sessionFactory()
+
+	tk.MustExec("use test")
+	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+	testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+	require.NoError(t, err)
+
+	errCount := 5
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
+		// fail the first four `job.finish` attempts; the fifth succeeds
+		errCount -= 1
+		if errCount > 0 {
+			*err = errors.New("mock error")
+		}
+	})
+
+	now := se.Now()
+
+	m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+
+	initializeTest := func() {
+		errCount = 5
+		now = now.Add(time.Hour * 48)
+		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
+		require.NoError(t, err)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+		task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
+			ScanID:  0,
+			JobID:   job.ID(),
+			TableID: testTable.Meta().ID,
+		}, now)
+		require.NoError(t, err)
+		task.SetResult(nil)
+		err = m.TaskManager().ReportTaskFinished(se, now, task)
+		require.NoError(t, err)
+		tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
+	}
+
+	// Test that `CheckFinishedJob` can tolerate the `job.finish` error
+	initializeTest()
+	for i := 0; i < 4; i++ {
+		m.CheckFinishedJob(se)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.CheckFinishedJob(se)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+
+	// Test that `rescheduleJobs` can tolerate the `job.finish` error
+	// cancel job branch
+	initializeTest()
+	variable.EnableTTLJob.Store(false)
+	t.Cleanup(func() {
+		variable.EnableTTLJob.Store(true)
+	})
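+	// With TTL disabled, `RescheduleJobs` cancels the running job; the failpoint
+	// fails the first four finish attempts, so the task should survive until the
+	// fifth call succeeds.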
+	for i := 0; i < 4; i++ {
+		m.RescheduleJobs(se, now)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.RescheduleJobs(se, now)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+	variable.EnableTTLJob.Store(true)
+	// remove table branch
+	initializeTest()
+	tk.MustExec("drop table t")
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+	for i := 0; i < 4; i++ {
+		m.RescheduleJobs(se, now)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.RescheduleJobs(se, now)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+	testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+	require.NoError(t, err)
+
+	// Test that `updateHeartBeat` can tolerate the `job.finish` error
+	initializeTest()
+	for i := 0; i < 4; i++ {
+		// the job timeout is 6h, so advancing by 8h makes the job time out
+		now = now.Add(time.Hour * 8)
+		m.UpdateHeartBeat(context.Background(), se, now)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.UpdateHeartBeat(context.Background(), se, now)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+}
diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go
index f509d26e3dbb1..c62b1ba1689e5 100644
--- a/pkg/ttl/ttlworker/job_manager_test.go
+++ b/pkg/ttl/ttlworker/job_manager_test.go
@@ -203,8 +203,13 @@ func (m *JobManager) ReportMetrics(se session.Session) {
 	m.reportMetrics(se)
 }

-func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
-	j.finish(se, now, summary)
+// CheckFinishedJob is an exported version of checkFinishedJob
+func (m *JobManager) CheckFinishedJob(se session.Session) {
+	m.checkFinishedJob(se)
+}
+
+func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) error {
+	return j.finish(se, now, summary)
 }

 func (j *ttlJob) ID() string {
diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go
index cb6765f5dcea9..064c68f86ed40 100644
--- a/pkg/ttl/ttlworker/task_manager_test.go
+++ b/pkg/ttl/ttlworker/task_manager_test.go
@@ -77,6 +77,11 @@ func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus
 }

 // ReportTaskFinished is an exported version of reportTaskFinished
+func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
+	return m.reportTaskFinished(se, now, task)
+}
+
+// SetResult sets the result of the task
 func (t *runningScanTask) SetResult(err error) {
 	t.result = t.ttlScanTask.result(err)
 }