diff --git a/domain/domain.go b/domain/domain.go
index de448c530c801..c68f6c1349d31 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -2385,10 +2385,6 @@ func (do *Domain) runTTLJobManager(ctx context.Context) {
 	ttlJobManager.Start()
 	do.ttlJobManager = ttlJobManager
 
-	// TODO: read the worker count from `do.sysVarCache` and resize the workers
-	ttlworker.ScanWorkersCount.Store(4)
-	ttlworker.DeleteWorkerCount.Store(4)
-
 	<-do.exit
 
 	ttlJobManager.Stop()
diff --git a/executor/ddl_test.go b/executor/ddl_test.go
index 92f4a0d944adf..6f4badaa475ed 100644
--- a/executor/ddl_test.go
+++ b/executor/ddl_test.go
@@ -34,7 +34,6 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/meta/autoid"
-	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/parser/terror"
@@ -1525,8 +1524,6 @@ func TestRenameMultiTables(t *testing.T) {
 }
 
 func TestCreateTableWithTTL(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
@@ -1546,8 +1543,6 @@ func TestCreateTableWithTTL(t *testing.T) {
 }
 
 func TestAlterTTLInfo(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
@@ -1584,8 +1579,6 @@ func TestAlterTTLInfo(t *testing.T) {
 }
 
 func TestDisableTTLForTempTable(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
@@ -1594,8 +1587,6 @@ func TestDisableTTLForTempTable(t *testing.T) {
 }
 
 func TestDisableTTLForFKParentTable(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
diff --git a/executor/showtest/BUILD.bazel b/executor/showtest/BUILD.bazel
index aa61ad5bacd3b..807e00c8e88ec 100644
--- a/executor/showtest/BUILD.bazel
+++ b/executor/showtest/BUILD.bazel
@@ -15,7 +15,6 @@ go_test(
         "//executor",
         "//infoschema",
         "//meta/autoid",
-        "//parser",
         "//parser/auth",
         "//parser/model",
         "//parser/mysql",
diff --git a/executor/showtest/show_test.go b/executor/showtest/show_test.go
index 1a8778599d39a..11b2db520d723 100644
--- a/executor/showtest/show_test.go
+++ b/executor/showtest/show_test.go
@@ -23,7 +23,6 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/infoschema"
-	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/auth"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
@@ -2020,8 +2019,6 @@ func TestShowLimitReturnRow(t *testing.T) {
 }
 
 func TestShowTTLOption(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store := testkit.CreateMockStore(t)
 
 	tk := testkit.NewTestKit(t, store)
diff --git a/parser/BUILD.bazel b/parser/BUILD.bazel
index e6b5bed7c4075..f52b1fc9ac4f3 100644
--- a/parser/BUILD.bazel
+++ b/parser/BUILD.bazel
@@ -9,7 +9,6 @@ go_library(
         "lexer.go",
         "misc.go",
         "parser.go",
-        "ttlfeaturegate.go",
         "yy_parser.go",
     ],
     importpath = "github.com/pingcap/tidb/parser",
diff --git a/parser/ast/ddl_test.go b/parser/ast/ddl_test.go
index fed5c1c759fbf..e6107f34513ec 100644
--- a/parser/ast/ddl_test.go
+++ b/parser/ast/ddl_test.go
@@ -16,7 +16,6 @@ package ast_test
 import (
 	"testing"
 
-	"github.com/pingcap/tidb/parser"
 	. "github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/format"
 	"github.com/stretchr/testify/require"
@@ -843,8 +842,6 @@ func TestFlashBackDatabaseRestore(t *testing.T) {
 }
 
 func TestTableOptionTTLRestore(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	sourceSQL1 := "create table t (created_at datetime) ttl = created_at + INTERVAL 1 YEAR"
 	sourceSQL2 := "alter table t ttl_enable = 'OFF'"
 	sourceSQL3 := "alter table t remove ttl"
diff --git a/parser/parser.go b/parser/parser.go
index 05688d8fc2ec3..e58fd78da70c8 100644
--- a/parser/parser.go
+++ b/parser/parser.go
@@ -11962,10 +11962,6 @@ yynewstate:
 		}
 	case 38:
 		{
-			if !TTLFeatureGate {
-				yylex.AppendError(ErrSyntax)
-				return 1
-			}
 			parser.yyVAL.item = &ast.AlterTableSpec{
 				Tp: ast.AlterTableRemoveTTL,
 			}
@@ -20080,10 +20076,6 @@ yynewstate:
 		}
 	case 2156:
 		{
-			if !TTLFeatureGate {
-				yylex.AppendError(ErrSyntax)
-				return 1
-			}
 			parser.yyVAL.item = &ast.TableOption{
 				Tp:            ast.TableOptionTTL,
 				ColumnName:    &ast.ColumnName{Name: model.NewCIStr(yyS[yypt-4].ident)},
@@ -20093,10 +20085,6 @@ yynewstate:
 		}
 	case 2157:
 		{
-			if !TTLFeatureGate {
-				yylex.AppendError(ErrSyntax)
-				return 1
-			}
 			onOrOff := strings.ToLower(yyS[yypt-0].ident)
 			if onOrOff == "on" {
 				parser.yyVAL.item = &ast.TableOption{Tp: ast.TableOptionTTLEnable, BoolValue: true}
diff --git a/parser/parser.y b/parser/parser.y
index a833e06cf727d..514103f7196b6 100644
--- a/parser/parser.y
+++ b/parser/parser.y
@@ -1742,10 +1742,6 @@ AlterTableSpecSingleOpt:
 	}
 |	"REMOVE" "TTL"
 	{
-		if !TTLFeatureGate {
-			yylex.AppendError(ErrSyntax)
-			return 1
-		}
 		$$ = &ast.AlterTableSpec{
 			Tp: ast.AlterTableRemoveTTL,
 		}
@@ -11787,10 +11783,6 @@ TableOption:
 	}
 |	"TTL" EqOpt Identifier '+' "INTERVAL" Literal TimeUnit
 	{
-		if !TTLFeatureGate {
-			yylex.AppendError(ErrSyntax)
-			return 1
-		}
 		$$ = &ast.TableOption{
 			Tp:            ast.TableOptionTTL,
 			ColumnName:    &ast.ColumnName{Name: model.NewCIStr($3)},
@@ -11800,10 +11792,6 @@ TableOption:
 	}
 |	"TTL_ENABLE" EqOpt stringLit
 	{
-		if !TTLFeatureGate {
-			yylex.AppendError(ErrSyntax)
-			return 1
-		}
 		onOrOff := strings.ToLower($3)
 		if onOrOff == "on" {
 			$$ = &ast.TableOption{Tp: ast.TableOptionTTLEnable, BoolValue: true}
diff --git a/parser/parser_test.go b/parser/parser_test.go
index 36170ad1d6883..b5975c475c06a 100644
--- a/parser/parser_test.go
+++ b/parser/parser_test.go
@@ -7021,8 +7021,6 @@ func TestIntervalPartition(t *testing.T) {
 }
 
 func TestTTLTableOption(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	table := []testCase{
 		// create table with various temporal interval
 		{"create table t (created_at datetime) TTL = created_at + INTERVAL 3.1415 YEAR", true, "CREATE TABLE `t` (`created_at` DATETIME) TTL = `created_at` + INTERVAL 3.1415 YEAR"},
@@ -7050,21 +7048,3 @@ func TestTTLTableOption(t *testing.T) {
 
 	RunTest(t, table, false)
 }
-
-func TestTTLFeatureGate(t *testing.T) {
-	parser.TTLFeatureGate = false
-
-	table := []testCase{
-		{"create table t (created_at datetime) TTL = created_at + INTERVAL 3.1415 YEAR", false, ""},
-		{"create table t (created_at datetime) TTL_ENABLE = 'OFF'", false, ""},
-		{"create table t (created_at datetime) TTL created_at + INTERVAL 1 YEAR TTL_ENABLE 'OFF'", false, ""},
-		{"create table t (created_at datetime) /*T![ttl] ttl=created_at + INTERVAL 1 YEAR ttl_enable='ON'*/", false, ""},
-		{"alter table t TTL = created_at + INTERVAL 1 MONTH", false, ""},
-		{"alter table t TTL_ENABLE = 'ON'", false, ""},
-		{"alter table t TTL = created_at + INTERVAL 1 MONTH TTL_ENABLE 'OFF'", false, ""},
-		{"alter table t /*T![ttl] ttl=created_at + INTERVAL 1 YEAR ttl_enable='ON'*/", false, ""},
-		{"alter table t remove ttl", false, ""},
-	}
-
-	RunTest(t, table, false)
-}
diff --git a/parser/ttlfeaturegate.go b/parser/ttlfeaturegate.go
deleted file mode 100644
index d18633f137afe..0000000000000
--- a/parser/ttlfeaturegate.go
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2019 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package parser
-
-// TTLFeatureGate determines whether to enable the ttl related syntax in parser
-var TTLFeatureGate = false
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index b11af728bc5c2..25e868b7c21f2 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -2173,6 +2173,70 @@ var defaultSysVars = []*SysVar{
 			return nil
 		},
 	},
+	{
+		Scope: ScopeGlobal, Name: TiDBTTLJobRunInterval, Value: DefTiDBTTLJobRunInterval, Type: TypeDuration, MinValue: int64(10 * time.Minute), MaxValue: uint64(8760 * time.Hour), SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+			interval, err := time.ParseDuration(s)
+			if err != nil {
+				return err
+			}
+			TTLJobRunInterval.Store(interval)
+			return nil
+		}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+			interval := TTLJobRunInterval.Load()
+
+			return interval.String(), nil
+		},
+	},
+	{
+		Scope: ScopeGlobal, Name: TiDBTTLJobScheduleWindowStartTime, Value: DefTiDBTTLJobScheduleWindowStartTime, Type: TypeTime, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+			startTime, err := time.ParseInLocation(FullDayTimeFormat, s, time.UTC)
+			if err != nil {
+				return err
+			}
+			TTLJobScheduleWindowStartTime.Store(startTime)
+			return nil
+		}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+			startTime := TTLJobScheduleWindowStartTime.Load()
+			return startTime.Format(FullDayTimeFormat), nil
+		},
+	},
+	{
+		Scope: ScopeGlobal, Name: TiDBTTLJobScheduleWindowEndTime, Value: DefTiDBTTLJobScheduleWindowEndTime, Type: TypeTime, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+			endTime, err := time.ParseInLocation(FullDayTimeFormat, s, time.UTC)
+			if err != nil {
+				return err
+			}
+			TTLJobScheduleWindowEndTime.Store(endTime)
+			return nil
+		}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+			endTime := TTLJobScheduleWindowEndTime.Load()
+			return endTime.Format(FullDayTimeFormat), nil
+		},
+	},
+	{
+		Scope: ScopeGlobal, Name: TiDBTTLScanWorkerCount, Value: strconv.Itoa(DefTiDBTTLScanWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: 256, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+			val, err := strconv.ParseInt(s, 10, 64)
+			if err != nil {
+				return err
+			}
+			TTLScanWorkerCount.Store(int32(val))
+			return nil
+		}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+			return strconv.Itoa(int(TTLScanWorkerCount.Load())), nil
+		},
+	},
+	{
+		Scope: ScopeGlobal, Name: TiDBTTLDeleteWorkerCount, Value: strconv.Itoa(DefTiDBTTLDeleteWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: 256, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+			val, err := strconv.ParseInt(s, 10, 64)
+			if err != nil {
+				return err
+			}
+			TTLDeleteWorkerCount.Store(int32(val))
+			return nil
+		}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+			return strconv.Itoa(int(TTLDeleteWorkerCount.Load())), nil
+		},
+	},
 }
 
 // FeedbackProbability points to the FeedbackProbability in statistics package.
diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go
index 3613870004bad..f6da8c87495c7 100644
--- a/sessionctx/variable/sysvar_test.go
+++ b/sessionctx/variable/sysvar_test.go
@@ -22,6 +22,7 @@ import (
 	"strconv"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
@@ -1051,3 +1052,43 @@ func TestSetAggPushDownGlobally(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, "ON", val)
 }
+
+func TestSetJobScheduleWindow(t *testing.T) {
+	vars := NewSessionVars(nil)
+	mock := NewMockGlobalAccessor4Tests()
+	mock.SessionVars = vars
+	vars.GlobalVarsAccessor = mock
+
+	// default value
+	val, err := mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime)
+	require.NoError(t, err)
+	require.Equal(t, "00:00 +0000", val)
+
+	// set and get variable in UTC
+	vars.TimeZone = time.UTC
+	err = mock.SetGlobalSysVar(context.Background(), TiDBTTLJobScheduleWindowStartTime, "16:11")
+	require.NoError(t, err)
+	val, err = mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime)
+	require.NoError(t, err)
+	require.Equal(t, "16:11 +0000", val)
+
+	// set variable in UTC, get it in Asia/Shanghai
+	vars.TimeZone = time.UTC
+	err = mock.SetGlobalSysVar(context.Background(), TiDBTTLJobScheduleWindowStartTime, "16:11")
+	require.NoError(t, err)
+	vars.TimeZone, err = time.LoadLocation("Asia/Shanghai")
+	require.NoError(t, err)
+	val, err = mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime)
+	require.NoError(t, err)
+	require.Equal(t, "16:11 +0000", val)
+
+	// set variable in Asia/Shanghai, get it it UTC
+	vars.TimeZone, err = time.LoadLocation("Asia/Shanghai")
+	require.NoError(t, err)
+	err = mock.SetGlobalSysVar(context.Background(), TiDBTTLJobScheduleWindowStartTime, "16:11")
+	require.NoError(t, err)
+	vars.TimeZone = time.UTC
+	val, err = mock.GetGlobalSysVar(TiDBTTLJobScheduleWindowStartTime)
+	require.NoError(t, err)
+	require.Equal(t, "16:11 +0800", val)
+}
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 7c9175487d79f..47be64227ed2e 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -16,7 +16,9 @@ package variable
 
 import (
 	"context"
+	"fmt"
 	"math"
+	"time"
 
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/parser/mysql"
@@ -876,6 +878,16 @@ const (
 	TiDBTTLDeleteBatchSize = "tidb_ttl_delete_batch_size"
 	// TiDBTTLDeleteRateLimit is used to control the delete rate limit for TTL jobs in each node
 	TiDBTTLDeleteRateLimit = "tidb_ttl_delete_rate_limit"
+	// TiDBTTLJobRunInterval represents the schedule interval between two jobs for one TTL table
+	TiDBTTLJobRunInterval = "tidb_ttl_job_run_interval"
+	// TiDBTTLJobScheduleWindowStartTime is used to restrict the start time of the time window of scheduling the ttl jobs.
+	TiDBTTLJobScheduleWindowStartTime = "tidb_ttl_job_schedule_window_start_time"
+	// TiDBTTLJobScheduleWindowEndTime is used to restrict the end time of the time window of scheduling the ttl jobs.
+	TiDBTTLJobScheduleWindowEndTime = "tidb_ttl_job_schedule_window_end_time"
+	// TiDBTTLScanWorkerCount indicates the count of the scan workers in each TiDB node
+	TiDBTTLScanWorkerCount = "tidb_ttl_scan_worker_count"
+	// TiDBTTLDeleteWorkerCount indicates the count of the delete workers in each TiDB node
+	TiDBTTLDeleteWorkerCount = "tidb_ttl_delete_worker_count"
 	// PasswordReuseHistory limit a few passwords to reuse.
 	PasswordReuseHistory = "password_history"
 	// PasswordReuseTime limit how long passwords can be reused.
@@ -1134,6 +1146,11 @@ const (
 	DefPasswordReuseHistory                          = 0
 	DefPasswordReuseTime                             = 0
 	DefTiDBStoreBatchSize                            = 0
+	DefTiDBTTLJobRunInterval                         = "1h0m0s"
+	DefTiDBTTLJobScheduleWindowStartTime             = "00:00 +0000"
+	DefTiDBTTLJobScheduleWindowEndTime               = "23:59 +0000"
+	DefTiDBTTLScanWorkerCount                        = 4
+	DefTiDBTTLDeleteWorkerCount                      = 4
 )
 
 // Process global variables.
@@ -1200,6 +1217,11 @@ var (
 	TTLScanBatchSize                   = atomic.NewInt64(DefTiDBTTLScanBatchSize)
 	TTLDeleteBatchSize                 = atomic.NewInt64(DefTiDBTTLDeleteBatchSize)
 	TTLDeleteRateLimit                 = atomic.NewInt64(DefTiDBTTLDeleteRateLimit)
+	TTLJobRunInterval                  = atomic.NewDuration(mustParseDuration(DefTiDBTTLJobRunInterval))
+	TTLJobScheduleWindowStartTime      = atomic.NewTime(mustParseTime(FullDayTimeFormat, DefTiDBTTLJobScheduleWindowStartTime))
+	TTLJobScheduleWindowEndTime        = atomic.NewTime(mustParseTime(FullDayTimeFormat, DefTiDBTTLJobScheduleWindowEndTime))
+	TTLScanWorkerCount                 = atomic.NewInt32(DefTiDBTTLScanWorkerCount)
+	TTLDeleteWorkerCount               = atomic.NewInt32(DefTiDBTTLDeleteWorkerCount)
 	PasswordHistory                    = atomic.NewInt64(DefPasswordReuseHistory)
 	PasswordReuseInterval              = atomic.NewInt64(DefPasswordReuseTime)
 	IsSandBoxModeEnabled               = atomic.NewBool(false)
@@ -1235,3 +1257,21 @@ func serverMemoryLimitDefaultValue() string {
 	}
 	return "0"
 }
+
+func mustParseDuration(str string) time.Duration {
+	duration, err := time.ParseDuration(str)
+	if err != nil {
+		panic(fmt.Sprintf("%s is not a duration", str))
+	}
+
+	return duration
+}
+
+func mustParseTime(layout string, str string) time.Time {
+	time, err := time.ParseInLocation(layout, str, time.UTC)
+	if err != nil {
+		panic(fmt.Sprintf("%s is not in %s duration format", str, layout))
+	}
+
+	return time
+}
diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel
index f051716ced40b..8dd9724472dba 100644
--- a/ttl/cache/BUILD.bazel
+++ b/ttl/cache/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/ttl/cache",
     visibility = ["//visibility:public"],
     deps = [
+        "//infoschema",
         "//kv",
         "//parser/ast",
         "//parser/model",
@@ -46,7 +47,6 @@ go_test(
     deps = [
         "//infoschema",
         "//kv",
-        "//parser",
         "//parser/model",
         "//server",
         "//store/helper",
diff --git a/ttl/cache/infoschema.go b/ttl/cache/infoschema.go
index 5fdf8d2081dc6..87535a4e951b3 100644
--- a/ttl/cache/infoschema.go
+++ b/ttl/cache/infoschema.go
@@ -17,6 +17,7 @@ package cache
 import (
 	"time"
 
+	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/util/logutil"
@@ -40,7 +41,7 @@ func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache {
 
 // Update updates the info schema cache
 func (isc *InfoSchemaCache) Update(se session.Session) error {
-	is := se.SessionInfoSchema()
+	is := se.GetDomainInfoSchema().(infoschema.InfoSchema)
 
 	if isc.schemaVer == is.SchemaMetaVersion() {
 		return nil
diff --git a/ttl/cache/infoschema_test.go b/ttl/cache/infoschema_test.go
index 7e811050b4601..4cec3db563fa7 100644
--- a/ttl/cache/infoschema_test.go
+++ b/ttl/cache/infoschema_test.go
@@ -18,7 +18,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/server"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/pingcap/tidb/ttl/cache"
@@ -27,8 +26,6 @@ import (
 )
 
 func TestInfoSchemaCache(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	sv := server.CreateMockServer(t, store)
 	sv.SetDomain(dom)
diff --git a/ttl/cache/split_test.go b/ttl/cache/split_test.go
index 2e8e37655a31c..1d2279eb8d0f9 100644
--- a/ttl/cache/split_test.go
+++ b/ttl/cache/split_test.go
@@ -24,7 +24,6 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
-	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/store/helper"
 	"github.com/pingcap/tidb/tablecodec"
@@ -268,11 +267,6 @@ func checkRange(t *testing.T, r cache.ScanRange, start, end types.Datum) {
 }
 
 func TestSplitTTLScanRangesWithSignedInt(t *testing.T) {
-	parser.TTLFeatureGate = true
-	defer func() {
-		parser.TTLFeatureGate = false
-	}()
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 
@@ -336,11 +330,6 @@ func TestSplitTTLScanRangesWithSignedInt(t *testing.T) {
 }
 
 func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) {
-	parser.TTLFeatureGate = true
-	defer func() {
-		parser.TTLFeatureGate = false
-	}()
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 
@@ -406,11 +395,6 @@ func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) {
 }
 
 func TestSplitTTLScanRangesWithBytes(t *testing.T) {
-	parser.TTLFeatureGate = true
-	defer func() {
-		parser.TTLFeatureGate = false
-	}()
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 
@@ -461,11 +445,6 @@ func TestSplitTTLScanRangesWithBytes(t *testing.T) {
 }
 
 func TestNoTTLSplitSupportTables(t *testing.T) {
-	parser.TTLFeatureGate = true
-	defer func() {
-		parser.TTLFeatureGate = false
-	}()
-
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 
diff --git a/ttl/cache/table_test.go b/ttl/cache/table_test.go
index f79d4c3bf4256..ca280d9b36251 100644
--- a/ttl/cache/table_test.go
+++ b/ttl/cache/table_test.go
@@ -20,7 +20,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/pingcap/tidb/ttl/cache"
@@ -29,8 +28,6 @@ import (
 )
 
 func TestNewTTLTable(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	cases := []struct {
 		db      string
 		tbl     string
@@ -166,8 +163,6 @@ func TestNewTTLTable(t *testing.T) {
 }
 
 func TestEvalTTLExpireTime(t *testing.T) {
-	parser.TTLFeatureGate = true
-
 	store, do := testkit.CreateMockStoreAndDomain(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("create table test.t(a int, t datetime) ttl = `t` + interval 1 day")
diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go
index cb1b8ef5942fe..1657105e6c3e7 100644
--- a/ttl/cache/ttlstatus.go
+++ b/ttl/cache/ttlstatus.go
@@ -40,7 +40,7 @@ const (
 	JobStatusTimeout = "timeout"
 )
 
-const selectFromTTLTableStatus = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"
+const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"
 
 // SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id
 func SelectFromTTLTableStatusWithID(tableID int64) string {
diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel
index fa8cf07751d02..8b7c39270807e 100644
--- a/ttl/ttlworker/BUILD.bazel
+++ b/ttl/ttlworker/BUILD.bazel
@@ -31,7 +31,6 @@ go_library(
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
         "@org_golang_x_time//rate",
-        "@org_uber_go_atomic//:atomic",
         "@org_uber_go_multierr//:multierr",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go
index e7a8e344c3e16..a92f362241fcf 100644
--- a/ttl/ttlworker/config.go
+++ b/ttl/ttlworker/config.go
@@ -16,35 +16,14 @@ package ttlworker
 
 import (
 	"time"
-
-	"go.uber.org/atomic"
 )
 
-// TODO: the following functions should be put in the variable pkg to avoid cyclic dependency after adding variables for the TTL
-// some of them are only used in test
-
 const jobManagerLoopTickerInterval = 10 * time.Second
 
-const updateInfoSchemaCacheInterval = time.Minute
-const updateTTLTableStatusCacheInterval = 10 * time.Minute
+const updateInfoSchemaCacheInterval = 2 * time.Minute
+const updateTTLTableStatusCacheInterval = 2 * time.Minute
 
 const ttlInternalSQLTimeout = 30 * time.Second
-const ttlJobTimeout = 6 * time.Hour
-
-// TODO: add this variable to the sysvar
-const ttlJobInterval = time.Hour
-
-// TODO: add these variables to the sysvar
-var ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2006-01-02 00:00:00")
-var ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2006-01-02 23:59:00")
-
-// TODO: migrate these two count to sysvar
-
-// ScanWorkersCount defines the count of scan worker
-var ScanWorkersCount = atomic.NewUint64(0)
-
-// DeleteWorkerCount defines the count of delete worker
-var DeleteWorkerCount = atomic.NewUint64(0)
-
 const resizeWorkersInterval = 30 * time.Second
 const splitScanCount = 64
+const ttlJobTimeout = 6 * time.Hour
diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go
index 0d9d65bf54fe9..5551984cceb08 100644
--- a/ttl/ttlworker/job.go
+++ b/ttl/ttlworker/job.go
@@ -83,7 +83,11 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status
 }
 
 func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
-	_, err := se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, job.statistics.String(), job.ownerID))
+	jsonStatistics, err := job.statistics.MarshalJSON()
+	if err != nil {
+		return err
+	}
+	_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, string(jsonStatistics), job.ownerID))
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index f71f79bd2e78c..dda3ffd8f69f5 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/ttl/cache"
 	"github.com/pingcap/tidb/ttl/metrics"
 	"github.com/pingcap/tidb/ttl/session"
@@ -154,11 +155,11 @@ func (m *JobManager) jobLoop() error {
 			m.checkFinishedJob(se, now)
 			m.checkNotOwnJob()
 		case <-resizeWorkersTicker:
-			err := m.resizeScanWorkers(int(ScanWorkersCount.Load()))
+			err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load()))
 			if err != nil {
 				logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err))
 			}
-			err = m.resizeDelWorkers(int(DeleteWorkerCount.Load()))
+			err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load()))
 			if err != nil {
 				logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err))
 			}
@@ -331,7 +332,7 @@ func (m *JobManager) checkFinishedJob(se session.Session, now time.Time) {
 }
 
 func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
-	if !timeutil.WithinDayTimePeriod(ttlJobScheduleWindowStartTime, ttlJobScheduleWindowEndTime, now) {
+	if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) {
 		// Local jobs will also not run, but as the server is still sending heartbeat,
 		// and keep the job in memory, it could start the left task in the next window.
 		return
@@ -438,7 +439,18 @@ func (m *JobManager) localJobs() []*ttlJob {
 // readyForNewJobTables returns all tables which should spawn a TTL job according to cache
 func (m *JobManager) readyForNewJobTables(now time.Time) []*cache.PhysicalTable {
 	tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables))
+
+tblLoop:
 	for _, table := range m.infoSchemaCache.Tables {
+		// If this node already has a job for this table, just ignore.
+		// Actually, the logic should ensure this condition never meet, we still add the check here to keep safety
+		// (especially when the content of the status table is incorrect)
+		for _, job := range m.runningJobs {
+			if job.tbl.ID == table.ID {
+				continue tblLoop
+			}
+		}
+
 		status := m.tableStatusCache.Tables[table.ID]
 		ok := m.couldTrySchedule(status, now)
 		if ok {
@@ -473,7 +485,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
 
 	finishTime := table.LastJobFinishTime
 
-	return finishTime.Add(ttlJobInterval).Before(now)
+	return finishTime.Add(variable.TTLJobRunInterval.Load()).Before(now)
 }
 
 // occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new
diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go
index f1566b2bc02bf..87bc19ca08261 100644
--- a/ttl/ttlworker/job_manager_test.go
+++ b/ttl/ttlworker/job_manager_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/ttl/cache"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
@@ -132,7 +133,7 @@ func newTTLTableStatusRows(status ...*cache.TableStatus) []chunk.Row {
 	return rows
 }
 
-var updateStatusSQL = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"
+var updateStatusSQL = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"
 
 func (m *JobManager) SetScanWorkers4Test(workers []worker) {
 	m.scanWorkers = workers
@@ -444,16 +445,18 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) {
 		},
 	}
 	m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusWaiting)}
-	savedttlJobScheduleWindowStartTime := ttlJobScheduleWindowStartTime
-	savedttlJobScheduleWindowEndTime := ttlJobScheduleWindowEndTime
-	ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2022-12-06 12:00:00")
-	ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2022-12-06 12:05:00")
+	savedttlJobScheduleWindowStartTime := variable.TTLJobScheduleWindowStartTime.Load()
+	savedttlJobScheduleWindowEndTime := variable.TTLJobScheduleWindowEndTime.Load()
+	ttlJobScheduleWindowStartTime, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:00 +0000", time.UTC)
+	variable.TTLJobScheduleWindowStartTime.Store(ttlJobScheduleWindowStartTime)
+	ttlJobScheduleWindowEndTime, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:05 +0000", time.UTC)
+	variable.TTLJobScheduleWindowEndTime.Store(ttlJobScheduleWindowEndTime)
 	defer func() {
-		ttlJobScheduleWindowStartTime = savedttlJobScheduleWindowStartTime
-		ttlJobScheduleWindowEndTime = savedttlJobScheduleWindowEndTime
+		variable.TTLJobScheduleWindowStartTime.Store(savedttlJobScheduleWindowStartTime)
+		variable.TTLJobScheduleWindowEndTime.Store(savedttlJobScheduleWindowEndTime)
 	}()
 
-	now, _ := time.Parse(timeFormat, "2022-12-06 12:06:00")
+	now, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:06 +0000", time.UTC)
 	m.rescheduleJobs(se, now)
 	scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil)
 	scanWorker1.checkPollResult(false, "")
@@ -461,7 +464,7 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) {
 	scanWorker2.checkPollResult(false, "")
 
 	// jobs will be scheduled within the time window
-	now, _ = time.Parse(timeFormat, "2022-12-06 12:02:00")
+	now, _ = time.ParseInLocation(variable.FullDayTimeFormat, "12:02 +0000", time.UTC)
 	m.rescheduleJobs(se, now)
 	scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0])
 	scanWorker1.checkPollResult(false, "")
diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go
index 38a4fd544535d..242c51fb8b686 100644
--- a/ttl/ttlworker/scan.go
+++ b/ttl/ttlworker/scan.go
@@ -16,6 +16,7 @@ package ttlworker
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"strconv"
 	"sync/atomic"
@@ -71,6 +72,20 @@ func (s *ttlStatistics) String() string {
 	return fmt.Sprintf("Total Rows: %d, Success Rows: %d, Error Rows: %d", s.TotalRows.Load(), s.SuccessRows.Load(), s.ErrorRows.Load())
 }
 
+func (s *ttlStatistics) MarshalJSON() ([]byte, error) {
+	type jsonStatistics struct {
+		TotalRows   uint64 `json:"total_rows"`
+		SuccessRows uint64 `json:"success_rows"`
+		ErrorRows   uint64 `json:"error_rows"`
+	}
+
+	return json.Marshal(jsonStatistics{
+		TotalRows:   s.TotalRows.Load(),
+		SuccessRows: s.SuccessRows.Load(),
+		ErrorRows:   s.ErrorRows.Load(),
+	})
+}
+
 type ttlScanTask struct {
 	ctx context.Context
 
diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go
index 66582084b18f3..34a25a2539612 100644
--- a/ttl/ttlworker/scan_test.go
+++ b/ttl/ttlworker/scan_test.go
@@ -403,3 +403,13 @@ func TestScanTaskDoScan(t *testing.T) {
 	task.schemaChangeInRetry = 2
 	task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist")
 }
+
+func TestTTLStatisticsMarshalJSON(t *testing.T) {
+	statistics := &ttlStatistics{}
+	statistics.TotalRows.Store(1)
+	statistics.ErrorRows.Store(255)
+	statistics.SuccessRows.Store(128)
+	j, err := statistics.MarshalJSON()
+	require.NoError(t, err)
+	require.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255}`, string(j))
+}
diff --git a/ttl/ttlworker/session_test.go b/ttl/ttlworker/session_test.go
index 8cceaed7ac72b..877fd7996eaa7 100644
--- a/ttl/ttlworker/session_test.go
+++ b/ttl/ttlworker/session_test.go
@@ -154,6 +154,10 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
 	}
 }
 
+func (s *mockSession) GetDomainInfoSchema() sessionctx.InfoschemaMetaVersion {
+	return s.sessionInfoSchema
+}
+
 func (s *mockSession) SessionInfoSchema() infoschema.InfoSchema {
 	require.False(s.t, s.closed)
 	return s.sessionInfoSchema