
ttl: refactor TTL to separate creating a new job from taking over a timeout job #45286

Merged · 3 commits · Jul 13, 2023
230 changes: 134 additions & 96 deletions ttl/ttlworker/job_manager.go
@@ -24,7 +24,6 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
@@ -315,20 +314,16 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo
PartitionName: ttlTbl.Partition.O,
}

job, err := m.lockNewJob(m.ctx, se, ttlTbl, now, true)
if err != nil {
jobID := uuid.NewString()
if _, err = m.lockNewJob(m.ctx, se, ttlTbl, now, jobID, false); err != nil {
firstError = err
tblResult.ErrorMessage = err.Error()
tableResults = append(tableResults, tblResult)
continue
}

allError = false
if job != nil {
m.appendJob(job)
tblResult.JobID = job.id
tableResults = append(tableResults, tblResult)
}
tblResult.JobID = jobID
tableResults = append(tableResults, tblResult)
}

if allError {
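For orientation, here is a condensed sketch of the new manual-trigger flow with simplified stand-in types (`triggerResult`, `lockNewJob`, and `triggerAll` are illustrative, not the real signatures): the caller now mints the job ID up front with `uuid.NewString` and reports it in the per-table result, instead of reading it back from a returned job object.

```go
// Condensed sketch of the new trigger flow; the types and helpers here are
// simplified stand-ins for the ones in this file.
package main

import (
	"fmt"

	"github.com/google/uuid"
)

type triggerResult struct {
	TableName    string
	JobID        string
	ErrorMessage string
}

// lockNewJob is a stand-in: it reports only success or failure, so the caller
// must mint the job ID itself in order to report it back.
func lockNewJob(jobID string) error { return nil }

func triggerAll(tables []string) []triggerResult {
	results := make([]triggerResult, 0, len(tables))
	for _, tbl := range tables {
		res := triggerResult{TableName: tbl}
		jobID := uuid.NewString() // generated up front, not inside lockNewJob
		if err := lockNewJob(jobID); err != nil {
			res.ErrorMessage = err.Error()
		} else {
			res.JobID = jobID
		}
		results = append(results, res)
	}
	return results
}

func main() {
	fmt.Printf("%+v\n", triggerAll([]string{"t1", "t2"}))
}
```

Generating the ID on the caller's side is what lets the trigger response carry a job ID even though `lockNewJob` no longer returns the job itself.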
@@ -452,16 +447,18 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
return
}

newJobTables := m.readyForNewJobTables(now)
jobTables, isCreate := m.readyForLockJobTables(now)
// TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take over the job
// when the heartbeat is not sent
for _, table := range newJobTables {
for i, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
job, err := m.lockNewJob(m.ctx, se, table, now, false)
if job != nil {
logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)
var err error
if isCreate[i] {
_, err = m.lockNewJob(m.ctx, se, table, now, uuid.NewString(), true)
} else {
_, err = m.lockHBTimeoutJob(m.ctx, se, table, now)
}

if err != nil {
logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err))
}
@@ -482,9 +479,10 @@ func (m *JobManager) localJobs() []*ttlJob {
return jobs
}

// readyForNewJobTables returns all tables which should spawn a TTL job according to cache
func (m *JobManager) readyForNewJobTables(now time.Time) []*cache.PhysicalTable {
// readyForLockJobTables returns all tables which should spawn a TTL job according to cache
func (m *JobManager) readyForLockJobTables(now time.Time) ([]*cache.PhysicalTable, []bool) {
tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables))
isCreate := make([]bool, 0, cap(tables))

tblLoop:
for _, table := range m.infoSchemaCache.Tables {
@@ -498,25 +496,53 @@
}

status := m.tableStatusCache.Tables[table.ID]
ok := m.couldTrySchedule(status, table, now, false)
if ok {
if m.couldLockJob(status, table, now, true, true) {
tables = append(tables, table)
isCreate = append(isCreate, true)
} else if m.couldLockJob(status, table, now, false, false) {
tables = append(tables, table)
isCreate = append(isCreate, false)
}
}

return tables
return tables, isCreate
}
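A minimal illustration of the contract this function establishes (the names and the collapsed eligibility booleans below are stand-ins for the real `couldLockJob` checks): the returned slices are parallel, with `isCreate[i]` describing `tables[i]`, and each table appears at most once, the create path being checked before the take-over path.

```go
// Stand-in for the parallel-slice contract of readyForLockJobTables.
package main

import "fmt"

type table struct {
	id            int64
	noJob         bool // collapsed stand-in for the "could create" checks
	ownerTimedOut bool // collapsed stand-in for the "could take over" checks
}

func readyForLock(all []table) (tables []table, isCreate []bool) {
	for _, t := range all {
		switch {
		case t.noJob: // create path has priority
			tables = append(tables, t)
			isCreate = append(isCreate, true)
		case t.ownerTimedOut: // otherwise try to take over
			tables = append(tables, t)
			isCreate = append(isCreate, false)
		}
	}
	return tables, isCreate
}

func main() {
	tbls, create := readyForLock([]table{{id: 1, noJob: true}, {id: 2, ownerTimedOut: true}, {id: 3}})
	for i, t := range tbls {
		fmt.Printf("table %d: create=%v\n", t.id, create[i]) // table 3 is skipped
	}
}
```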

// couldTrySchedule returns whether a table should be tried to run TTL
func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) bool {
if tableStatus == nil {
// if the table status hasn't been created, return true
return true
}
// couldLockJob returns whether a TTL job could be locked for the table, either by creating a new one or by taking over an existing one
func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, isCreate bool, checkScheduleInterval bool) bool {
if table == nil {
// if the table is not recorded in info schema, return false
return false
}

if isCreate {
if tableStatus == nil {
return true
}

if tableStatus.CurrentJobID != "" {
return false
}

if !checkScheduleInterval {
return true
}

startTime := tableStatus.LastJobStartTime

interval, err := table.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(m.ctx).Warn("illegal job interval", zap.Error(err))
return false
}
return startTime.Add(interval).Before(now)
}

// if isCreate is false, it means to take over an existing job
if tableStatus == nil || tableStatus.CurrentJobID == "" {
return false
}

if tableStatus.CurrentJobOwnerID != "" {
// see whether its heartbeat has expired
hbTime := tableStatus.CurrentJobOwnerHBTime
@@ -528,58 +554,50 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
}
return false
}
return true
}
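The rules above can be summarized in a small self-contained sketch; the status fields, the heartbeat-timeout window, and the fixed job interval below are assumptions standing in for `cache.TableStatus`, the real timeout logic, and `table.TTLInfo.GetJobInterval()`:

```go
// Minimal sketch of the eligibility rules encoded in couldLockJob.
package main

import (
	"fmt"
	"time"
)

type status struct {
	currentJobID string
	ownerID      string
	ownerHBTime  time.Time
	lastJobStart time.Time
}

const jobInterval = time.Hour     // stand-in for table.TTLInfo.GetJobInterval()
const hbTimeout = 2 * time.Minute // assumed heartbeat-timeout window

func couldLock(s *status, now time.Time, isCreate, checkInterval bool) bool {
	if isCreate {
		if s == nil {
			return true // no status row yet: free to create
		}
		if s.currentJobID != "" {
			return false // a job already exists; the take-over path handles it
		}
		if !checkInterval {
			return true // the manual trigger ignores the schedule interval
		}
		return s.lastJobStart.Add(jobInterval).Before(now)
	}
	// take-over: only jobs that exist and whose owner stopped heartbeating
	if s == nil || s.currentJobID == "" {
		return false
	}
	if s.ownerID != "" && now.Sub(s.ownerHBTime) < hbTimeout {
		return false // the owner is still alive
	}
	return true
}

func main() {
	now := time.Now()
	stale := &status{currentJobID: "job-1", ownerID: "node-a", ownerHBTime: now.Add(-10 * time.Minute)}
	fmt.Println(couldLock(nil, now, true, true))     // true: create on a fresh table
	fmt.Println(couldLock(stale, now, true, true))   // false: a job already exists
	fmt.Println(couldLock(stale, now, false, false)) // true: heartbeat timed out, take over
}
```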

if ignoreScheduleInterval || tableStatus.LastJobStartTime.IsZero() {
return true
}
func (m *JobManager) lockHBTimeoutJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
var jobID string
var jobStart time.Time
var expireTime time.Time
err := se.RunInTxn(ctx, func() error {
tableStatus, err := m.getTableStatusForUpdateNotWait(ctx, se, table.ID, table.TableInfo.ID, false)
if err != nil {
return err
}

if tableStatus == nil || !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, false, false) {
return errors.Errorf("couldn't lock timeout TTL job for table id '%d'", table.ID)
}

startTime := tableStatus.LastJobStartTime
jobID = tableStatus.CurrentJobID
jobStart = tableStatus.CurrentJobStartTime
expireTime = tableStatus.CurrentJobTTLExpire
sql, args := setTableStatusOwnerSQL(tableStatus.CurrentJobID, table.ID, jobStart, now, expireTime, m.id)
if _, err = se.ExecuteSQL(ctx, sql, args...); err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
return nil
}, session.TxnModePessimistic)

interval, err := table.TTLInfo.GetJobInterval()
if err != nil {
logutil.Logger(m.ctx).Warn("illegal job interval", zap.Error(err))
return false
return nil, err
}
return startTime.Add(interval).Before(now)

return m.appendLockedJob(jobID, se, jobStart, expireTime, table)
}
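The shape of this take-over is a generic "validate, then claim, inside one locking transaction" pattern. Below is a hedged sketch of it using plain `database/sql`; the table and column names and the 2-minute timeout window are assumptions for illustration, and the real statements live in `setTableStatusOwnerSQL` and the `cache` package.

```go
// A hedged sketch of the take-over pattern, outside TiDB's internal session.
package ttlsketch

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

const assumedHBTimeout = 2 * time.Minute // assumption; the real window differs

func claimTimeoutJob(ctx context.Context, db *sql.DB, tableID int64, ownerID string, now time.Time) (string, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return "", err
	}
	defer func() { _ = tx.Rollback() }() // no-op once Commit succeeds

	// Lock the status row; NOWAIT makes a concurrent claimer fail fast
	// instead of queueing behind the lock.
	var jobID string
	var hbTime time.Time
	err = tx.QueryRowContext(ctx,
		`SELECT current_job_id, current_job_owner_hb_time
		   FROM mysql.tidb_ttl_table_status WHERE table_id = ? FOR UPDATE NOWAIT`,
		tableID).Scan(&jobID, &hbTime) // assumes non-NULL columns and parseTime=true
	if err != nil {
		return "", err
	}

	// Re-check eligibility under the lock: the cache-based check that chose
	// this table may already be stale.
	if jobID == "" || now.Sub(hbTime) < assumedHBTimeout {
		return "", fmt.Errorf("job for table %d is not eligible for take-over", tableID)
	}

	// Claim ownership of the existing job, keeping its job ID.
	if _, err = tx.ExecContext(ctx,
		`UPDATE mysql.tidb_ttl_table_status
		    SET current_job_owner_id = ?, current_job_owner_hb_time = ?
		  WHERE table_id = ?`,
		ownerID, now, tableID); err != nil {
		return "", err
	}
	return jobID, tx.Commit()
}
```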

// occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new
// localJob and return it.
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) {
// lockNewJob creates a new job with the given jobID and locks the table status row for it
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, jobID string, checkScheduleInterval bool) (*ttlJob, error) {
var expireTime time.Time
var jobID string

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
// use ` FOR UPDATE NOWAIT`, then if the new job has been locked by other nodes, it will return:
// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
// Then this tidb node will not waste resource in calculating the ranges.
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
// cannot find the row, insert the status row
sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
sql, args = cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.New("table status row still doesn't exist after insertion")
}
}
tableStatus, err := cache.RowToTableStatus(se, rows[0])
tableStatus, err := m.getTableStatusForUpdateNotWait(ctx, se, table.ID, table.TableInfo.ID, true)
if err != nil {
return err
}
if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, ignoreScheduleInterval) {

if !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, true, checkScheduleInterval) {
return errors.New("couldn't schedule ttl job")
}

@@ -588,32 +606,12 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

jobID = uuid.New().String()
jobStart := now
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
// don't create new job if there is already one running
// so the running tasks don't need to be cancelled
jobID = tableStatus.CurrentJobID
expireTime = tableStatus.CurrentJobTTLExpire
jobStart = tableStatus.CurrentJobStartTime
jobExist = true
}
failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
jobID = val.(string)
})

sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
sql, args := setTableStatusOwnerSQL(jobID, table.ID, now, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

// if the job already exist, don't need to submit scan tasks
if jobExist {
return nil
}

sql, args = createJobHistorySQL(jobID, table, expireTime, now)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
@@ -641,8 +639,48 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return nil, err
}

return m.appendLockedJob(jobID, se, now, expireTime, table)
}

func (m *JobManager) getTableStatusForUpdateNotWait(ctx context.Context, se session.Session, physicalID int64, parentTableID int64, createIfNotExist bool) (*cache.TableStatus, error) {
sql, args := cache.SelectFromTTLTableStatusWithID(physicalID)
// use ` FOR UPDATE NOWAIT`; if the status row has been locked by another node, the statement returns:
// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
// Then this TiDB node will not waste resources calculating the scan ranges.
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

if len(rows) == 0 && !createIfNotExist {
return nil, nil
}

if len(rows) == 0 {
// cannot find the row, insert the status row
sql, args = insertNewTableIntoStatusSQL(physicalID, parentTableID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

sql, args = cache.SelectFromTTLTableStatusWithID(physicalID)
rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

if len(rows) == 0 {
return nil, errors.New("table status row still doesn't exist after insertion")
}
}

return cache.RowToTableStatus(se, rows[0])
}
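Callers of this helper rely on ` FOR UPDATE NOWAIT` failing fast with error 3572 when another node holds the row. A sketch of how that error could be recognized with the go-sql-driver/mysql client (the internal session wraps errors differently, so this is illustrative only):

```go
// Illustrative only: recognizing the NOWAIT failure with go-sql-driver/mysql.
package main

import (
	"errors"
	"fmt"

	"github.com/go-sql-driver/mysql"
)

// 3572 is the error code quoted in the comment above:
// "Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set."
const errLockAcquireFailAndNoWaitSet = 3572

func isLockedByOtherNode(err error) bool {
	var myErr *mysql.MySQLError
	return errors.As(err, &myErr) && myErr.Number == errLockAcquireFailAndNoWaitSet
}

func main() {
	// Simulate the error a SELECT ... FOR UPDATE NOWAIT would surface.
	err := error(&mysql.MySQLError{Number: 3572, Message: "Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set."})
	if isLockedByOtherNode(err) {
		fmt.Println("another node holds the status row; skip this table for now")
	}
}
```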

func (m *JobManager) appendLockedJob(id string, se session.Session, createTime time.Time, expireTime time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
// the table status was updated successfully, so the related caches need a refresh.
err = m.updateInfoSchemaCache(se)
err := m.updateInfoSchemaCache(se)
if err != nil {
return nil, err
}
@@ -651,29 +689,29 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return nil, err
}

job := m.createNewJob(jobID, expireTime, now, table)

// the job is created; notify every scan manager to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, id)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err))
}
return job, nil
}

func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
return &ttlJob{
job := &ttlJob{
id: id,
ownerID: m.id,

createTime: now,
createTime: createTime,
ttlExpireTime: expireTime,
// at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table
// information from schema cache directly
tbl: table,

status: cache.JobStatusRunning,
}

logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)

return job, nil
}

// updateHeartBeat updates the heartbeat for all tasks with the current instance as owner