From 02eccb863ab3df55be2ee8588ce7a4f7fb96f4a8 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Wed, 27 Oct 2021 16:27:23 +0800
Subject: [PATCH 01/13] ddl_fc_stale_read

---
 ddl/db_test.go            |  12 +++++
 ddl/ddl_api.go            |   3 ++
 ddl/table.go              |   4 +-
 executor/builder.go       |  27 ++++++----
 executor/executor_test.go | 107 ++++++++++++++++++++++++++++++++++++++
 executor/point_get.go     |   2 +-
 6 files changed, 143 insertions(+), 12 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 6defdb38266e5..514be7ae43fbc 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -5917,6 +5917,18 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) {
 	// Multiple alter cache is okay
 	tk.MustExec("alter table t cache")
 	tk.MustExec("alter table t cache")
+	// Test a temporary table
+	tk.MustExec("set @@tidb_enable_global_temporary_table=1")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create temporary table t (id int primary key auto_increment, u int unique, v int)")
+	tk.MustExec("drop table if exists tmp1")
+	// local temporary table alter is not supported
+	tk.MustGetErrCode("alter table t cache", errno.ErrUnsupportedDDLOperation)
+	// test global temporary table
+	tk.MustExec("create global temporary table tmp1 " +
+		"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
+		"on commit delete rows")
+	tk.MustGetErrMsg("alter table tmp1 cache", ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache").Error())
 
 }
 
diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index 2cb7ebc63856f..0be9d949a9827 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -6633,6 +6633,9 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
 	if t.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
 		return nil
 	}
+	if t.Meta().TempTableType != model.TempTableNone {
+		return ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache")
+	}
 	job := &model.Job{
 		SchemaID:   schema.ID,
 		SchemaName: schema.Name.L,
diff --git a/ddl/table.go b/ddl/table.go
index 9ae952fac6f2a..5f7d55a7ad005 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -1502,7 +1502,9 @@ func onAlterCacheTable(t *meta.Meta, job *model.Job) (ver int64, err error) {
 		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
 		return ver, nil
 	}
-
+	if tbInfo.TempTableType != model.TempTableNone {
+		return ver, errors.Trace(ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache"))
+	}
 	switch tbInfo.TableCacheStatusType {
 	case model.TableCacheStatusDisable:
 		// disable -> switching
diff --git a/executor/builder.go b/executor/builder.go
index 491f438067d6a..452bab5831fa0 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -2813,7 +2813,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
 		return nil, err
 	}
 	ts := v.GetTableScan()
-	if err = b.validCanReadTemporaryTable(ts.Table); err != nil {
+	if err = b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
 		return nil, err
 	}
 
@@ -2929,7 +2929,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
 	}
 
 	ts := v.GetTableScan()
-	if err = b.validCanReadTemporaryTable(ts.Table); err != nil {
+	if err = b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
 		b.err = err
 		return nil
 	}
@@ -3152,7 +3152,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
 
 func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor {
 	is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
-	if err := b.validCanReadTemporaryTable(is.Table); err != nil {
+	if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
 		b.err = err
 		return nil
 	}
@@ -3311,7 +3311,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
 
 func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
 	is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
-	if err := b.validCanReadTemporaryTable(is.Table); err != nil {
+	if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
 		b.err = err
 		return nil
 	}
@@ -3426,7 +3426,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
 
 func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
 	ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
-	if err := b.validCanReadTemporaryTable(ts.Table); err != nil {
+	if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
 		b.err = err
 		return nil
 	}
@@ -4231,7 +4231,7 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
 }
 
 func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
-	if err := b.validCanReadTemporaryTable(plan.TblInfo); err != nil {
+	if err := b.validCanReadTemporaryOrCacheTable(plan.TblInfo); err != nil {
 		b.err = err
 		return nil
 	}
@@ -4517,16 +4517,23 @@ func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) E
 	}
 }
 
