diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index 828e741e4fd89..f80a62361ea4c 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -68,8 +68,11 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []
 	return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
 }
 
-func setTableStatusOwnerSQL(uuid string, tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
-	return setTableStatusOwnerTemplate, []interface{}{uuid, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
+// setTableStatusOwnerSQL returns setTableStatusOwnerTemplate with its arguments.
+// jobStart is separate from now so that taking over an existing job keeps that
+// job's start time. Every timestamp argument is rendered with timeFormat.
+func setTableStatusOwnerSQL(uuid string, tableID int64, jobStart time.Time, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
+	return setTableStatusOwnerTemplate, []interface{}{uuid, id, jobStart.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
 }
 
 func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) {
@@ -589,19 +592,21 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 		}
 
 		jobID = uuid.New().String()
+		jobStart := now
 		jobExist := false
 		if len(tableStatus.CurrentJobID) > 0 {
 			// don't create new job if there is already one running
 			// so the running tasks don't need to be cancelled
 			jobID = tableStatus.CurrentJobID
 			expireTime = tableStatus.CurrentJobTTLExpire
+			jobStart = tableStatus.CurrentJobStartTime
 			jobExist = true
 		}
 		failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
 			jobID = val.(string)
 		})
 
-		sql, args = setTableStatusOwnerSQL(jobID, table.ID, now, expireTime, m.id)
+		sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
 		_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil { return errors.Wrapf(err, "execute sql: %s", sql) diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 1be9722fe0757..0cfe118de02d4 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -475,8 +475,30 @@ func TestJobTimeout(t *testing.T) { // there is already a task tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1")) + m2 := ttlworker.NewJobManager("manager-2", nil, store, nil) + m2.TaskManager().ResizeWorkersWithSysVar() + require.NoError(t, m2.InfoSchemaCache().Update(se)) + // schedule jobs + now = now.Add(10 * time.Minute) + m2.RescheduleJobs(se, now) + jobs := m2.RunningJobs() + require.Equal(t, 1, len(jobs)) + require.Equal(t, jobs[0].ID(), tableStatus.CurrentJobID) + + // check job has taken by another manager + sql, args = cache.SelectFromTTLTableStatusWithID(table.Meta().ID) + rows, err = se.ExecuteSQL(ctx, sql, args...) 
+ require.NoError(t, err) + newTableStatus, err := cache.RowToTableStatus(se, rows[0]) + require.NoError(t, err) + require.Equal(t, "manager-2", newTableStatus.CurrentJobOwnerID) + require.Equal(t, tableStatus.CurrentJobID, newTableStatus.CurrentJobID) + require.Equal(t, tableStatus.CurrentJobStartTime, newTableStatus.CurrentJobStartTime) + require.Equal(t, tableStatus.CurrentJobTTLExpire, newTableStatus.CurrentJobTTLExpire) + require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix()) + // the timeout will be checked while updating heartbeat - require.NoError(t, m.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour))) + require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour))) tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout")) tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0")) } diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 132a81a25084f..9b542500cc1b0 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -296,7 +296,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")), nil, nil, }, { @@ -326,7 +326,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), + getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")), nil, nil, }, { @@ -348,7 +348,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")), + 
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")), nil, errors.New("test error message"), }, {