Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: refactor TTL to separate creating new job and take timeout job codes #45286

Merged
merged 3 commits into from
Jul 13, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 59 additions & 38 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
@@ -315,7 +314,8 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo
PartitionName: ttlTbl.Partition.O,
}

job, err := m.lockNewJob(m.ctx, se, ttlTbl, now, true)
jobID := uuid.NewString()
job, err := m.lockJob(m.ctx, se, ttlTbl, now, jobID, false)
if err != nil {
firstError = err
tblResult.ErrorMessage = err.Error()
@@ -452,12 +452,16 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
return
}

newJobTables := m.readyForNewJobTables(now)
jobTables, isCreate := m.readyForLockJobTables(now)
// TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take over this job
// when the heartbeat is not sent
for _, table := range newJobTables {
for i, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
job, err := m.lockNewJob(m.ctx, se, table, now, false)
jobID := ""
if isCreate[i] {
jobID = uuid.NewString()
}
job, err := m.lockJob(m.ctx, se, table, now, jobID, isCreate[i])
if job != nil {
logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)
@@ -482,9 +486,10 @@ func (m *JobManager) localJobs() []*ttlJob {
return jobs
}

// readyForNewJobTables returns all tables which should spawn a TTL job according to cache
func (m *JobManager) readyForNewJobTables(now time.Time) []*cache.PhysicalTable {
// readyForLockJobTables returns all tables which should spawn a TTL job according to cache
func (m *JobManager) readyForLockJobTables(now time.Time) ([]*cache.PhysicalTable, []bool) {
tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables))
isCreate := make([]bool, 0, cap(tables))

tblLoop:
for _, table := range m.infoSchemaCache.Tables {
@@ -498,25 +503,53 @@ tblLoop:
}

status := m.tableStatusCache.Tables[table.ID]
ok := m.couldTrySchedule(status, table, now, false)
if ok {
if m.couldLockJob(status, table, now, true, true) {
tables = append(tables, table)
isCreate = append(isCreate, true)
} else if m.couldLockJob(status, table, now, false, false) {
tables = append(tables, table)
isCreate = append(isCreate, false)
}
}

return tables
return tables, isCreate
}

// couldTrySchedule returns whether a table should be tried to run TTL
func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) bool {
if tableStatus == nil {
// if the table status hasn't been created, return true
return true
}
// couldLockJob returns whether a table should be tried to run TTL
func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, isCreate bool, checkScheduleInterval bool) bool {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
if table == nil {
// if the table is not recorded in info schema, return false
return false
}

if isCreate {
if tableStatus == nil {
return true
}

if tableStatus.CurrentJobID != "" {
return false
}

if !checkScheduleInterval {
return true
}

startTime := tableStatus.LastJobStartTime

interval, err := table.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(m.ctx).Warn("illegal job interval", zap.Error(err))
return false
}
return startTime.Add(interval).Before(now)
}

// if isCreate is false, it means taking over an existing job
if tableStatus.CurrentJobID == "" {
return false
}

if tableStatus.CurrentJobOwnerID != "" {
// check whether its heartbeat time has expired
hbTime := tableStatus.CurrentJobOwnerHBTime
@@ -528,27 +561,15 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
}
return false
}

if ignoreScheduleInterval || tableStatus.LastJobStartTime.IsZero() {
return true
}

startTime := tableStatus.LastJobStartTime

interval, err := table.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(m.ctx).Warn("illegal job interval", zap.Error(err))
return false
}
return startTime.Add(interval).Before(now)
return true
}

// occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new
// lockJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new
// localJob and return it.
// It could return nil, nil if the table query doesn't return an error but the job has been locked by another instance.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) {
func (m *JobManager) lockJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, createJobID string, checkScheduleInterval bool) (*ttlJob, error) {
var expireTime time.Time
var jobID string
jobID := createJobID
isCreate := jobID != ""

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
@@ -560,6 +581,10 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
if !isCreate {
return errors.New("couldn't schedule ttl job")
}

// cannot find the row, insert the status row
sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)
_, err = se.ExecuteSQL(ctx, sql, args...)
@@ -579,7 +604,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return err
}
if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, ignoreScheduleInterval) {
if !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, isCreate, checkScheduleInterval) {
return errors.New("couldn't schedule ttl job")
}

@@ -588,7 +613,6 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

jobID = uuid.New().String()
jobStart := now
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
@@ -599,9 +623,6 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
jobStart = tableStatus.CurrentJobStartTime
jobExist = true
}
failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
jobID = val.(string)
})

sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
10 changes: 7 additions & 3 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ func TestParallelLockNewJob(t *testing.T) {
m.InfoSchemaCache().Tables[testTable.ID] = testTable

se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
job, err := m.LockJob(context.Background(), se, testTable, time.Now(), uuid.NewString(), false)
require.NoError(t, err)
job.Finish(se, time.Now(), &ttlworker.TTLSummary{})

@@ -95,7 +95,7 @@ func TestParallelLockNewJob(t *testing.T) {
m.InfoSchemaCache().Tables[testTable.ID] = testTable

se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, now, false)
job, err := m.LockJob(context.Background(), se, testTable, now, uuid.NewString(), false)
if err == nil {
successCounter.Add(1)
successJob = job
@@ -129,7 +129,7 @@ func TestFinishJob(t *testing.T) {
m.InfoSchemaCache().Tables[testTable.ID] = testTable
se := sessionFactory()
startTime := time.Now()
job, err := m.LockNewJob(context.Background(), se, testTable, startTime, false)
job, err := m.LockJob(context.Background(), se, testTable, startTime, uuid.NewString(), false)
require.NoError(t, err)

expireTime, err := testTable.EvalExpireTime(context.Background(), se, startTime)
@@ -405,6 +405,7 @@ func TestRescheduleJobs(t *testing.T) {
m := ttlworker.NewJobManager("manager-1", nil, store, nil)
m.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m.InfoSchemaCache().Update(se))
require.NoError(t, m.TableStatusCache().Update(context.Background(), se))
// schedule jobs
m.RescheduleJobs(se, now)
sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
@@ -423,6 +424,7 @@ func TestRescheduleJobs(t *testing.T) {
anotherManager := ttlworker.NewJobManager("manager-2", nil, store, nil)
anotherManager.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, anotherManager.InfoSchemaCache().Update(se))
require.NoError(t, anotherManager.TableStatusCache().Update(context.Background(), se))
anotherManager.RescheduleJobs(se, now.Add(time.Hour))
sql, args = cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
rows, err = se.ExecuteSQL(ctx, sql, args...)
@@ -459,6 +461,7 @@ func TestJobTimeout(t *testing.T) {
m := ttlworker.NewJobManager("manager-1", nil, store, nil)
m.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m.InfoSchemaCache().Update(se))
require.NoError(t, m.TableStatusCache().Update(context.Background(), se))
// schedule jobs
m.RescheduleJobs(se, now)
// set the worker to be empty, so none of the tasks will be scheduled
@@ -478,6 +481,7 @@ func TestJobTimeout(t *testing.T) {
m2 := ttlworker.NewJobManager("manager-2", nil, store, nil)
m2.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(context.Background(), se))
// schedule jobs
now = now.Add(10 * time.Minute)
m2.RescheduleJobs(se, now)
Loading