-func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error {
-	if tbl.TempTableType == model.TempTableNone {
+func (b *executorBuilder) validCanReadTemporaryOrCacheTable(tbl *model.TableInfo) error {
+	if tbl.TempTableType == model.TempTableNone && tbl.TableCacheStatusType == model.TableCacheStatusDisable {
 		return nil
 	}
-
 	// Some tools like dumpling use history read to dump all table's records and will be fail if we return an error.
 	// So we do not check SnapshotTS here
 
 	sessionVars := b.ctx.GetSessionVars()
-
+	// Temporary table can't switch into cache table. so the following code will not cause confusion
+	if tbl.TableCacheStatusType != model.TableCacheStatusDisable {
+		if sessionVars.SnapshotTS != 0 {
+			return errors.New("can not read cache table when 'tidb_snapshot' is set")
+		}
+		if sessionVars.TxnCtx.IsStaleness || b.isStaleness {
+			return errors.New("can not stale read cache table")
+		}
+	}
 	if tbl.TempTableType == model.TempTableLocal && sessionVars.SnapshotTS != 0 {
 		return errors.New("can not read local temporary table when 'tidb_snapshot' is set")
 	}
diff --git a/executor/executor_test.go b/executor/executor_test.go
index 5a254d96713aa..09688c781e27f 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -9153,3 +9153,110 @@ func (s *testSuiteP1) TestIssue28935(c *C) {
 	tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows("<nil> <nil> <nil>"))
 	tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("<nil>"))
 }
+func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
+	safePointName := "tikv_gc_safe_point"
+	safePointValue := "20160102-15:04:05 -0700"
+	safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
+	updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
+	ON DUPLICATE KEY
+	UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
+	tk.MustExec(updateSafePoint)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists tmp1")
+	tk.MustExec("create table tmp1 " +
+		"(id int not null primary key, code int not null, value int default null, unique key code(code))")
+	tk.MustExec("alter table tmp1 cache")
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists tmp2")
+	tk.MustExec("create table tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("alter table tmp2 cache")
+	tk.MustExec("create table tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("create table tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("create table tmp5(id int);")
+	tk.MustExec("alter table tmp5 cache;")
+	tk.MustExec("create table tmp6 (id int primary key);")
+	// sleep 1us to make test stale
+	time.Sleep(time.Microsecond)
+
+	queries := []struct {
+		sql string
+	}{
+		{
+			sql: "select * from tmp1 where id=1",
+		},
+		{
+			sql: "select * from tmp1 where code=1",
+		},
+		{
+			sql: "select * from tmp1 where id in (1, 2, 3)",
+		},
+		{
+			sql: "select * from tmp1 where code in (1, 2, 3)",
+		},
+		{
+			sql: "select * from tmp1 where id > 1",
+		},
+		{
+			sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1",
+		},
+		{
+			sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1",
+		},
+		{
+			sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2",
+		},
+	}
+
+	addStaleReadToSQL := func(sql string) string {
+		idx := strings.Index(sql, " where ")
+		if idx < 0 {
+			return ""
+		}
+		return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:]
+	}
+	for _, query := range queries {
+		sql := addStaleReadToSQL(query.sql)
+		if sql != "" {
+			tk.MustGetErrMsg(sql, "can not stale read cache table")
+		}
+	}
+
+	tk.MustExec("start transaction read only as of timestamp NOW(6)")
+	for _, query := range queries {
+		tk.MustGetErrMsg(query.sql, "can not stale read cache table")
+	}
+	tk.MustExec("commit")
+
+	for _, query := range queries {
+		tk.MustExec(query.sql)
+	}
+
+	// Test normal table when cache table exits.
+	tk.MustExec("insert into tmp6 values(1);")
+	tk.MustExec("set @a=now(6);")
+	time.Sleep(time.Microsecond)
+	tk.MustExec("drop table tmp6")
+	tk.MustExec("create table tmp6 (id int primary key);")
+	tk.MustQuery("select * from tmp6 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
+	tk.MustQuery("select * from tmp4 as of timestamp(@a), tmp3 as of timestamp(@a) where tmp3.id=1;")
+	tk.MustGetErrMsg("select * from tmp4 as of timestamp(@a), tmp2 as of timestamp(@a) where tmp2.id=1;", "can not stale read cache table")
+
+	tk.MustExec("set transaction read only as of timestamp NOW(6)")
+	tk.MustExec("start transaction")
+	for _, query := range queries {
+		tk.MustGetErrMsg(query.sql, "can not stale read cache table")
+	}
+	tk.MustExec("commit")
+
+	for _, query := range queries {
+		tk.MustExec(query.sql)
+	}
+
+	tk.MustExec("set @@tidb_snapshot=NOW(6)")
+	for _, query := range queries {
+		// forbidden historical read cache table
+		tk.MustGetErrMsg(query.sql, "can not read cache table when 'tidb_snapshot' is set")
+	}
+}
diff --git a/executor/point_get.go b/executor/point_get.go
index 6b6a5248e19df..0d1de79d0d1cd 100644
--- a/executor/point_get.go
+++ b/executor/point_get.go
@@ -43,7 +43,7 @@ import (
 )
 
 func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
-	if err := b.validCanReadTemporaryTable(p.TblInfo); err != nil {
+	if err := b.validCanReadTemporaryOrCacheTable(p.TblInfo); err != nil {
 		b.err = err
 		return nil
 	}

From 5ffd9a1711bed4fffd9e4a4e1ea8c68b20ced3b4 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Wed, 27 Oct 2021 20:08:54 +0800
Subject: [PATCH 02/13] fix

---
 executor/executor_test.go | 44 +++++++++++++++++----------------------
 1 file changed, 19 insertions(+), 25 deletions(-)

diff --git a/executor/executor_test.go b/executor/executor_test.go
index 09688c781e27f..1eedad21d024a 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -9164,19 +9164,17 @@ func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 	UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
 	tk.MustExec(updateSafePoint)
 	tk.MustExec("use test")
-	tk.MustExec("drop table if exists tmp1")
-	tk.MustExec("create table tmp1 " +
+	tk.MustExec("drop table if exists cache_tmp1")
+	tk.MustExec("create table cache_tmp1 " +
 		"(id int not null primary key, code int not null, value int default null, unique key code(code))")
-	tk.MustExec("alter table tmp1 cache")
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists tmp2")
-	tk.MustExec("create table tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("alter table tmp2 cache")
+	tk.MustExec("alter table cache_tmp1 cache")
+	tk.MustExec("drop table if exists cache_tmp2")
+	tk.MustExec("create table cache_tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("alter table cache_tmp2 cache")
+	tk.MustExec("drop table if exists tmp3 , tmp4, tmp5")
 	tk.MustExec("create table tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
 	tk.MustExec("create table tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("create table tmp5(id int);")
-	tk.MustExec("alter table tmp5 cache;")
-	tk.MustExec("create table tmp6 (id int primary key);")
+	tk.MustExec("create table tmp5 (id int primary key);")
 	// sleep 1us to make test stale
 	time.Sleep(time.Microsecond)
 
@@ -9184,28 +9182,25 @@ func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 		sql string
 	}{
 		{
-			sql: "select * from tmp1 where id=1",
+			sql: "select * from cache_tmp1 where id=1",
 		},
 		{
-			sql: "select * from tmp1 where code=1",
-		},
-		{
-			sql: "select * from tmp1 where id in (1, 2, 3)",
+			sql: "select * from cache_tmp1 where code=1",
 		},
 		{
-			sql: "select * from tmp1 where code in (1, 2, 3)",
+			sql: "select * from cache_tmp1 where id in (1, 2, 3)",
 		},
 		{
-			sql: "select * from tmp1 where id > 1",
+			sql: "select * from cache_tmp1 where code in (1, 2, 3)",
 		},
 		{
-			sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1",
+			sql: "select * from cache_tmp1 where id > 1",
 		},
 		{
-			sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1",
+			sql: "select /*+use_index(tmp1, code)*/ * from cache_tmp1 where code > 1",
 		},
 		{
-			sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2",
+			sql: "select /*+use_index(tmp1, code)*/ code from cache_tmp1 where code > 1",
 		},
 	}
 
@@ -9234,15 +9229,14 @@ func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 	}
 
 	// Test normal table when cache table exits.
-	tk.MustExec("insert into tmp6 values(1);")
+	tk.MustExec("insert into tmp5 values(1);")
 	tk.MustExec("set @a=now(6);")
 	time.Sleep(time.Microsecond)
-	tk.MustExec("drop table tmp6")
+	tk.MustExec("drop table tmp5")
 	tk.MustExec("create table tmp6 (id int primary key);")
-	tk.MustQuery("select * from tmp6 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
+	tk.MustQuery("select * from tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
 	tk.MustQuery("select * from tmp4 as of timestamp(@a), tmp3 as of timestamp(@a) where tmp3.id=1;")
-	tk.MustGetErrMsg("select * from tmp4 as of timestamp(@a), tmp2 as of timestamp(@a) where tmp2.id=1;", "can not stale read cache table")
-
+	tk.MustGetErrMsg("select * from tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table")
 	tk.MustExec("set transaction read only as of timestamp NOW(6)")
 	tk.MustExec("start transaction")
 	for _, query := range queries {

From 3aab60ab5961b3d65ffb129002cfc35a41c8e38f Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Wed, 27 Oct 2021 20:26:20 +0800
Subject: [PATCH 03/13] fix

---
 executor/executor_test.go | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/executor/executor_test.go b/executor/executor_test.go
index 1eedad21d024a..d0b11fb300b51 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -9171,10 +9171,10 @@ func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 	tk.MustExec("drop table if exists cache_tmp2")
 	tk.MustExec("create table cache_tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
 	tk.MustExec("alter table cache_tmp2 cache")
-	tk.MustExec("drop table if exists tmp3 , tmp4, tmp5")
-	tk.MustExec("create table tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("create table tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("create table tmp5 (id int primary key);")
+	tk.MustExec("drop table if exists cache_tmp3 , cache_tmp4, cache_tmp5")
+	tk.MustExec("create table cache_tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("create table cache_tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("create table cache_tmp5 (id int primary key);")
 	// sleep 1us to make test stale
 	time.Sleep(time.Microsecond)
 
@@ -9197,10 +9197,10 @@ func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 			sql: "select * from cache_tmp1 where id > 1",
 		},
 		{
-			sql: "select /*+use_index(tmp1, code)*/ * from cache_tmp1 where code > 1",
+			sql: "select /*+use_index(cache_tmp1, code)*/ * from cache_tmp1 where code > 1",
 		},
 		{
-			sql: "select /*+use_index(tmp1, code)*/ code from cache_tmp1 where code > 1",
+			sql: "select /*+use_index(cache_tmp1, code)*/ code from cache_tmp1 where code > 1",
 		},
 	}
 
@@ -9229,14 +9229,14 @@ func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 	}
 
 	// Test normal table when cache table exits.
-	tk.MustExec("insert into tmp5 values(1);")
+	tk.MustExec("insert into cache_tmp5 values(1);")
 	tk.MustExec("set @a=now(6);")
 	time.Sleep(time.Microsecond)
-	tk.MustExec("drop table tmp5")
-	tk.MustExec("create table tmp6 (id int primary key);")
-	tk.MustQuery("select * from tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
-	tk.MustQuery("select * from tmp4 as of timestamp(@a), tmp3 as of timestamp(@a) where tmp3.id=1;")
-	tk.MustGetErrMsg("select * from tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table")
+	tk.MustExec("drop table cache_tmp5")
+	tk.MustExec("create table cache_tmp5 (id int primary key);")
+	tk.MustQuery("select * from cache_tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
+	tk.MustQuery("select * from cache_tmp4 as of timestamp(@a), cache_tmp3 as of timestamp(@a) where cache_tmp3.id=1;")
+	tk.MustGetErrMsg("select * from cache_tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table")
 	tk.MustExec("set transaction read only as of timestamp NOW(6)")
 	tk.MustExec("start transaction")
 	for _, query := range queries {

From 0d402c63a39daceec68a93937ebf761bbce14957 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Wed, 27 Oct 2021 21:12:33 +0800
Subject: [PATCH 04/13] fix

---
 executor/admin_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/executor/admin_test.go b/executor/admin_test.go
index 8d2f9c89b6e85..0fcbcab0dbfc0 100644
--- a/executor/admin_test.go
+++ b/executor/admin_test.go
@@ -137,6 +137,7 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
 	tk.MustExec("admin check table cache_admin_test;")
 	tk.MustExec("admin check index cache_admin_test c1;")
 	tk.MustExec("admin check index cache_admin_test c2;")
+	tk.MustExec("drop table if exists cache_admin_test;")
 	tk.MustExec(`drop table if exists check_index_test;`)
 	tk.MustExec(`create table check_index_test (a int, b varchar(10), index a_b (a, b), index b (b))`)
 	tk.MustExec(`insert check_index_test values (3, "ab"),(2, "cd"),(1, "ef"),(-1, "hi")`)

From 666cb49c3677b2266cf7c1800c50830691f10d7b Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Thu, 28 Oct 2021 10:28:42 +0800
Subject: [PATCH 05/13] fix_bug

---
 executor/admin_test.go    |   4 +-
 executor/builder.go       |   5 +-
 executor/executor_test.go | 208 ++++++++++++++++++++------------------
 3 files changed, 111 insertions(+), 106 deletions(-)

diff --git a/executor/admin_test.go b/executor/admin_test.go
index 0fcbcab0dbfc0..481be377d0f50 100644
--- a/executor/admin_test.go
+++ b/executor/admin_test.go
@@ -138,6 +138,7 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
 	tk.MustExec("admin check index cache_admin_test c1;")
 	tk.MustExec("admin check index cache_admin_test c2;")
 	tk.MustExec("drop table if exists cache_admin_test;")
+
 	tk.MustExec(`drop table if exists check_index_test;`)
 	tk.MustExec(`create table check_index_test (a int, b varchar(10), index a_b (a, b), index b (b))`)
 	tk.MustExec(`insert check_index_test values (3, "ab"),(2, "cd"),(1, "ef"),(-1, "hi")`)
@@ -146,7 +147,8 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
 	result.Check(testkit.Rows("1 ef 3", "2 cd 2"))
 	result = tk.MustQuery("admin check index check_index_test a_b (3, 5);")
 	result.Check(testkit.Rows("-1 hi 4", "1 ef 3"))
-	tk.MustExec("drop table if exists cache_admin_test;")
+	tk.MustExec("drop table if exists check_index_test;")
+
 	tk.MustExec("drop table if exists cache_admin_table_with_index_test;")
 	tk.MustExec("drop table if exists cache_admin_table_without_index_test;")
 	tk.MustExec("create table cache_admin_table_with_index_test (id int, count int, PRIMARY KEY(id), KEY(count))")
diff --git a/executor/builder.go b/executor/builder.go
index 452bab5831fa0..8b514cf2dddff 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -4527,11 +4527,8 @@ func (b *executorBuilder) validCanReadTemporaryOrCacheTable(tbl *model.TableInfo
 	sessionVars := b.ctx.GetSessionVars()
 	// Temporary table can't switch into cache table. so the following code will not cause confusion
 	if tbl.TableCacheStatusType != model.TableCacheStatusDisable {
-		if sessionVars.SnapshotTS != 0 {
-			return errors.New("can not read cache table when 'tidb_snapshot' is set")
-		}
 		if sessionVars.TxnCtx.IsStaleness || b.isStaleness {
-			return errors.New("can not stale read cache table")
+			return errors.Trace(errors.New("can not stale read cache table"))
 		}
 	}
 	if tbl.TempTableType == model.TempTableLocal && sessionVars.SnapshotTS != 0 {
diff --git a/executor/executor_test.go b/executor/executor_test.go
index d0b11fb300b51..d2bff8fef7863 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -240,6 +240,7 @@ func (s *testSuiteP1) TestPessimisticSelectForUpdate(c *C) {
 	tk.MustQuery("select a from t where id=1").Check(testkit.Rows("2"))
 }
 
+
 func (s *testSuite) TearDownTest(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
@@ -997,6 +998,7 @@ func (s *testSuiteP1) TestSelectOrderBy(c *C) {
 	r = tk.MustQuery("select id from select_order_test order by id desc limit 1 ")
 	r.Check(testkit.Rows("2"))
 
+
 	r = tk.MustQuery("select id from select_order_test order by id + 1 desc limit 1 ")
 	r.Check(testkit.Rows("2"))
 
@@ -1124,6 +1126,7 @@ func (s *testSuiteP1) TestSelectErrorRow(c *C) {
 	err = tk.ExecToErr("select (select 1, 1) from test;")
 	c.Assert(err, NotNil)
 
+
 	err = tk.ExecToErr("select * from test group by (select 1, 1);")
 	c.Assert(err, NotNil)
 
@@ -1388,6 +1391,7 @@ func (s *testSuiteP2) TestUnion(c *C) {
 	r = tk.MustQuery("SELECT 'a' UNION SELECT CONCAT('a', -4)")
 	r.Sort().Check(testkit.Rows("a", "a-4"))
 
+
 	// test race
 	tk.MustQuery("SELECT @x:=0 UNION ALL SELECT @x:=0 UNION ALL SELECT @x")
 
@@ -1524,6 +1528,7 @@ func (s *testSuiteP2) TestUnion(c *C) {
 	tk.MustExec("create table t(a bit(20), b float, c double, d int)")
 	tk.MustExec("insert into t values(10, 10, 10, 10), (1, -1, 2, -2), (2, -2, 1, 1), (2, 1.1, 2.1, 10.1)")
 	tk.MustQuery("select a from t union select 10 order by a").Check(testkit.Rows("1", "2", "10"))
+
 }
 
 func (s *testSuite2) TestUnionLimit(c *C) {
@@ -8972,6 +8977,108 @@ func (s *testStaleTxnSuite) TestInvalidReadTemporaryTable(c *C) {
 		tk.MustQuery(query.sql).Check(testkit.Rows())
 	}
 }
+func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
+	safePointName := "tikv_gc_safe_point"
+	safePointValue := "20160102-15:04:05 -0700"
+	safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
+	updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
+	ON DUPLICATE KEY
+	UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
+	tk.MustExec(updateSafePoint)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists cache_tmp1")
+	tk.MustExec("create table cache_tmp1 " +
+		"(id int not null primary key, code int not null, value int default null, unique key code(code))")
+	tk.MustExec("alter table cache_tmp1 cache")
+	tk.MustExec("drop table if exists cache_tmp2")
+	tk.MustExec("create table cache_tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("alter table cache_tmp2 cache")
+	tk.MustExec("drop table if exists cache_tmp3 , cache_tmp4, cache_tmp5")
+	tk.MustExec("create table cache_tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("create table cache_tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
+	tk.MustExec("create table cache_tmp5 (id int primary key);")
+	// sleep 1us to make test stale
+	time.Sleep(time.Microsecond)
+
+	queries := []struct {
+		sql string
+	}{
+		{
+			sql: "select * from cache_tmp1 where id=1",
+		},
+		{
+			sql: "select * from cache_tmp1 where code=1",
+		},
+		{
+			sql: "select * from cache_tmp1 where id in (1, 2, 3)",
+		},
+		{
+			sql: "select * from cache_tmp1 where code in (1, 2, 3)",
+		},
+		{
+			sql: "select * from cache_tmp1 where id > 1",
+		},
+		{
+			sql: "select /*+use_index(cache_tmp1, code)*/ * from cache_tmp1 where code > 1",
+		},
+		{
+			sql: "select /*+use_index(cache_tmp1, code)*/ code from cache_tmp1 where code > 1",
+		},
+	}
+
+	addStaleReadToSQL := func(sql string) string {
+		idx := strings.Index(sql, " where ")
+		if idx < 0 {
+			return ""
+		}
+		return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:]
+	}
+	for _, query := range queries {
+		sql := addStaleReadToSQL(query.sql)
+		if sql != "" {
+			tk.MustGetErrMsg(sql, "can not stale read cache table")
+		}
+	}
+
+	tk.MustExec("start transaction read only as of timestamp NOW(6)")
+	for _, query := range queries {
+		tk.MustGetErrMsg(query.sql, "can not stale read cache table")
+	}
+	tk.MustExec("commit")
+
+	for _, query := range queries {
+		tk.MustExec(query.sql)
+	}
+
+	// Test normal table when cache table exits.
+	tk.MustExec("insert into cache_tmp5 values(1);")
+	tk.MustExec("set @a=now(6);")
+	time.Sleep(time.Microsecond)
+	tk.MustExec("drop table cache_tmp5")
+	tk.MustExec("create table cache_tmp5 (id int primary key);")
+	tk.MustQuery("select * from cache_tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
+	tk.MustQuery("select * from cache_tmp4 as of timestamp(@a), cache_tmp3 as of timestamp(@a) where cache_tmp3.id=1;")
+	tk.MustGetErrMsg("select * from cache_tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table")
+	tk.MustExec("set transaction read only as of timestamp NOW(6)")
+	tk.MustExec("start transaction")
+	for _, query := range queries {
+		tk.MustGetErrMsg(query.sql, "can not stale read cache table")
+	}
+	tk.MustExec("commit")
+
+	for _, query := range queries {
+		tk.MustExec(query.sql)
+	}
+
+	tk.MustExec("set @@tidb_snapshot=NOW(6)")
+	for _, query := range queries {
+		// enable historical read cache table
+		tk.MustExec(query.sql)
+
+	}
+}
 
 func (s *testSuite) TestTableSampleTemporaryTable(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
@@ -9153,104 +9260,3 @@ func (s *testSuiteP1) TestIssue28935(c *C) {
 	tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows("<nil> <nil> <nil>"))
 	tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("<nil>"))
 }
-func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
-	tk := testkit.NewTestKit(c, s.store)
-	// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
-	safePointName := "tikv_gc_safe_point"
-	safePointValue := "20160102-15:04:05 -0700"
-	safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
-	updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
-	ON DUPLICATE KEY
-	UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
-	tk.MustExec(updateSafePoint)
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists cache_tmp1")
-	tk.MustExec("create table cache_tmp1 " +
-		"(id int not null primary key, code int not null, value int default null, unique key code(code))")
-	tk.MustExec("alter table cache_tmp1 cache")
-	tk.MustExec("drop table if exists cache_tmp2")
-	tk.MustExec("create table cache_tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("alter table cache_tmp2 cache")
-	tk.MustExec("drop table if exists cache_tmp3 , cache_tmp4, cache_tmp5")
-	tk.MustExec("create table cache_tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("create table cache_tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
-	tk.MustExec("create table cache_tmp5 (id int primary key);")
-	// sleep 1us to make test stale
-	time.Sleep(time.Microsecond)
-
-	queries := []struct {
-		sql string
-	}{
-		{
-			sql: "select * from cache_tmp1 where id=1",
-		},
-		{
-			sql: "select * from cache_tmp1 where code=1",
-		},
-		{
-			sql: "select * from cache_tmp1 where id in (1, 2, 3)",
-		},
-		{
-			sql: "select * from cache_tmp1 where code in (1, 2, 3)",
-		},
-		{
-			sql: "select * from cache_tmp1 where id > 1",
-		},
-		{
-			sql: "select /*+use_index(cache_tmp1, code)*/ * from cache_tmp1 where code > 1",
-		},
-		{
-			sql: "select /*+use_index(cache_tmp1, code)*/ code from cache_tmp1 where code > 1",
-		},
-	}
-
-	addStaleReadToSQL := func(sql string) string {
-		idx := strings.Index(sql, " where ")
-		if idx < 0 {
-			return ""
-		}
-		return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:]
-	}
-	for _, query := range queries {
-		sql := addStaleReadToSQL(query.sql)
-		if sql != "" {
-			tk.MustGetErrMsg(sql, "can not stale read cache table")
-		}
-	}
-
-	tk.MustExec("start transaction read only as of timestamp NOW(6)")
-	for _, query := range queries {
-		tk.MustGetErrMsg(query.sql, "can not stale read cache table")
-	}
-	tk.MustExec("commit")
-
-	for _, query := range queries {
-		tk.MustExec(query.sql)
-	}
-
-	// Test normal table when cache table exits.
-	tk.MustExec("insert into cache_tmp5 values(1);")
-	tk.MustExec("set @a=now(6);")
-	time.Sleep(time.Microsecond)
-	tk.MustExec("drop table cache_tmp5")
-	tk.MustExec("create table cache_tmp5 (id int primary key);")
-	tk.MustQuery("select * from cache_tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
-	tk.MustQuery("select * from cache_tmp4 as of timestamp(@a), cache_tmp3 as of timestamp(@a) where cache_tmp3.id=1;")
-	tk.MustGetErrMsg("select * from cache_tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table")
-	tk.MustExec("set transaction read only as of timestamp NOW(6)")
-	tk.MustExec("start transaction")
-	for _, query := range queries {
-		tk.MustGetErrMsg(query.sql, "can not stale read cache table")
-	}
-	tk.MustExec("commit")
-
-	for _, query := range queries {
-		tk.MustExec(query.sql)
-	}
-
-	tk.MustExec("set @@tidb_snapshot=NOW(6)")
-	for _, query := range queries {
-		// forbidden historical read cache table
-		tk.MustGetErrMsg(query.sql, "can not read cache table when 'tidb_snapshot' is set")
-	}
-}

From bbd2f0291e711dc28afdb14d511df2b1ddab0d1c Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Thu, 28 Oct 2021 10:33:59 +0800
Subject: [PATCH 06/13] fix_bug

---
 executor/executor_test.go | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/executor/executor_test.go b/executor/executor_test.go
index d2bff8fef7863..e5b9e71bd3fde 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -240,7 +240,6 @@ func (s *testSuiteP1) TestPessimisticSelectForUpdate(c *C) {
 	tk.MustQuery("select a from t where id=1").Check(testkit.Rows("2"))
 }
 
-
 func (s *testSuite) TearDownTest(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
@@ -998,7 +997,6 @@ func (s *testSuiteP1) TestSelectOrderBy(c *C) {
 	r = tk.MustQuery("select id from select_order_test order by id desc limit 1 ")
 	r.Check(testkit.Rows("2"))
 
-
 	r = tk.MustQuery("select id from select_order_test order by id + 1 desc limit 1 ")
 	r.Check(testkit.Rows("2"))
 
@@ -1126,7 +1124,6 @@ func (s *testSuiteP1) TestSelectErrorRow(c *C) {
 	err = tk.ExecToErr("select (select 1, 1) from test;")
 	c.Assert(err, NotNil)
 
-
 	err = tk.ExecToErr("select * from test group by (select 1, 1);")
 	c.Assert(err, NotNil)
 
@@ -1391,7 +1388,6 @@ func (s *testSuiteP2) TestUnion(c *C) {
 	r = tk.MustQuery("SELECT 'a' UNION SELECT CONCAT('a', -4)")
 	r.Sort().Check(testkit.Rows("a", "a-4"))
 
-
 	// test race
 	tk.MustQuery("SELECT @x:=0 UNION ALL SELECT @x:=0 UNION ALL SELECT @x")
 

From 5d1fd92771fa03109ddb67a27137ff6675309002 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Thu, 28 Oct 2021 11:27:56 +0800
Subject: [PATCH 07/13] fix_review

---
 ddl/db_test.go            | 1 -
 executor/executor_test.go | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 514be7ae43fbc..4b9da4557f08f 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -5918,7 +5918,6 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) {
 	tk.MustExec("alter table t cache")
 	tk.MustExec("alter table t cache")
 	// Test a temporary table
-	tk.MustExec("set @@tidb_enable_global_temporary_table=1")
 	tk.MustExec("drop table if exists t")
 	tk.MustExec("create temporary table t (id int primary key auto_increment, u int unique, v int)")
 	tk.MustExec("drop table if exists tmp1")
diff --git a/executor/executor_test.go b/executor/executor_test.go
index e5b9e71bd3fde..67a98fbbc7442 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -1524,7 +1524,6 @@ func (s *testSuiteP2) TestUnion(c *C) {
 	tk.MustExec("create table t(a bit(20), b float, c double, d int)")
 	tk.MustExec("insert into t values(10, 10, 10, 10), (1, -1, 2, -2), (2, -2, 1, 1), (2, 1.1, 2.1, 10.1)")
 	tk.MustQuery("select a from t union select 10 order by a").Check(testkit.Rows("1", "2", "10"))
-
 }
 
 func (s *testSuite2) TestUnionLimit(c *C) {
@@ -8974,6 +8973,7 @@ func (s *testStaleTxnSuite) TestInvalidReadTemporaryTable(c *C) {
 	}
 }
 func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
+
 	tk := testkit.NewTestKit(c, s.store)
 	// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
 	safePointName := "tikv_gc_safe_point"

From 591c01b0584780137353decb5033baf0769ea1a3 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Thu, 28 Oct 2021 17:54:22 +0800
Subject: [PATCH 08/13] some_changes_for_review

---
 executor/admin_test.go    |  2 ++
 executor/builder.go       | 34 ++++++++++++++++++++++++++--------
 executor/executor_test.go |  2 +-
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/executor/admin_test.go b/executor/admin_test.go
index 481be377d0f50..8707332688ad2 100644
--- a/executor/admin_test.go
+++ b/executor/admin_test.go
@@ -127,6 +127,7 @@ func (s *testSuite5) TestAdminCheckIndexInLocalTemporaryMode(c *C) {
 	c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("admin checksum table").Error())
 	tk.MustExec("drop table if exists local_temporary_admin_checksum_table_with_index_test,local_temporary_admin_checksum_table_without_index_test;")
 }
+
 func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
@@ -159,6 +160,7 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
 	tk.MustExec("admin checksum table cache_admin_table_without_index_test;")
 	tk.MustExec("drop table if exists cache_admin_table_with_index_test,cache_admin_table_without_index_test;")
 }
+
 func (s *testSuite5) TestAdminRecoverIndex(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
diff --git a/executor/builder.go b/executor/builder.go
index 8b514cf2dddff..6689714175dee 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -4516,21 +4516,39 @@ func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) E
 		chkIdx:       0,
 	}
 }
-
 func (b *executorBuilder) validCanReadTemporaryOrCacheTable(tbl *model.TableInfo) error {
-	if tbl.TempTableType == model.TempTableNone && tbl.TableCacheStatusType == model.TableCacheStatusDisable {
+	err := b.validCanReadTemporaryTable(tbl)
+	if err != nil {
+		return err
+	}
+	return b.validCanReadCacheTable(tbl)
+}
+
+func (b *executorBuilder) validCanReadCacheTable(tbl *model.TableInfo) error {
+	if tbl.TableCacheStatusType == model.TableCacheStatusDisable {
 		return nil
 	}
-	// Some tools like dumpling use history read to dump all table's records and will be fail if we return an error.
-	// So we do not check SnapshotTS here
 
 	sessionVars := b.ctx.GetSessionVars()
+
 	// Temporary table can't switch into cache table. so the following code will not cause confusion
-	if tbl.TableCacheStatusType != model.TableCacheStatusDisable {
-		if sessionVars.TxnCtx.IsStaleness || b.isStaleness {
-			return errors.Trace(errors.New("can not stale read cache table"))
-		}
+	if sessionVars.TxnCtx.IsStaleness || b.isStaleness {
+		return errors.Trace(errors.New("can not stale read cache table"))
 	}
+
+	return nil
+}
+
+func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error {
+	if tbl.TempTableType == model.TempTableNone {
+		return nil
+	}
+
+	// Some tools like dumpling use history read to dump all table's records and will be fail if we return an error.
+	// So we do not check SnapshotTS here
+
+	sessionVars := b.ctx.GetSessionVars()
+
 	if tbl.TempTableType == model.TempTableLocal && sessionVars.SnapshotTS != 0 {
 		return errors.New("can not read local temporary table when 'tidb_snapshot' is set")
 	}
diff --git a/executor/executor_test.go b/executor/executor_test.go
index 67a98fbbc7442..142c29fc74ce9 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -8972,8 +8972,8 @@ func (s *testStaleTxnSuite) TestInvalidReadTemporaryTable(c *C) {
 		tk.MustQuery(query.sql).Check(testkit.Rows())
 	}
 }
-func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 
+func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
 	safePointName := "tikv_gc_safe_point"

From 76e955b0fd0976d3f61f4bd9c599721bc60a2ad5 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Fri, 29 Oct 2021 21:59:33 +0800
Subject: [PATCH 09/13] forbid_partition_add_view_test

---
 ddl/db_cache_test.go | 110 +++++++++++++++++++++++++++++++++++++++++++
 ddl/db_test.go       |  50 --------------------
 ddl/ddl_api.go       |   7 ++-
 ddl/error.go         |   5 +-
 ddl/table.go         |   6 +++
 5 files changed, 125 insertions(+), 53 deletions(-)
 create mode 100644 ddl/db_cache_test.go

diff --git a/ddl/db_cache_test.go b/ddl/db_cache_test.go
new file mode 100644
index 0000000000000..ebe769840be5d
--- /dev/null
+++ b/ddl/db_cache_test.go
@@ -0,0 +1,110 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl_test
+
+import (
+	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb/ddl"
+	"github.com/pingcap/tidb/domain"
+	"github.com/pingcap/tidb/errno"
+	"github.com/pingcap/tidb/parser/terror"
+	"github.com/pingcap/tidb/util/testkit"
+)
+
+// test alter table cache
+func (s *testDBSuite2) TestAlterTableCache(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk2 := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1")
+	tk2.MustExec("use test")
+	/* Test of cache table */
+	tk.MustExec("create table t1 ( n int auto_increment primary key)")
+	tk.MustGetErrCode("alter table t1 ca", errno.ErrParse)
+	tk.MustGetErrCode("alter table t2 cache", errno.ErrNoSuchTable)
+	tk.MustExec("alter table t1 cache")
+	checkTableCache(c, tk.Se, "test", "t1")
+	tk.MustExec("drop table if exists t1")
+	/*Test can't skip schema checker*/
+	tk.MustExec("drop table if exists t1,t2")
+	tk.MustExec("CREATE TABLE t1 (a int)")
+	tk.MustExec("CREATE TABLE t2 (a int)")
+	tk.MustExec("begin")
+	tk.MustExec("insert into t1 set a=1;")
+	tk2.MustExec("alter table t1 cache;")
+	_, err := tk.Exec("commit")
+	c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue)
+	/* Test can skip schema checker */
+	tk.MustExec("begin")
+	tk.MustExec("insert into t1 set a=2;")
+	tk2.MustExec("alter table t2 cache")
+	tk.MustExec("commit")
+	// Test if a table is not exists
+	tk.MustExec("drop table if exists t")
+	tk.MustGetErrCode("alter table t cache", errno.ErrNoSuchTable)
+	tk.MustExec("create table t (a int)")
+	tk.MustExec("alter table t cache")
+	// Multiple alter cache is okay
+	tk.MustExec("alter table t cache")
+	tk.MustExec("alter table t cache")
+	// Test a temporary table
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create temporary table t (id int primary key auto_increment, u int unique, v int)")
+	tk.MustExec("drop table if exists tmp1")
+	// local temporary table alter is not supported
+	tk.MustGetErrCode("alter table t cache", errno.ErrUnsupportedDDLOperation)
+	// test global temporary table
+	tk.MustExec("create global temporary table tmp1 " +
+		"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
+		"on commit delete rows")
+	tk.MustGetErrMsg("alter table tmp1 cache", ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache").Error())
+
+}
+
+func (s *testDBSuite2) TestAlterPartitionCache(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test;")
+	tk.MustExec("drop table if exists cache_partition_table;")
+	tk.MustExec("create table cache_partition_table (a int, b int) partition by hash(a) partitions 3;")
+	tk.MustGetErrCode("alter table cache_partition_table cache", errno.ErrOptOnCacheTable)
+	defer tk.MustExec("drop table if exists cache_partition_table;")
+	tk.MustExec("drop table if exists cache_partition_range_table;")
+	tk.MustExec(`create table cache_partition_range_table (c1 smallint(6) not null, c2 char(5) default null) partition by range ( c1 ) (
+			partition p0 values less than (10),
+			partition p1 values less than (20),
+			partition p2 values less than (30),
+			partition p3 values less than (MAXVALUE)
+	);`)
+	tk.MustGetErrCode("alter table cache_partition_range_table cache;", errno.ErrOptOnCacheTable)
+	defer tk.MustExec("drop table if exists cache_partition_range_table;")
+	tk.MustExec("drop table if exists partition_list_table;")
+	tk.MustExec("set @@session.tidb_enable_list_partition = ON")
+	tk.MustExec(`create table cache_partition_list_table (id int) partition by list  (id) (
+	    partition p0 values in (1,2),
+	    partition p1 values in (3,4),
+	    partition p3 values in (5,null)
+	);`)
+	tk.MustGetErrCode("alter table cache_partition_list_table cache", errno.ErrOptOnCacheTable)
+	tk.MustExec("drop table if exists cache_partition_list_table;")
+}
+
+func (s *testDBSuite2) TestAlterViewTableCache(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test;")
+	tk.MustExec("drop table if exists cache_view_t")
+	tk.MustExec("create table cache_view_t (id int);")
+	tk.MustExec("create view v as select * from cache_view_t")
+	tk.MustGetErrCode("alter table v cache", errno.ErrWrongObject)
+}
diff --git a/ddl/db_test.go b/ddl/db_test.go
index 4b9da4557f08f..e85f5d226d4f0 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -5881,56 +5881,6 @@ func (s *testDBSuite2) TestTableLocksLostCommit(c *C) {
 	tk.MustExec("unlock tables")
 }
 
-// test alter table cache
-func (s *testDBSuite2) TestAlterTableCache(c *C) {
-	tk := testkit.NewTestKit(c, s.store)
-	tk2 := testkit.NewTestKit(c, s.store)
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists t1")
-	tk2.MustExec("use test")
-	/* Test of cache table */
-	tk.MustExec("create table t1 ( n int auto_increment primary key)")
-	tk.MustGetErrCode("alter table t1 ca", errno.ErrParse)
-	tk.MustGetErrCode("alter table t2 cache", errno.ErrNoSuchTable)
-	tk.MustExec("alter table t1 cache")
-	checkTableCache(c, tk.Se, "test", "t1")
-	tk.MustExec("drop table if exists t1")
-	/*Test can't skip schema checker*/
-	tk.MustExec("drop table if exists t1,t2")
-	tk.MustExec("CREATE TABLE t1 (a int)")
-	tk.MustExec("CREATE TABLE t2 (a int)")
-	tk.MustExec("begin")
-	tk.MustExec("insert into t1 set a=1;")
-	tk2.MustExec("alter table t1 cache;")
-	_, err := tk.Exec("commit")
-	c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue)
-	/* Test can skip schema checker */
-	tk.MustExec("begin")
-	tk.MustExec("insert into t1 set a=2;")
-	tk2.MustExec("alter table t2 cache")
-	tk.MustExec("commit")
-	// Test if a table is not exists
-	tk.MustExec("drop table if exists t")
-	tk.MustGetErrCode("alter table t cache", errno.ErrNoSuchTable)
-	tk.MustExec("create table t (a int)")
-	tk.MustExec("alter table t cache")
-	// Multiple alter cache is okay
-	tk.MustExec("alter table t cache")
-	tk.MustExec("alter table t cache")
-	// Test a temporary table
-	tk.MustExec("drop table if exists t")
-	tk.MustExec("create temporary table t (id int primary key auto_increment, u int unique, v int)")
-	tk.MustExec("drop table if exists tmp1")
-	// local temporary table alter is not supported
-	tk.MustGetErrCode("alter table t cache", errno.ErrUnsupportedDDLOperation)
-	// test global temporary table
-	tk.MustExec("create global temporary table tmp1 " +
-		"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
-		"on commit delete rows")
-	tk.MustGetErrMsg("alter table tmp1 cache", ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache").Error())
-
-}
-
 // test write local lock
 func (s *testDBSuite2) TestWriteLocal(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index 081c5246b5eac..5935a738d7663 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -6637,8 +6637,13 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
 	if t.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
 		return nil
 	}
+
 	if t.Meta().TempTableType != model.TempTableNone {
-		return ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache")
+		return errors.Trace(ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache"))
+	}
+
+	if t.Meta().Partition != nil {
+		return errors.Trace(ErrOptOnCacheTable.GenWithStackByArgs("partition mode"))
 	}
 	job := &model.Job{
 		SchemaID:   schema.ID,
diff --git a/ddl/error.go b/ddl/error.go
index bd3b118a235d5..88e7494a1e3c0 100644
--- a/ddl/error.go
+++ b/ddl/error.go
@@ -287,8 +287,9 @@ var (
 	ErrPartitionNoTemporary = dbterror.ClassDDL.NewStd(mysql.ErrPartitionNoTemporary)
 
 	// ErrOptOnTemporaryTable returns when exec unsupported opt at temporary mode
-	ErrOptOnTemporaryTable = dbterror.ClassDDL.NewStd(mysql.ErrOptOnTemporaryTable)
-
+	ErrOptOnTemporaryTable              = dbterror.ClassDDL.NewStd(mysql.ErrOptOnTemporaryTable)
+	// ErrOptOnCacheTable returns when exec unsupported opt at cache mode
+	ErrOptOnCacheTable                  = dbterror.ClassDDL.NewStd(mysql.ErrOptOnCacheTable)
 	errUnsupportedOnCommitPreserve      = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("TiDB doesn't support ON COMMIT PRESERVE ROWS for now", nil))
 	errUnsupportedClusteredSecondaryKey = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("CLUSTERED/NONCLUSTERED keyword is only supported for primary key", nil))
 
diff --git a/ddl/table.go b/ddl/table.go
index 430f688c8fc2b..cadc688b3bc16 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -1475,9 +1475,15 @@ func onAlterCacheTable(t *meta.Meta, job *model.Job) (ver int64, err error) {
 		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
 		return ver, nil
 	}
+
 	if tbInfo.TempTableType != model.TempTableNone {
 		return ver, errors.Trace(ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache"))
 	}
+
+	if tbInfo.Partition != nil {
+		return ver, errors.Trace(ErrOptOnCacheTable.GenWithStackByArgs("partition mode"))
+	}
+
 	switch tbInfo.TableCacheStatusType {
 	case model.TableCacheStatusDisable:
 		// disable -> switching

From 6f9aca1bed35e208166535b9d5a533d41b0ab642 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Fri, 29 Oct 2021 22:10:17 +0800
Subject: [PATCH 10/13] fix

---
 ddl/error.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ddl/error.go b/ddl/error.go
index 88e7494a1e3c0..6d21dca761aa0 100644
--- a/ddl/error.go
+++ b/ddl/error.go
@@ -287,7 +287,7 @@ var (
 	ErrPartitionNoTemporary = dbterror.ClassDDL.NewStd(mysql.ErrPartitionNoTemporary)
 
 	// ErrOptOnTemporaryTable returns when exec unsupported opt at temporary mode
-	ErrOptOnTemporaryTable              = dbterror.ClassDDL.NewStd(mysql.ErrOptOnTemporaryTable)
+	ErrOptOnTemporaryTable = dbterror.ClassDDL.NewStd(mysql.ErrOptOnTemporaryTable)
 	// ErrOptOnCacheTable returns when exec unsupported opt at cache mode
 	ErrOptOnCacheTable                  = dbterror.ClassDDL.NewStd(mysql.ErrOptOnCacheTable)
 	errUnsupportedOnCommitPreserve      = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("TiDB doesn't support ON COMMIT PRESERVE ROWS for now", nil))

From e0ff68daf4c6a5d26308e1006ba9d0f6db1e534d Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Fri, 29 Oct 2021 22:26:19 +0800
Subject: [PATCH 11/13] add_erros.toml

---
 errors.toml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/errors.toml b/errors.toml
index 7f652f001fb0b..3190d921de409 100644
--- a/errors.toml
+++ b/errors.toml
@@ -711,6 +711,11 @@ error = '''
 Placement policy '%-.192s' is still in use
 '''
 
+["ddl:8242"]
+error = '''
+'%s' is unsupported on cache tables.
+'''
+
 ["domain:8027"]
 error = '''
 Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV

From 3b7c587359d9e2bd3f4299c2d63d8a25d46badcd Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Mon, 1 Nov 2021 18:29:22 +0800
Subject: [PATCH 12/13] Update db_cache_test.go

---
 ddl/db_cache_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ddl/db_cache_test.go b/ddl/db_cache_test.go
index ebe769840be5d..34a9a70be9696 100644
--- a/ddl/db_cache_test.go
+++ b/ddl/db_cache_test.go
@@ -1,4 +1,4 @@
-// Copyright 2018 PingCAP, Inc.
+// Copyright 2020 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.

From e733b35eaf3ebbeea60c594f0a0a6d3227e43eb8 Mon Sep 17 00:00:00 2001
From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com>
Date: Mon, 1 Nov 2021 18:33:20 +0800
Subject: [PATCH 13/13] Update db_cache_test.go

---
 ddl/db_cache_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ddl/db_cache_test.go b/ddl/db_cache_test.go
index 34a9a70be9696..d0a29b280a7ef 100644
--- a/ddl/db_cache_test.go
+++ b/ddl/db_cache_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020 PingCAP, Inc.
+// Copyright 2021 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.