Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: fix ttl job start time will be override after failover (#44768) #44875

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []
return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
}

func setTableStatusOwnerSQL(uuid string, tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{uuid, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
func setTableStatusOwnerSQL(uuid string, tableID int64, jobStart time.Time, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{uuid, id, jobStart, now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) {
Expand Down Expand Up @@ -589,19 +589,21 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
}

jobID = uuid.New().String()
jobStart := now
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
// don't create new job if there is already one running
// so the running tasks don't need to be cancelled
jobID = tableStatus.CurrentJobID
expireTime = tableStatus.CurrentJobTTLExpire
jobStart = tableStatus.CurrentJobStartTime
jobExist = true
}
failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
jobID = val.(string)
})

sql, args = setTableStatusOwnerSQL(jobID, table.ID, now, expireTime, m.id)
sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
Expand Down
24 changes: 23 additions & 1 deletion ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,30 @@ func TestJobTimeout(t *testing.T) {
// there is already a task
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

m2 := ttlworker.NewJobManager("manager-2", nil, store, nil)
m2.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m2.InfoSchemaCache().Update(se))
// schedule jobs
now = now.Add(10 * time.Minute)
m2.RescheduleJobs(se, now)
jobs := m2.RunningJobs()
require.Equal(t, 1, len(jobs))
require.Equal(t, jobs[0].ID(), tableStatus.CurrentJobID)

// check job has taken by another manager
sql, args = cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
rows, err = se.ExecuteSQL(ctx, sql, args...)
require.NoError(t, err)
newTableStatus, err := cache.RowToTableStatus(se, rows[0])
require.NoError(t, err)
require.Equal(t, "manager-2", newTableStatus.CurrentJobOwnerID)
require.Equal(t, tableStatus.CurrentJobID, newTableStatus.CurrentJobID)
require.Equal(t, tableStatus.CurrentJobStartTime, newTableStatus.CurrentJobStartTime)
require.Equal(t, tableStatus.CurrentJobTTLExpire, newTableStatus.CurrentJobTTLExpire)
require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix())

// the timeout will be checked while updating heartbeat
require.NoError(t, m.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
Expand Down
6 changes: 3 additions & 3 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")),
nil, nil,
},
{
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")),
nil, nil,
},
{
Expand All @@ -348,7 +348,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")),
nil, errors.New("test error message"),
},
{
Expand Down