Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: use a pessimistic transaction to finish the job #56516

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ go_test(
"//pkg/statistics",
"//pkg/store/mockstore",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/timer/api",
"//pkg/timer/tablestore",
"//pkg/ttl/cache",
Expand Down
15 changes: 7 additions & 8 deletions pkg/ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
Expand Down Expand Up @@ -122,8 +121,9 @@ type ttlJob struct {
}

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) error {
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())

// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err := se.RunInTxn(context.TODO(), func() error {
Expand All @@ -145,10 +145,9 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}, session.TxnModeOptimistic)
failpoint.InjectCall("ttl-finish", &err)
return err
}, session.TxnModePessimistic)
YangKeao marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
return err
}
31 changes: 23 additions & 8 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,12 @@ j:
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
}
err = job.finish(se, se.Now(), summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, se.Now(), summary)
}
cancel()
}
Expand Down Expand Up @@ -591,10 +595,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {

summary, err := summarizeErr(errors.New(cancelReason))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
}
}
return
Expand All @@ -611,10 +619,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
logutil.Logger(m.ctx).Info("cancel job because the table has been dropped or it's no longer TTL table", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
summary, err := summarizeErr(errors.New("TTL table has been removed or the TTL on this table has been stopped"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
}

jobTables := m.readyForLockHBTimeoutJobTables(now)
Expand Down Expand Up @@ -882,11 +894,14 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, no
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
continue
}

intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
Expand Down
150 changes: 150 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
dbsession "github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
timerapi "github.com/pingcap/tidb/pkg/timer/api"
timertable "github.com/pingcap/tidb/pkg/timer/tablestore"
"github.com/pingcap/tidb/pkg/ttl/cache"
Expand Down Expand Up @@ -1304,3 +1306,151 @@ func TestManagerJobAdapterNow(t *testing.T) {
require.Equal(t, "Europe/Berlin", now.Location().String())
require.InDelta(t, now.Unix(), localNow.Unix(), 10)
}

// TestFinishAndUpdateOwnerAtSameTime races job.Finish against the task
// manager taking ownership of the job's scan task.
func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
	// Finishing a `TTLJob` will remove all the `TTLTask` of the job, and at the same time
	// the `task_manager` may update the owner of the task, which may cause a write conflict.
	// This test is to simulate this scenario.
	store, dom := testkit.CreateMockStoreAndDomain(t)
	// Stop the domain's own TTL manager so it cannot interfere with the
	// manually driven JobManager below.
	waitAndStopTTLManager(t, dom)
	tk := testkit.NewTestKit(t, store)

	sessionFactory := sessionFactory(t, store)
	se := sessionFactory()

	tk.MustExec("use test")
	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
	testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
	require.NoError(t, err)

	// Repeat many times to give the race a realistic chance to trigger a
	// write conflict between the two goroutines.
	testTimes := 1000
	now := se.Now()
	for i := 0; i < testTimes; i++ {
		// Advance well past the TTL interval so a new job can be locked
		// on every iteration.
		now = now.Add(time.Hour * 48)

		m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
		require.NoError(t, m.InfoSchemaCache().Update(se))

		se := sessionFactory()
		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
		require.NoError(t, err)
		// Locking the job creates exactly one scan task row.
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

		var wg sync.WaitGroup
		wg.Add(2)
		go func() {
			defer wg.Done()
			se := sessionFactory()

			// Error deliberately ignored: losing the lock race to the
			// concurrent Finish below is an expected outcome here.
			m.TaskManager().LockScanTask(se, &cache.TTLTask{
				ScanID:  0,
				JobID:   job.ID(),
				TableID: testTable.Meta().ID,
			}, now)
		}()

		go func() {
			defer wg.Done()
			se := sessionFactory()

			job.Finish(se, se.Now(), &ttlworker.TTLSummary{})
		}()

		wg.Wait()
		// Regardless of which goroutine won the race, finishing the job
		// must leave no task rows behind.
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
	}
}

