Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Jul 13, 2023
1 parent 67d5090 commit 7115115
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 119 deletions.
165 changes: 91 additions & 74 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,20 +315,15 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo
}

jobID := uuid.NewString()
job, err := m.lockJob(m.ctx, se, ttlTbl, now, jobID, false)
if err != nil {
if _, err = m.lockNewJob(m.ctx, se, ttlTbl, now, jobID, false); err != nil {
firstError = err
tblResult.ErrorMessage = err.Error()
tableResults = append(tableResults, tblResult)
continue
}

allError = false
if job != nil {
m.appendJob(job)
tblResult.JobID = job.id
tableResults = append(tableResults, tblResult)
}
tblResult.JobID = jobID
tableResults = append(tableResults, tblResult)
}

if allError {
Expand Down Expand Up @@ -457,15 +452,13 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
// when the heart beat is not sent
for i, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
jobID := ""
var err error
if isCreate[i] {
jobID = uuid.NewString()
}
job, err := m.lockJob(m.ctx, se, table, now, jobID, isCreate[i])
if job != nil {
logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)
_, err = m.lockNewJob(m.ctx, se, table, now, uuid.NewString(), true)
} else {
_, err = m.lockHBTimeoutJob(m.ctx, se, table, now)
}

if err != nil {
logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err))
}
Expand Down Expand Up @@ -546,7 +539,7 @@ func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.P
}

// if isCreate is false, it means to take over an exist job
if tableStatus.CurrentJobID == "" {
if tableStatus == nil || tableStatus.CurrentJobID == "" {
return false
}

Expand All @@ -564,47 +557,47 @@ func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.P
return true
}

// lockJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new
// localJob and return it.
func (m *JobManager) lockJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, createJobID string, checkScheduleInterval bool) (*ttlJob, error) {
func (m *JobManager) lockHBTimeoutJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
var jobID string
var jobStart time.Time
var expireTime time.Time
jobID := createJobID
isCreate := jobID != ""

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
// use ` FOR UPDATE NOWAIT`, then if the new job has been locked by other nodes, it will return:
// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
// Then this tidb node will not waste resource in calculating the ranges.
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
tableStatus, err := m.getTableStatusForUpdateNotWait(ctx, se, table.ID, table.TableInfo.ID, false)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
return err
}
if len(rows) == 0 {
if !isCreate {
return errors.New("couldn't schedule ttl job")
}

// cannot find the row, insert the status row
sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
sql, args = cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.New("table status row still doesn't exist after insertion")
}
if tableStatus == nil || !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, false, false) {
return errors.Errorf("couldn't lock timeout TTL job for table id '%d'", table.ID)
}

jobID = tableStatus.CurrentJobID
jobStart = tableStatus.CurrentJobStartTime
expireTime = tableStatus.CurrentJobTTLExpire
sql, args := setTableStatusOwnerSQL(tableStatus.CurrentJobID, table.ID, jobStart, now, expireTime, m.id)
if _, err = se.ExecuteSQL(ctx, sql, args...); err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
tableStatus, err := cache.RowToTableStatus(se, rows[0])
return nil
}, session.TxnModePessimistic)

if err != nil {
return nil, err
}

return m.appendLockedJob(jobID, se, jobStart, expireTime, table)
}

// lockNewJob locks a new job
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, jobID string, checkScheduleInterval bool) (*ttlJob, error) {
var expireTime time.Time
err := se.RunInTxn(ctx, func() error {
tableStatus, err := m.getTableStatusForUpdateNotWait(ctx, se, table.ID, table.TableInfo.ID, true)
if err != nil {
return err
}
if !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, isCreate, checkScheduleInterval) {

if !m.couldLockJob(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, true, checkScheduleInterval) {
return errors.New("couldn't schedule ttl job")
}

Expand All @@ -613,28 +606,12 @@ func (m *JobManager) lockJob(ctx context.Context, se session.Session, table *cac
return err
}

jobStart := now
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
// don't create new job if there is already one running
// so the running tasks don't need to be cancelled
jobID = tableStatus.CurrentJobID
expireTime = tableStatus.CurrentJobTTLExpire
jobStart = tableStatus.CurrentJobStartTime
jobExist = true
}

sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
sql, args := setTableStatusOwnerSQL(jobID, table.ID, now, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

// if the job already exist, don't need to submit scan tasks
if jobExist {
return nil
}

sql, args = createJobHistorySQL(jobID, table, expireTime, now)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
Expand Down Expand Up @@ -662,8 +639,48 @@ func (m *JobManager) lockJob(ctx context.Context, se session.Session, table *cac
return nil, err
}

return m.appendLockedJob(jobID, se, now, expireTime, table)
}

func (m *JobManager) getTableStatusForUpdateNotWait(ctx context.Context, se session.Session, physicalID int64, parentTableID int64, createIfNotExist bool) (*cache.TableStatus, error) {
sql, args := cache.SelectFromTTLTableStatusWithID(physicalID)
// use ` FOR UPDATE NOWAIT`, then if the new job has been locked by other nodes, it will return:
// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
// Then this tidb node will not waste resource in calculating the ranges.
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

if len(rows) == 0 && !createIfNotExist {
return nil, nil
}

if len(rows) == 0 {
// cannot find the row, insert the status row
sql, args = insertNewTableIntoStatusSQL(physicalID, parentTableID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

sql, args = cache.SelectFromTTLTableStatusWithID(physicalID)
rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}

if len(rows) == 0 {
return nil, errors.New("table status row still doesn't exist after insertion")
}
}

return cache.RowToTableStatus(se, rows[0])
}

func (m *JobManager) appendLockedJob(id string, se session.Session, createTime time.Time, expireTime time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
// successfully update the table status, will need to refresh the cache.
err = m.updateInfoSchemaCache(se)
err := m.updateInfoSchemaCache(se)
if err != nil {
return nil, err
}
Expand All @@ -672,29 +689,29 @@ func (m *JobManager) lockJob(ctx context.Context, se session.Session, table *cac
return nil, err
}

job := m.createNewJob(jobID, expireTime, now, table)

// job is created, notify every scan managers to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, id)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err))
}
return job, nil
}

func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
return &ttlJob{
job := &ttlJob{
id: id,
ownerID: m.id,

createTime: now,
createTime: createTime,
ttlExpireTime: expireTime,
// at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table
// information from schema cache directly
tbl: table,

status: cache.JobStatusRunning,
}

logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)

return job, nil
}

// updateHeartBeat updates the heartbeat for all task with current instance as owner
Expand Down
Loading

0 comments on commit 7115115

Please sign in to comment.