diff --git a/domain/domain.go b/domain/domain.go index de448c530c801..c68f6c1349d31 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -2385,10 +2385,6 @@ func (do *Domain) runTTLJobManager(ctx context.Context) { ttlJobManager.Start() do.ttlJobManager = ttlJobManager - // TODO: read the worker count from `do.sysVarCache` and resize the workers - ttlworker.ScanWorkersCount.Store(4) - ttlworker.DeleteWorkerCount.Store(4) - <-do.exit ttlJobManager.Stop() diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 92f4a0d944adf..6f4badaa475ed 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -1525,8 +1524,6 @@ func TestRenameMultiTables(t *testing.T) { } func TestCreateTableWithTTL(t *testing.T) { - parser.TTLFeatureGate = true - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1546,8 +1543,6 @@ func TestCreateTableWithTTL(t *testing.T) { } func TestAlterTTLInfo(t *testing.T) { - parser.TTLFeatureGate = true - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1584,8 +1579,6 @@ func TestAlterTTLInfo(t *testing.T) { } func TestDisableTTLForTempTable(t *testing.T) { - parser.TTLFeatureGate = true - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1594,8 +1587,6 @@ func TestDisableTTLForTempTable(t *testing.T) { } func TestDisableTTLForFKParentTable(t *testing.T) { - parser.TTLFeatureGate = true - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/executor/showtest/BUILD.bazel b/executor/showtest/BUILD.bazel index aa61ad5bacd3b..807e00c8e88ec 100644 --- a/executor/showtest/BUILD.bazel +++ b/executor/showtest/BUILD.bazel @@ -15,7 +15,6 @@ go_test( "//executor", "//infoschema", "//meta/autoid", - "//parser", "//parser/auth", "//parser/model", "//parser/mysql", diff --git a/executor/showtest/show_test.go b/executor/showtest/show_test.go index 1a8778599d39a..11b2db520d723 100644 --- a/executor/showtest/show_test.go +++ b/executor/showtest/show_test.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -2020,8 +2019,6 @@ func TestShowLimitReturnRow(t *testing.T) { } func TestShowTTLOption(t *testing.T) { - parser.TTLFeatureGate = true - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/parser/BUILD.bazel b/parser/BUILD.bazel index e6b5bed7c4075..f52b1fc9ac4f3 100644 --- a/parser/BUILD.bazel +++ b/parser/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "lexer.go", "misc.go", "parser.go", - "ttlfeaturegate.go", "yy_parser.go", ], importpath = "github.com/pingcap/tidb/parser", diff --git a/parser/ast/ddl_test.go b/parser/ast/ddl_test.go index fed5c1c759fbf..e6107f34513ec 100644 --- a/parser/ast/ddl_test.go +++ b/parser/ast/ddl_test.go @@ -16,7 +16,6 @@ package ast_test import ( "testing" - "github.com/pingcap/tidb/parser" . "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" "github.com/stretchr/testify/require" @@ -843,8 +842,6 @@ func TestFlashBackDatabaseRestore(t *testing.T) { } func TestTableOptionTTLRestore(t *testing.T) { - parser.TTLFeatureGate = true - sourceSQL1 := "create table t (created_at datetime) ttl = created_at + INTERVAL 1 YEAR" sourceSQL2 := "alter table t ttl_enable = 'OFF'" sourceSQL3 := "alter table t remove ttl" diff --git a/parser/parser.go b/parser/parser.go index 05688d8fc2ec3..e58fd78da70c8 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -11962,10 +11962,6 @@ yynewstate: } case 38: { - if !TTLFeatureGate { - yylex.AppendError(ErrSyntax) - return 1 - } parser.yyVAL.item = &ast.AlterTableSpec{ Tp: ast.AlterTableRemoveTTL, } @@ -20080,10 +20076,6 @@ yynewstate: } case 2156: { - if !TTLFeatureGate { - yylex.AppendError(ErrSyntax) - return 1 - } parser.yyVAL.item = &ast.TableOption{ Tp: ast.TableOptionTTL, ColumnName: &ast.ColumnName{Name: model.NewCIStr(yyS[yypt-4].ident)}, @@ -20093,10 +20085,6 @@ yynewstate: } case 2157: { - if !TTLFeatureGate { - yylex.AppendError(ErrSyntax) - return 1 - } onOrOff := strings.ToLower(yyS[yypt-0].ident) if onOrOff == "on" { parser.yyVAL.item = &ast.TableOption{Tp: ast.TableOptionTTLEnable, BoolValue: true} diff --git a/parser/parser.y b/parser/parser.y index a833e06cf727d..514103f7196b6 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -1742,10 +1742,6 @@ AlterTableSpecSingleOpt: } | "REMOVE" "TTL" { - if !TTLFeatureGate { - yylex.AppendError(ErrSyntax) - return 1 - } $$ = &ast.AlterTableSpec{ Tp: ast.AlterTableRemoveTTL, } @@ -11787,10 +11783,6 @@ TableOption: } | "TTL" EqOpt Identifier '+' "INTERVAL" Literal TimeUnit { - if !TTLFeatureGate { - yylex.AppendError(ErrSyntax) - return 1 - } $$ = &ast.TableOption{ Tp: ast.TableOptionTTL, ColumnName: &ast.ColumnName{Name: model.NewCIStr($3)}, @@ -11800,10 +11792,6 @@ TableOption: } | "TTL_ENABLE" EqOpt stringLit { - if !TTLFeatureGate { - yylex.AppendError(ErrSyntax) - return 1 - } onOrOff := strings.ToLower($3) if onOrOff == "on" { $$ = &ast.TableOption{Tp: ast.TableOptionTTLEnable, BoolValue: true} diff --git a/parser/parser_test.go b/parser/parser_test.go index 36170ad1d6883..b5975c475c06a 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -7021,8 +7021,6 @@ func TestIntervalPartition(t *testing.T) { } func TestTTLTableOption(t *testing.T) { - parser.TTLFeatureGate = true - table := []testCase{ // create table with various temporal interval {"create table t (created_at datetime) TTL = created_at + INTERVAL 3.1415 YEAR", true, "CREATE TABLE `t` (`created_at` DATETIME) TTL = `created_at` + INTERVAL 3.1415 YEAR"}, @@ -7050,21 +7048,3 @@ func TestTTLTableOption(t *testing.T) { RunTest(t, table, false) } - -func TestTTLFeatureGate(t *testing.T) { - parser.TTLFeatureGate = false - - table := []testCase{ - {"create table t (created_at datetime) TTL = created_at + INTERVAL 3.1415 YEAR", false, ""}, - {"create table t (created_at datetime) TTL_ENABLE = 'OFF'", false, ""}, - {"create table t (created_at datetime) TTL created_at + INTERVAL 1 YEAR TTL_ENABLE 'OFF'", false, ""}, - {"create table t (created_at datetime) /*T![ttl] ttl=created_at + INTERVAL 1 YEAR ttl_enable='ON'*/", false, ""}, - {"alter table t TTL = created_at + INTERVAL 1 MONTH", false, ""}, - {"alter table t TTL_ENABLE = 'ON'", false, ""}, - {"alter table t TTL = created_at + INTERVAL 1 MONTH TTL_ENABLE 'OFF'", false, ""}, - {"alter table t /*T![ttl] ttl=created_at + INTERVAL 1 YEAR ttl_enable='ON'*/", false, ""}, - {"alter table t remove ttl", false, ""}, - } - - RunTest(t, table, false) -} diff --git a/parser/ttlfeaturegate.go b/parser/ttlfeaturegate.go deleted file mode 100644 index d18633f137afe..0000000000000 --- a/parser/ttlfeaturegate.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package parser - -// TTLFeatureGate determines whether to enable the ttl related syntax in parser -var TTLFeatureGate = false diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index b11af728bc5c2..25e868b7c21f2 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2173,6 +2173,70 @@ var defaultSysVars = []*SysVar{ return nil }, }, + { + Scope: ScopeGlobal, Name: TiDBTTLJobRunInterval, Value: DefTiDBTTLJobRunInterval, Type: TypeDuration, MinValue: int64(10 * time.Minute), MaxValue: uint64(8760 * time.Hour), SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + interval, err := time.ParseDuration(s) + if err != nil { + return err + } + TTLJobRunInterval.Store(interval) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + interval := TTLJobRunInterval.Load() + + return interval.String(), nil + }, + }, + { + Scope: ScopeGlobal, Name: TiDBTTLJobScheduleWindowStartTime, Value: DefTiDBTTLJobScheduleWindowStartTime, Type: TypeTime, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + startTime, err := time.ParseInLocation(FullDayTimeFormat, s, time.UTC) + if err != nil { + return err + } + TTLJobScheduleWindowStartTime.Store(startTime) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + startTime := TTLJobScheduleWindowStartTime.Load() + return startTime.Format(FullDayTimeFormat), nil + }, + }, + { + Scope: ScopeGlobal, Name: TiDBTTLJobScheduleWindowEndTime, Value: DefTiDBTTLJobScheduleWindowEndTime, Type: TypeTime, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + endTime, err := time.ParseInLocation(FullDayTimeFormat, s, time.UTC) + if err != nil { + return err + } + TTLJobScheduleWindowEndTime.Store(endTime) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + endTime := TTLJobScheduleWindowEndTime.Load() + return endTime.Format(FullDayTimeFormat), nil + }, + }, + { + Scope: ScopeGlobal, Name: TiDBTTLScanWorkerCount, Value: strconv.Itoa(DefTiDBTTLScanWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: 256, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + TTLScanWorkerCount.Store(int32(val)) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(TTLScanWorkerCount.Load())), nil + }, + }, + { + Scope: ScopeGlobal, Name: TiDBTTLDeleteWorkerCount, Value: strconv.Itoa(DefTiDBTTLDeleteWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: 256, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + TTLDeleteWorkerCount.Store(int32(val)) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(TTLDeleteWorkerCount.Load())), nil + }, + }, } // FeedbackProbability points to the FeedbackProbability in statistics package. diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 3613870004bad..f6da8c87495c7 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -22,6 +22,7 @@ import ( "strconv" "sync/atomic" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -1051,3 +1052,43 @@ func TestSetAggPushDownGlobally(t *testing.T) { require.NoError(t, err) require.Equal(t, "ON", val) } + +func TestSetJobScheduleWindow(t *testing.T) { + vars := NewSessionVars(nil) + mock := NewMockGlobalAccessor4Tests() + mock.SessionVars = vars + vars.GlobalVarsAccessor = mock + + // default value + val, err := mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime) + require.NoError(t, err) + require.Equal(t, "00:00 +0000", val) + + // set and get variable in UTC + vars.TimeZone = time.UTC + err = mock.SetGlobalSysVar(context.Background(), TiDBTTLJobScheduleWindowStartTime, "16:11") + require.NoError(t, err) + val, err = mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime) + require.NoError(t, err) + require.Equal(t, "16:11 +0000", val) + + // set variable in UTC, get it in Asia/Shanghai + vars.TimeZone = time.UTC + err = mock.SetGlobalSysVar(context.Background(), TiDBTTLJobScheduleWindowStartTime, "16:11") + require.NoError(t, err) + vars.TimeZone, err = time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + val, err = mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime) + require.NoError(t, err) + require.Equal(t, "16:11 +0000", val) + + // set variable in Asia/Shanghai, get it it UTC + vars.TimeZone, err = time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + err = mock.SetGlobalSysVar(context.Background(), TiDBTTLJobScheduleWindowStartTime, "16:11") + require.NoError(t, err) + vars.TimeZone = time.UTC + val, err = mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime) + require.NoError(t, err) + require.Equal(t, "16:11 +0800", val) +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 7c9175487d79f..47be64227ed2e 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -16,7 +16,9 @@ package variable import ( "context" + "fmt" "math" + "time" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/mysql" @@ -876,6 +878,16 @@ const ( TiDBTTLDeleteBatchSize = "tidb_ttl_delete_batch_size" // TiDBTTLDeleteRateLimit is used to control the delete rate limit for TTL jobs in each node TiDBTTLDeleteRateLimit = "tidb_ttl_delete_rate_limit" + // TiDBTTLJobRunInterval represents the schedule interval between two jobs for one TTL table + TiDBTTLJobRunInterval = "tidb_ttl_job_run_interval" + // TiDBTTLJobScheduleWindowStartTime is used to restrict the start time of the time window of scheduling the ttl jobs. + TiDBTTLJobScheduleWindowStartTime = "tidb_ttl_job_schedule_window_start_time" + // TiDBTTLJobScheduleWindowEndTime is used to restrict the end time of the time window of scheduling the ttl jobs. + TiDBTTLJobScheduleWindowEndTime = "tidb_ttl_job_schedule_window_end_time" + // TiDBTTLScanWorkerCount indicates the count of the scan workers in each TiDB node + TiDBTTLScanWorkerCount = "tidb_ttl_scan_worker_count" + // TiDBTTLDeleteWorkerCount indicates the count of the delete workers in each TiDB node + TiDBTTLDeleteWorkerCount = "tidb_ttl_delete_worker_count" // PasswordReuseHistory limit a few passwords to reuse. PasswordReuseHistory = "password_history" // PasswordReuseTime limit how long passwords can be reused. @@ -1134,6 +1146,11 @@ const ( DefPasswordReuseHistory = 0 DefPasswordReuseTime = 0 DefTiDBStoreBatchSize = 0 + DefTiDBTTLJobRunInterval = "1h0m0s" + DefTiDBTTLJobScheduleWindowStartTime = "00:00 +0000" + DefTiDBTTLJobScheduleWindowEndTime = "23:59 +0000" + DefTiDBTTLScanWorkerCount = 4 + DefTiDBTTLDeleteWorkerCount = 4 ) // Process global variables. @@ -1200,6 +1217,11 @@ var ( TTLScanBatchSize = atomic.NewInt64(DefTiDBTTLScanBatchSize) TTLDeleteBatchSize = atomic.NewInt64(DefTiDBTTLDeleteBatchSize) TTLDeleteRateLimit = atomic.NewInt64(DefTiDBTTLDeleteRateLimit) + TTLJobRunInterval = atomic.NewDuration(mustParseDuration(DefTiDBTTLJobRunInterval)) + TTLJobScheduleWindowStartTime = atomic.NewTime(mustParseTime(FullDayTimeFormat, DefTiDBTTLJobScheduleWindowStartTime)) + TTLJobScheduleWindowEndTime = atomic.NewTime(mustParseTime(FullDayTimeFormat, DefTiDBTTLJobScheduleWindowEndTime)) + TTLScanWorkerCount = atomic.NewInt32(DefTiDBTTLScanWorkerCount) + TTLDeleteWorkerCount = atomic.NewInt32(DefTiDBTTLDeleteWorkerCount) PasswordHistory = atomic.NewInt64(DefPasswordReuseHistory) PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime) IsSandBoxModeEnabled = atomic.NewBool(false) @@ -1235,3 +1257,21 @@ func serverMemoryLimitDefaultValue() string { } return "0" } + +func mustParseDuration(str string) time.Duration { + duration, err := time.ParseDuration(str) + if err != nil { + panic(fmt.Sprintf("%s is not a duration", str)) + } + + return duration +} + +func mustParseTime(layout string, str string) time.Time { + time, err := time.ParseInLocation(layout, str, time.UTC) + if err != nil { + panic(fmt.Sprintf("%s is not in %s duration format", str, layout)) + } + + return time +} diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel index f051716ced40b..8dd9724472dba 100644 --- a/ttl/cache/BUILD.bazel +++ b/ttl/cache/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/pingcap/tidb/ttl/cache", visibility = ["//visibility:public"], deps = [ + "//infoschema", "//kv", "//parser/ast", "//parser/model", @@ -46,7 +47,6 @@ go_test( deps = [ "//infoschema", "//kv", - "//parser", "//parser/model", "//server", "//store/helper", diff --git a/ttl/cache/infoschema.go b/ttl/cache/infoschema.go index 5fdf8d2081dc6..87535a4e951b3 100644 --- a/ttl/cache/infoschema.go +++ b/ttl/cache/infoschema.go @@ -17,6 +17,7 @@ package cache import ( "time" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" @@ -40,7 +41,7 @@ func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache { // Update updates the info schema cache func (isc *InfoSchemaCache) Update(se session.Session) error { - is := se.SessionInfoSchema() + is := se.GetDomainInfoSchema().(infoschema.InfoSchema) if isc.schemaVer == is.SchemaMetaVersion() { return nil diff --git a/ttl/cache/infoschema_test.go b/ttl/cache/infoschema_test.go index 7e811050b4601..4cec3db563fa7 100644 --- a/ttl/cache/infoschema_test.go +++ b/ttl/cache/infoschema_test.go @@ -18,7 +18,6 @@ import ( "testing" "time" - "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" @@ -27,8 +26,6 @@ import ( ) func TestInfoSchemaCache(t *testing.T) { - parser.TTLFeatureGate = true - store, dom := testkit.CreateMockStoreAndDomain(t) sv := server.CreateMockServer(t, store) sv.SetDomain(dom) diff --git a/ttl/cache/split_test.go b/ttl/cache/split_test.go index 2e8e37655a31c..1d2279eb8d0f9 100644 --- a/ttl/cache/split_test.go +++ b/ttl/cache/split_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/tablecodec" @@ -268,11 +267,6 @@ func checkRange(t *testing.T, r cache.ScanRange, start, end types.Datum) { } func TestSplitTTLScanRangesWithSignedInt(t *testing.T) { - parser.TTLFeatureGate = true - defer func() { - parser.TTLFeatureGate = false - }() - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -336,11 +330,6 @@ func TestSplitTTLScanRangesWithSignedInt(t *testing.T) { } func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) { - parser.TTLFeatureGate = true - defer func() { - parser.TTLFeatureGate = false - }() - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -406,11 +395,6 @@ func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) { } func TestSplitTTLScanRangesWithBytes(t *testing.T) { - parser.TTLFeatureGate = true - defer func() { - parser.TTLFeatureGate = false - }() - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -461,11 +445,6 @@ func TestSplitTTLScanRangesWithBytes(t *testing.T) { } func TestNoTTLSplitSupportTables(t *testing.T) { - parser.TTLFeatureGate = true - defer func() { - parser.TTLFeatureGate = false - }() - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/ttl/cache/table_test.go b/ttl/cache/table_test.go index f79d4c3bf4256..ca280d9b36251 100644 --- a/ttl/cache/table_test.go +++ b/ttl/cache/table_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" @@ -29,8 +28,6 @@ import ( ) func TestNewTTLTable(t *testing.T) { - parser.TTLFeatureGate = true - cases := []struct { db string tbl string @@ -166,8 +163,6 @@ func TestNewTTLTable(t *testing.T) { } func TestEvalTTLExpireTime(t *testing.T) { - parser.TTLFeatureGate = true - store, do := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("create table test.t(a int, t datetime) ttl = `t` + interval 1 day") diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go index cb1b8ef5942fe..1657105e6c3e7 100644 --- a/ttl/cache/ttlstatus.go +++ b/ttl/cache/ttlstatus.go @@ -40,7 +40,7 @@ const ( JobStatusTimeout = "timeout" ) -const selectFromTTLTableStatus = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" +const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" // SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id func SelectFromTTLTableStatusWithID(tableID int64) string { diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index fa8cf07751d02..8b7c39270807e 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@org_golang_x_time//rate", - "@org_uber_go_atomic//:atomic", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", ], diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go index e7a8e344c3e16..a92f362241fcf 100644 --- a/ttl/ttlworker/config.go +++ b/ttl/ttlworker/config.go @@ -16,35 +16,14 @@ package ttlworker import ( "time" - - "go.uber.org/atomic" ) -// TODO: the following functions should be put in the variable pkg to avoid cyclic dependency after adding variables for the TTL -// some of them are only used in test - const jobManagerLoopTickerInterval = 10 * time.Second -const updateInfoSchemaCacheInterval = time.Minute -const updateTTLTableStatusCacheInterval = 10 * time.Minute +const updateInfoSchemaCacheInterval = 2 * time.Minute +const updateTTLTableStatusCacheInterval = 2 * time.Minute const ttlInternalSQLTimeout = 30 * time.Second -const ttlJobTimeout = 6 * time.Hour - -// TODO: add this variable to the sysvar -const ttlJobInterval = time.Hour - -// TODO: add these variables to the sysvar -var ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2006-01-02 00:00:00") -var ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2006-01-02 23:59:00") - -// TODO: migrate these two count to sysvar - -// ScanWorkersCount defines the count of scan worker -var ScanWorkersCount = atomic.NewUint64(0) - -// DeleteWorkerCount defines the count of delete worker -var DeleteWorkerCount = atomic.NewUint64(0) - const resizeWorkersInterval = 30 * time.Second const splitScanCount = 64 +const ttlJobTimeout = 6 * time.Hour diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 0d9d65bf54fe9..5551984cceb08 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -83,7 +83,11 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status } func (job *ttlJob) updateState(ctx context.Context, se session.Session) error { - _, err := se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, job.statistics.String(), job.ownerID)) + jsonStatistics, err := job.statistics.MarshalJSON() + if err != nil { + return err + } + _, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, string(jsonStatistics), job.ownerID)) if err != nil { return errors.Trace(err) } diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index f71f79bd2e78c..dda3ffd8f69f5 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" @@ -154,11 +155,11 @@ func (m *JobManager) jobLoop() error { m.checkFinishedJob(se, now) m.checkNotOwnJob() case <-resizeWorkersTicker: - err := m.resizeScanWorkers(int(ScanWorkersCount.Load())) + err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load())) if err != nil { logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err)) } - err = m.resizeDelWorkers(int(DeleteWorkerCount.Load())) + err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load())) if err != nil { logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err)) } @@ -331,7 +332,7 @@ func (m *JobManager) checkFinishedJob(se session.Session, now time.Time) { } func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { - if !timeutil.WithinDayTimePeriod(ttlJobScheduleWindowStartTime, ttlJobScheduleWindowEndTime, now) { + if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) { // Local jobs will also not run, but as the server is still sending heartbeat, // and keep the job in memory, it could start the left task in the next window. return @@ -438,7 +439,18 @@ func (m *JobManager) localJobs() []*ttlJob { // readyForNewJobTables returns all tables which should spawn a TTL job according to cache func (m *JobManager) readyForNewJobTables(now time.Time) []*cache.PhysicalTable { tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables)) + +tblLoop: for _, table := range m.infoSchemaCache.Tables { + // If this node already has a job for this table, just ignore. + // Actually, the logic should ensure this condition never meet, we still add the check here to keep safety + // (especially when the content of the status table is incorrect) + for _, job := range m.runningJobs { + if job.tbl.ID == table.ID { + continue tblLoop + } + } + status := m.tableStatusCache.Tables[table.ID] ok := m.couldTrySchedule(status, now) if ok { @@ -473,7 +485,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b finishTime := table.LastJobFinishTime - return finishTime.Add(ttlJobInterval).Before(now) + return finishTime.Add(variable.TTLJobRunInterval.Load()).Before(now) } // occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index f1566b2bc02bf..87bc19ca08261 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -132,7 +133,7 @@ func newTTLTableStatusRows(status ...*cache.TableStatus) []chunk.Row { return rows } -var updateStatusSQL = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" +var updateStatusSQL = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" func (m *JobManager) SetScanWorkers4Test(workers []worker) { m.scanWorkers = workers @@ -444,16 +445,18 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) { }, } m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusWaiting)} - savedttlJobScheduleWindowStartTime := ttlJobScheduleWindowStartTime - savedttlJobScheduleWindowEndTime := ttlJobScheduleWindowEndTime - ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2022-12-06 12:00:00") - ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2022-12-06 12:05:00") + savedttlJobScheduleWindowStartTime := variable.TTLJobScheduleWindowStartTime.Load() + savedttlJobScheduleWindowEndTime := variable.TTLJobScheduleWindowEndTime.Load() + ttlJobScheduleWindowStartTime, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:00 +0000", time.UTC) + variable.TTLJobScheduleWindowStartTime.Store(ttlJobScheduleWindowStartTime) + ttlJobScheduleWindowEndTime, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:05 +0000", time.UTC) + variable.TTLJobScheduleWindowEndTime.Store(ttlJobScheduleWindowEndTime) defer func() { - ttlJobScheduleWindowStartTime = savedttlJobScheduleWindowStartTime - ttlJobScheduleWindowEndTime = savedttlJobScheduleWindowEndTime + variable.TTLJobScheduleWindowStartTime.Store(savedttlJobScheduleWindowStartTime) + variable.TTLJobScheduleWindowEndTime.Store(savedttlJobScheduleWindowEndTime) }() - now, _ := time.Parse(timeFormat, "2022-12-06 12:06:00") + now, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:06 +0000", time.UTC) m.rescheduleJobs(se, now) scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil) scanWorker1.checkPollResult(false, "") @@ -461,7 +464,7 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) { scanWorker2.checkPollResult(false, "") // jobs will be scheduled within the time window - now, _ = time.Parse(timeFormat, "2022-12-06 12:02:00") + now, _ = time.ParseInLocation(variable.FullDayTimeFormat, "12:02 +0000", time.UTC) m.rescheduleJobs(se, now) scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) scanWorker1.checkPollResult(false, "") diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index 38a4fd544535d..242c51fb8b686 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -16,6 +16,7 @@ package ttlworker import ( "context" + "encoding/json" "fmt" "strconv" "sync/atomic" @@ -71,6 +72,20 @@ func (s *ttlStatistics) String() string { return fmt.Sprintf("Total Rows: %d, Success Rows: %d, Error Rows: %d", s.TotalRows.Load(), s.SuccessRows.Load(), s.ErrorRows.Load()) } +func (s *ttlStatistics) MarshalJSON() ([]byte, error) { + type jsonStatistics struct { + TotalRows uint64 `json:"total_rows"` + SuccessRows uint64 `json:"success_rows"` + ErrorRows uint64 `json:"error_rows"` + } + + return json.Marshal(jsonStatistics{ + TotalRows: s.TotalRows.Load(), + SuccessRows: s.SuccessRows.Load(), + ErrorRows: s.ErrorRows.Load(), + }) +} + type ttlScanTask struct { ctx context.Context diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go index 66582084b18f3..34a25a2539612 100644 --- a/ttl/ttlworker/scan_test.go +++ b/ttl/ttlworker/scan_test.go @@ -403,3 +403,13 @@ func TestScanTaskDoScan(t *testing.T) { task.schemaChangeInRetry = 2 task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist") } + +func TestTTLStatisticsMarshalJSON(t *testing.T) { + statistics := &ttlStatistics{} + statistics.TotalRows.Store(1) + statistics.ErrorRows.Store(255) + statistics.SuccessRows.Store(128) + j, err := statistics.MarshalJSON() + require.NoError(t, err) + require.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255}`, string(j)) +} diff --git a/ttl/ttlworker/session_test.go b/ttl/ttlworker/session_test.go index 8cceaed7ac72b..877fd7996eaa7 100644 --- a/ttl/ttlworker/session_test.go +++ b/ttl/ttlworker/session_test.go @@ -154,6 +154,10 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession { } } +func (s *mockSession) GetDomainInfoSchema() sessionctx.InfoschemaMetaVersion { + return s.sessionInfoSchema +} + func (s *mockSession) SessionInfoSchema() infoschema.InfoSchema { require.False(s.t, s.closed) return s.sessionInfoSchema