// TestFinishError verifies that every caller of job.finish (checkFinishedJob,
// rescheduleJobs, updateHeartBeat) tolerates transient finish errors: the job
// stays in place while finish fails and is cleaned up once finish succeeds.
func TestFinishError(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	waitAndStopTTLManager(t, dom)
	tk := testkit.NewTestKit(t, store)

	sessionFactory := sessionFactory(t, store)
	se := sessionFactory()

	tk.MustExec("use test")
	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
	testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
	require.NoError(t, err)

	// The failpoint makes the first 4 finish attempts fail and the 5th
	// succeed (errCount reaches 0 before the injection is skipped).
	errCount := 5
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
		errCount -= 1
		if errCount > 0 {
			*err = errors.New("mock error")
		}
	})

	now := se.Now()

	m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
	require.NoError(t, m.InfoSchemaCache().Update(se))

	// initializeTest re-arms the failpoint, locks a fresh job and drives its
	// single scan task to the "finished" state, leaving the job ready to be
	// finished by whichever code path the sub-scenario exercises.
	initializeTest := func() {
		errCount = 5
		now = now.Add(time.Hour * 48)
		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
		require.NoError(t, err)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
		task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
			ScanID:  0,
			JobID:   job.ID(),
			TableID: testTable.Meta().ID,
		}, now)
		require.NoError(t, err)
		task.SetResult(nil)
		err = m.TaskManager().ReportTaskFinished(se, now, task)
		require.NoError(t, err)
		tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
	}

	// Test the `CheckFinishedJob` can tolerate the `job.finish` error
	initializeTest()
	for i := 0; i < 4; i++ {
		// While finish keeps failing, the task row must survive.
		m.CheckFinishedJob(se)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	// 5th attempt: finish succeeds and the task is removed.
	m.CheckFinishedJob(se)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))

	// Test the `rescheduleJobs` can tolerate the `job.finish` error
	// cancel job branch
	initializeTest()
	variable.EnableTTLJob.Store(false)
	t.Cleanup(func() {
		variable.EnableTTLJob.Store(true)
	})
	for i := 0; i < 4; i++ {
		m.RescheduleJobs(se, now)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.RescheduleJobs(se, now)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
	variable.EnableTTLJob.Store(true)
	// remove table branch
	initializeTest()
	tk.MustExec("drop table t")
	require.NoError(t, m.InfoSchemaCache().Update(se))
	for i := 0; i < 4; i++ {
		m.RescheduleJobs(se, now)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.RescheduleJobs(se, now)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
	// Recreate the table so the heartbeat scenario below starts from a
	// clean schema.
	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
	require.NoError(t, m.InfoSchemaCache().Update(se))
	testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
	require.NoError(t, err)

	// Test the `updateHeartBeat` can tolerate the `job.finish` error
	initializeTest()
	for i := 0; i < 4; i++ {
		// timeout is 6h
		now = now.Add(time.Hour * 8)
		m.UpdateHeartBeat(context.Background(), se, now)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.UpdateHeartBeat(context.Background(), se, now)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
9 changes: 7 additions & 2 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,13 @@ func (m *JobManager) ReportMetrics(se session.Session) {
m.reportMetrics(se)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
// CheckFinishedJob is an exported version of checkFinishedJob for tests.
func (m *JobManager) CheckFinishedJob(se session.Session) {
	m.checkFinishedJob(se)
}

// Finish is an exported version of finish for tests.
func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) error {
	return j.finish(se, now, summary)
}

func (j *ttlJob) ID() string {
Expand Down
5 changes: 5 additions & 0 deletions pkg/ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus
}

// ReportTaskFinished is an exported version of reportTaskFinished for tests.
func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
	return m.reportTaskFinished(se, now, task)
}

// SetResult sets the result of the task, wrapping err via the underlying
// ttlScanTask's result helper.
func (t *runningScanTask) SetResult(err error) {
	t.result = t.ttlScanTask.result(err)
}
Expand Down