Commit 9c428bf

ttl: refactor TTL to separate creating new job and take timeout job codes (#45286)

close #45285
lcwangchao authored Jul 13, 2023
1 parent 63197ec commit 9c428bf
Showing 3 changed files with 324 additions and 146 deletions.
230 changes: 134 additions & 96 deletions ttl/ttlworker/job_manager.go
@@ -24,7 +24,6 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
@@ -315,20 +314,16 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo
PartitionName: ttlTbl.Partition.O,
}

job, err := m.lockNewJob(m.ctx, se, ttlTbl, now, true)
if err != nil {
jobID := uuid.NewString()
if _, err = m.lockNewJob(m.ctx, se, ttlTbl, now, jobID, false); err != nil {
firstError = err
tblResult.ErrorMessage = err.Error()
tableResults = append(tableResults, tblResult)
continue
}

allError = false
if job != nil {
m.appendJob(job)
tblResult.JobID = job.id
tableResults = append(tableResults, tblResult)
}
tblResult.JobID = jobID
tableResults = append(tableResults, tblResult)
}

if allError {
@@ -452,16 +447,18 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
return
}

newJobTables := m.readyForNewJobTables(now)
jobTables, isCreate := m.readyForLockJobTables(now)
// TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take this job
// when the heartbeat is not sent
for _, table := range newJobTables {
for i, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
job, err := m.lockNewJob(m.ctx, se, table, now, false)
if job != nil {
logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)
var err error
if isCreate[i] {
_, err = m.lockNewJob(m.ctx, se, table, now, uuid.NewString(), true)
} else {
_, err = m.lockHBTimeoutJob(m.ctx, se, table, now)
}

if err != nil {
logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err))
}
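
The loop above now dispatches between two lock paths via the parallel `isCreate` slice returned by `readyForLockJobTables`. A minimal standalone sketch of that contract follows; the struct bundling, table IDs, and printed messages are illustrative placeholders, not the committed representation:

```go
package main

import "fmt"

// lockCandidate bundles what the commit returns as two parallel slices
// (jobTables, isCreate); the struct form here is only an illustration.
type lockCandidate struct {
	tableID  int64
	isCreate bool // true: lock a brand-new job; false: adopt a heartbeat-timed-out one
}

func main() {
	candidates := []lockCandidate{
		{tableID: 100, isCreate: true},  // schedule interval elapsed, no running job
		{tableID: 101, isCreate: false}, // running job whose owner stopped heartbeating
	}
	for _, c := range candidates {
		if c.isCreate {
			fmt.Printf("table %d: lockNewJob with a fresh UUID\n", c.tableID)
		} else {
			fmt.Printf("table %d: lockHBTimeoutJob, keeping the existing job ID\n", c.tableID)
		}
	}
}
```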
@@ -482,9 +479,10 @@ func (m *JobManager) localJobs() []*ttlJob {
return jobs
}

// readyForNewJobTables returns all tables which should spawn a TTL job according to cache
func (m *JobManager) readyForNewJobTables(now time.Time) []*cache.PhysicalTable {
// readyForLockJobTables returns all tables which should spawn a TTL job according to cache
func (m *JobManager) readyForLockJobTables(now time.Time) ([]*cache.PhysicalTable, []bool) {
tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables))
isCreate := make([]bool, 0, cap(tables))

tblLoop:
for _, table := range m.infoSchemaCache.Tables {
@@ -498,25 +496,53 @@ }
}

status := m.tableStatusCache.Tables[table.ID]
ok := m.couldTrySchedule(status, table, now, false)
if ok {
if m.couldLockJob(status, table, now, true, true) {
tables = append(tables, table)
isCreate = append(isCreate, true)
} else if m.couldLockJob(status, table, now, false, false) {
tables = append(tables, table)
isCreate = append(isCreate, false)
}
}

return tables
return tables, isCreate
}

// couldTrySchedule returns whether a table should be tried to run TTL
func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) bool {
if tableStatus == nil {
// if the table status hasn't been created, return true
return true
}
// couldLockJob returns whether this node could try to lock a TTL job for the table
func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, isCreate bool, checkScheduleInterval bool) bool {
if table == nil {
// if the table is not recorded in info schema, return false
return false
}

if isCreate {
if tableStatus == nil {
return true
}

if tableStatus.CurrentJobID != "" {
return false
}

if !checkScheduleInterval {
return true
}

startTime := tableStatus.LastJobStartTime

interval, err := table.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(m.ctx).Warn("illegal job interval", zap.Error(err))
return false
}
return startTime.Add(interval).Before(now)
}

// if isCreate is false, it means to take over an existing job
if tableStatus == nil || tableStatus.CurrentJobID == "" {
return false
}

if tableStatus.CurrentJobOwnerID != "" {
// see whether its heartbeat time has expired
hbTime := tableStatus.CurrentJobOwnerHBTime
@@ -528,58 +554,50 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
}
return false
}
return true
}

if ignoreScheduleInterval || tableStatus.LastJobStartTime.IsZero() {
return true
}
func (m *JobManager) lockHBTimeoutJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
var jobID string
var jobStart time.Time
var expireTime time.Time
err := se.RunInTxn(ctx, func() error {
tableStatus, err := m.getTableStatusForUpdateNotWait(ctx, se, table.ID, table.TableInfo.ID, false)
if err != nil {
return err
}

if tableStatus == nil || !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, false, false) {
return errors.Errorf("couldn't lock timeout TTL job for table id '%d'", table.ID)
}

startTime := tableStatus.LastJobStartTime
jobID = tableStatus.CurrentJobID
jobStart = tableStatus.CurrentJobStartTime
expireTime = tableStatus.CurrentJobTTLExpire
sql, args := setTableStatusOwnerSQL(tableStatus.CurrentJobID, table.ID, jobStart, now, expireTime, m.id)
if _, err = se.ExecuteSQL(ctx, sql, args...); err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
return nil
}, session.TxnModePessimistic)

interval, err := table.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(m.ctx).Warn("illegal job interval", zap.Error(err))
return false
return nil, err
}
return startTime.Add(interval).Before(now)

return m.appendLockedJob(jobID, se, jobStart, expireTime, table)
}
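
Takeover hinges on the heartbeat-expiry check inside couldLockJob, whose actual comparison is collapsed in this view. Below is a minimal standalone sketch of that kind of check; the timeout constant is an assumption for illustration, not the value the ttlworker package uses:

```go
package main

import (
	"fmt"
	"time"
)

// assumedHBTimeout stands in for the heartbeat-expiry threshold; the
// constant the ttlworker package actually uses is not visible in this diff.
const assumedHBTimeout = time.Minute

// ownerHeartbeatExpired reports whether a job owner's last heartbeat is
// stale enough for another node to take the job over.
func ownerHeartbeatExpired(hbTime, now time.Time) bool {
	return hbTime.Add(assumedHBTimeout).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(ownerHeartbeatExpired(now.Add(-2*time.Minute), now)) // true: owner looks dead
	fmt.Println(ownerHeartbeatExpired(now.Add(-5*time.Second), now)) // false: owner is alive
}
```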

// occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new
// localJob and return it.
// It could be nil, nil, if the table query doesn't return an error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) {
// lockNewJob locks a new job
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, jobID string, checkScheduleInterval bool) (*ttlJob, error) {
var expireTime time.Time
var jobID string

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
// use ` FOR UPDATE NOWAIT`, then if the new job has been locked by other nodes, it will return:
// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
// Then this tidb node will not waste resources in calculating the ranges.
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
// cannot find the row, insert the status row
sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
sql, args = cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.New("table status row still doesn't exist after insertion")
}
}
tableStatus, err := cache.RowToTableStatus(se, rows[0])
tableStatus, err := m.getTableStatusForUpdateNotWait(ctx, se, table.ID, table.TableInfo.ID, true)
if err != nil {
return err
}
if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, ignoreScheduleInterval) {

if !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, true, checkScheduleInterval) {
return errors.New("couldn't schedule ttl job")
}

@@ -588,32 +606,12 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

jobID = uuid.New().String()
jobStart := now
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
// don't create new job if there is already one running
// so the running tasks don't need to be cancelled
jobID = tableStatus.CurrentJobID
expireTime = tableStatus.CurrentJobTTLExpire
jobStart = tableStatus.CurrentJobStartTime
jobExist = true
}
failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
jobID = val.(string)
})

sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
sql, args := setTableStatusOwnerSQL(jobID, table.ID, now, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

// if the job already exist, don't need to submit scan tasks
if jobExist {
return nil
}

sql, args = createJobHistorySQL(jobID, table, expireTime, now)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
@@ -641,8 +639,48 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return nil, err
}

return m.appendLockedJob(jobID, se, now, expireTime, table)
}

func (m *JobManager) getTableStatusForUpdateNotWait(ctx context.Context, se session.Session, physicalID int64, parentTableID int64, createIfNotExist bool) (*cache.TableStatus, error) {
sql, args := cache.SelectFromTTLTableStatusWithID(physicalID)
// use ` FOR UPDATE NOWAIT`, then if the new job has been locked by other nodes, it will return:
// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
// Then this tidb node will not waste resources in calculating the ranges.
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

if len(rows) == 0 && !createIfNotExist {
return nil, nil
}

if len(rows) == 0 {
// cannot find the row, insert the status row
sql, args = insertNewTableIntoStatusSQL(physicalID, parentTableID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

sql, args = cache.SelectFromTTLTableStatusWithID(physicalID)
rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

if len(rows) == 0 {
return nil, errors.New("table status row still doesn't exist after insertion")
}
}

return cache.RowToTableStatus(se, rows[0])
}
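
The `FOR UPDATE NOWAIT` trick above makes losing the lock race cheap: the statement fails immediately with error 3572 instead of blocking on another node's lock. Here is a simplified sketch of how a caller might fail fast on that condition; the executor interface and the string-matching strategy are assumptions for illustration, not the real tidb session API:

```go
package main

import (
	"fmt"
	"strings"
)

// sqlExecutor is a stand-in for the session abstraction used in the diff;
// it is not the real tidb session.Session interface.
type sqlExecutor interface {
	ExecuteSQL(sql string, args ...any) error
}

// tryLockStatusRow attempts a non-blocking row lock. When another node
// already holds the lock, TiDB reports error 3572 ("... NOWAIT is set"),
// which the caller treats as "lost the race" rather than a real failure.
func tryLockStatusRow(se sqlExecutor, tableID int64) (locked bool, err error) {
	err = se.ExecuteSQL(
		"SELECT * FROM mysql.tidb_ttl_table_status WHERE table_id = %? FOR UPDATE NOWAIT",
		tableID,
	)
	switch {
	case err == nil:
		return true, nil
	case strings.Contains(err.Error(), "NOWAIT is set"): // assumed matching strategy
		return false, nil
	default:
		return false, err
	}
}

func main() {
	var se sqlExecutor // wire in a real executor to exercise the pattern
	if se != nil {
		locked, err := tryLockStatusRow(se, 100)
		fmt.Println(locked, err)
	}
}
```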

func (m *JobManager) appendLockedJob(id string, se session.Session, createTime time.Time, expireTime time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
// the table status was updated successfully, so the cache needs to be refreshed.
err = m.updateInfoSchemaCache(se)
err := m.updateInfoSchemaCache(se)
if err != nil {
return nil, err
}
@@ -651,29 +689,29 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return nil, err
}

job := m.createNewJob(jobID, expireTime, now, table)

// the job is created; notify every scan manager to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, id)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err))
}
return job, nil
}

func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
return &ttlJob{
job := &ttlJob{
id: id,
ownerID: m.id,

createTime: now,
createTime: createTime,
ttlExpireTime: expireTime,
// at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table
// information from schema cache directly
tbl: table,

status: cache.JobStatusRunning,
}

logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)

return job, nil
}
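
Both lock paths converge on appendLockedJob, whose ordering matters: refresh the caches first so the new job sees consistent metadata, then announce and register the job. A stubbed sketch of that sequence, where every name is a placeholder rather than the committed API:

```go
package main

import "log"

// Stubs standing in for the manager's real dependencies; every name here
// is a placeholder, not the committed API.
func refreshCaches() error                 { return nil } // updateInfoSchemaCache + updateTableStatusCache
func notifyScanWorkers(jobID string) error { return nil } // best-effort: the commit only logs failures
func registerLocalJob(jobID string)        { log.Println("append new running job", jobID) }

// appendLockedJobSketch mirrors the ordering of the convergence point:
// refresh caches, then notify scan workers, then register the job locally.
func appendLockedJobSketch(jobID string) error {
	if err := refreshCaches(); err != nil {
		return err // a stale cache would make the new job unsafe to run
	}
	if err := notifyScanWorkers(jobID); err != nil {
		log.Println("fail to trigger scan tasks:", err) // tolerated, as in the diff
	}
	registerLocalJob(jobID)
	return nil
}

func main() { _ = appendLockedJobSketch("job-1") }
```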

// updateHeartBeat updates the heartbeat for all tasks with the current instance as owner
