ttl: use a pessimistic transaction to finish the job (pingcap#56516)
YangKeao authored Oct 10, 2024
1 parent f732b78 commit 93cad31
Showing 6 changed files with 193 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
@@ -87,6 +87,7 @@ go_test(
"//pkg/statistics",
"//pkg/store/mockstore",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/timer/api",
"//pkg/timer/tablestore",
"//pkg/ttl/cache",
15 changes: 7 additions & 8 deletions pkg/ttl/ttlworker/job.go
@@ -20,11 +20,10 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
@@ -122,8 +121,9 @@ type ttlJob struct {
}

// finish turns the current job into the last job, and updates the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) error {
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())

// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err := se.RunInTxn(context.TODO(), func() error {
@@ -145,10 +145,9 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}, session.TxnModeOptimistic)
failpoint.InjectCall("ttl-finish", &err)
return err
}, session.TxnModePessimistic)

if err != nil {
logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
return err
}
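
Taken together, the hunks above turn `finish` from a fire-and-forget optimistic commit into a pessimistic transaction whose error is reported back to the caller. The motivation (spelled out by the new `TestFinishAndUpdateOwnerAtSameTime` below) is that removing the job's rows from `mysql.tidb_ttl_task` can race with a concurrent owner update from the task manager; under the old optimistic mode that race surfaces as a write conflict at commit time, whereas a pessimistic transaction waits on the row locks, and any remaining error is now returned so callers can retry. A condensed sketch of the resulting function, reconstructed from the diff (SQL construction and helpers elided, so this is illustrative rather than the verbatim file):

	// Sketch of (*ttlJob).finish after this commit; details are approximate.
	func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) error {
		intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())

		// job.ctx may already be canceled, but the final state still has to be
		// persisted, so the transaction runs on a fresh context.
		err := se.RunInTxn(context.TODO(), func() error {
			// UPDATE mysql.tidb_ttl_table_status with the summary, then remove
			// the job's rows from mysql.tidb_ttl_task; any SQL error aborts.
			return nil
		}, session.TxnModePessimistic) // previously session.TxnModeOptimistic

		// Test hook: a registered failpoint callback may overwrite err.
		failpoint.InjectCall("ttl-finish", &err)
		return err
	}
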
31 changes: 23 additions & 8 deletions pkg/ttl/ttlworker/job_manager.go
@@ -558,8 +558,12 @@ j:
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
}
err = job.finish(se, se.Now(), summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, se.Now(), summary)
}
cancel()
}
@@ -591,10 +595,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {

summary, err := summarizeErr(errors.New(cancelReason))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
}
}
return
@@ -611,10 +619,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
logutil.Logger(m.ctx).Info("cancel job because the table has been dropped or it's no longer TTL table", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
summary, err := summarizeErr(errors.New("TTL table has been removed or the TTL on this table has been stopped"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
}

jobTables := m.readyForLockHBTimeoutJobTables(now)
@@ -882,11 +894,14 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, no
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
continue
}

intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
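
Each call site touched above — `checkFinishedJob`, both branches of `rescheduleJobs`, and the timeout path in `updateHeartBeat` — now follows the same pattern: the job is only dropped from the manager's in-memory list after `finish` has actually committed; otherwise the failure is logged at Warn level and the job is left in place to be retried on a later round. Condensed from the hunks above (loop context elided):

	err = job.finish(se, now, summary)
	if err != nil {
		logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
		continue // keep the job so a later round retries the finish
	}
	m.removeJob(job)
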
150 changes: 150 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -34,8 +34,10 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
dbsession "github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
timerapi "github.com/pingcap/tidb/pkg/timer/api"
timertable "github.com/pingcap/tidb/pkg/timer/tablestore"
"github.com/pingcap/tidb/pkg/ttl/cache"
@@ -1304,3 +1306,151 @@ func TestManagerJobAdapterNow(t *testing.T) {
require.Equal(t, "Europe/Berlin", now.Location().String())
require.InDelta(t, now.Unix(), localNow.Unix(), 10)
}

func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
// Finishing a `TTLJob` will remove all the `TTLTask` of the job, and at the same time
// the `task_manager` may update the owner of the task, which may cause a write conflict.
// This test is to simulate this scenario.
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

testTimes := 1000
now := se.Now()
for i := 0; i < testTimes; i++ {
now = now.Add(time.Hour * 48)

m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
require.NoError(t, m.InfoSchemaCache().Update(se))

se := sessionFactory()
job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
se := sessionFactory()

m.TaskManager().LockScanTask(se, &cache.TTLTask{
ScanID: 0,
JobID: job.ID(),
TableID: testTable.Meta().ID,
}, now)
}()

go func() {
defer wg.Done()
se := sessionFactory()

job.Finish(se, se.Now(), &ttlworker.TTLSummary{})
}()

wg.Wait()
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
}

func TestFinishError(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

errCount := 5
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
errCount -= 1
if errCount > 0 {
*err = errors.New("mock error")
}
})

now := se.Now()

m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
require.NoError(t, m.InfoSchemaCache().Update(se))

initializeTest := func() {
errCount = 5
now = now.Add(time.Hour * 48)
job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
ScanID: 0,
JobID: job.ID(),
TableID: testTable.Meta().ID,
}, now)
require.NoError(t, err)
task.SetResult(nil)
err = m.TaskManager().ReportTaskFinished(se, now, task)
require.NoError(t, err)
tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
}

// Test the `CheckFinishedJob` can tolerate the `job.finish` error
initializeTest()
for i := 0; i < 4; i++ {
m.CheckFinishedJob(se)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.CheckFinishedJob(se)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))

// Test the `rescheduleJobs` can tolerate the `job.finish` error
// cancel job branch
initializeTest()
variable.EnableTTLJob.Store(false)
t.Cleanup(func() {
variable.EnableTTLJob.Store(true)
})
for i := 0; i < 4; i++ {
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
variable.EnableTTLJob.Store(true)
// remove table branch
initializeTest()
tk.MustExec("drop table t")
require.NoError(t, m.InfoSchemaCache().Update(se))
for i := 0; i < 4; i++ {
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
require.NoError(t, m.InfoSchemaCache().Update(se))
testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

// Test the `updateHeartBeat` can tolerate the `job.finish` error
initializeTest()
for i := 0; i < 4; i++ {
// timeout is 6h
now = now.Add(time.Hour * 8)
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
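
The injection in `TestFinishError` works through the `ttl-finish` failpoint added to `job.go`: the production code passes a pointer to the finish error into `failpoint.InjectCall`, and the test registers a callback for it under the full package path, overwriting the error on the first four calls so that every caller's retry branch is exercised before the fifth attempt succeeds. The pairing, condensed from the hunks above (not standalone runnable; it relies on TiDB's failpoint test helpers):

	// In (*ttlJob).finish (pkg/ttl/ttlworker/job.go):
	failpoint.InjectCall("ttl-finish", &err)

	// In the test (note the failpoint is addressed by package path + name):
	errCount := 5
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
		errCount -= 1
		if errCount > 0 {
			*err = errors.New("mock error")
		}
	})
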
9 changes: 7 additions & 2 deletions pkg/ttl/ttlworker/job_manager_test.go
@@ -203,8 +203,13 @@ func (m *JobManager) ReportMetrics(se session.Session) {
m.reportMetrics(se)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
// CheckFinishedJob is an exported version of checkFinishedJob
func (m *JobManager) CheckFinishedJob(se session.Session) {
m.checkFinishedJob(se)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) error {
return j.finish(se, now, summary)
}

func (j *ttlJob) ID() string {
5 changes: 5 additions & 0 deletions pkg/ttl/ttlworker/task_manager_test.go
@@ -77,6 +77,11 @@ func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus
}

// ReportTaskFinished is an exported version of reportTaskFinished
func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
return m.reportTaskFinished(se, now, task)
}

// SetResult sets the result of the task
func (t *runningScanTask) SetResult(err error) {
t.result = t.ttlScanTask.result(err)
}
