Skip to content

Commit

Permalink
ddl,mdl: fix the issue that the table_ids overlap regex is not correct (
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored and terry1purcell committed May 17, 2024
1 parent e6d6097 commit 0a9ed8a
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 4 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,5 +425,5 @@ func TestGetExistedUserDBs(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(196), session.CurrentBootstrapVersion)
require.Equal(t, int64(197), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
}
// Check if there is any running job works on the same table.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',(', REPLACE(t1.table_ids, ',', '|'), '),') != 0)"+
"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
rows, err := sess.Execute(d.ctx, sql, "check conflict jobs")
return len(rows) == 0, err
Expand Down
89 changes: 89 additions & 0 deletions pkg/infoschema/test/clustertablestest/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/privilege/privileges"
"github.com/pingcap/tidb/pkg/server"
Expand Down Expand Up @@ -1694,3 +1695,91 @@ func TestUnusedIndexView(t *testing.T) {
return result.Equal(expectedResult)
}, 5*time.Second, 100*time.Millisecond)
}

func TestMDLViewIDConflict(t *testing.T) {
save := privileges.SkipWithGrant
privileges.SkipWithGrant = true
defer func() {
privileges.SkipWithGrant = save
}()

s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()
tk := s.newTestKitWithRoot(t)

tk.MustExec("use test")
tk.MustExec("create table t(a int);")
tbl, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tk.MustExec("insert into t values (1)")

bigID := tbl.Meta().ID * 10
bigTableName := ""
// set a hard limitation on 10000 to avoid using too much resource
for i := 0; i < 10000; i++ {
bigTableName = fmt.Sprintf("t%d", i)
tk.MustExec(fmt.Sprintf("create table %s(a int);", bigTableName))

tbl, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(bigTableName))
require.NoError(t, err)

require.LessOrEqual(t, tbl.Meta().ID, bigID)
if tbl.Meta().ID == bigID {
break
}
}
tk.MustExec("insert into t1 values (1)")
tk.MustExec(fmt.Sprintf("insert into %s values (1)", bigTableName))

// Now we have two table: t and `bigTableName`. The later one's ID is 10 times the former one.
// Then create two session to run TXNs on these two tables
txnTK1 := s.newTestKitWithRoot(t)
txnTK2 := s.newTestKitWithRoot(t)
txnTK1.MustExec("use test")
txnTK1.MustExec("BEGIN")
// this transaction will query `t` and one another table. Then the `related_table_ids` is `smallID|anotherID`
txnTK1.MustQuery("SELECT * FROM t").Check(testkit.Rows("1"))
txnTK1.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
txnTK2.MustExec("use test")
txnTK2.MustExec("BEGIN")
txnTK2.MustQuery("SELECT * FROM " + bigTableName).Check(testkit.Rows("1"))

testTK := s.newTestKitWithRoot(t)
s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", testTK.Session().GetSessionManager())
defer s.rpcserver.Stop()
testTK.MustQuery("select table_name from mysql.tidb_mdl_view").Check(testkit.Rows())

// run a DDL on the table with smallID
ddlTK1 := s.newTestKitWithRoot(t)
ddlTK1.MustExec("use test")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
ddlTK1.MustExec("ALTER TABLE t ADD COLUMN b INT;")
wg.Done()
}()
ddlTK2 := s.newTestKitWithRoot(t)
ddlTK2.MustExec("use test")
wg.Add(1)
go func() {
ddlTK2.MustExec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN b INT;", bigTableName))
wg.Done()
}()

require.Eventually(t, func() bool {
rows := testTK.MustQuery("select table_ids from mysql.tidb_mdl_info").Rows()
return len(rows) == 2
}, time.Second*10, time.Second)

// it only contains the table with smallID
require.Eventually(t, func() bool {
rows := testTK.MustQuery("select table_name, query, start_time from mysql.tidb_mdl_view order by table_name").Rows()
return len(rows) == 2
}, time.Second*10, time.Second)
txnTK1.MustExec("COMMIT")
txnTK2.MustExec("COMMIT")
wg.Wait()
}
17 changes: 15 additions & 2 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ const (
mysql.tidb_mdl_info,
information_schema.cluster_tidb_trx
WHERE tidb_ddl_job.job_id=tidb_mdl_info.job_id
AND CONCAT(',', tidb_mdl_info.table_ids, ',') REGEXP CONCAT(',', REPLACE(cluster_tidb_trx.related_table_ids, ',', '|'), ',') != 0
AND CONCAT(',', tidb_mdl_info.table_ids, ',') REGEXP CONCAT(',(', REPLACE(cluster_tidb_trx.related_table_ids, ',', '|'), '),') != 0
);`

// CreatePlanReplayerStatusTable is a table about plan replayer status
Expand Down Expand Up @@ -1089,11 +1089,15 @@ const (
// add column `target_scope` for 'mysql.tidb_global_task` table
// add column `target_scope` for 'mysql.tidb_global_task_history` table
version196 = 196

// version 197
// replace `mysql.tidb_mdl_view` table
version197 = 197
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version196
var currentBootstrapVersion int64 = version197

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1255,6 +1259,7 @@ var (
upgradeToVer194,
upgradeToVer195,
upgradeToVer196,
upgradeToVer197,
}
)

Expand Down Expand Up @@ -3090,6 +3095,14 @@ func upgradeToVer196(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.tidb_global_task_history ADD COLUMN target_scope VARCHAR(256) DEFAULT '' AFTER `step`;", infoschema.ErrColumnExists)
}

func upgradeToVer197(s sessiontypes.Session, ver int64) {
if ver >= version197 {
return
}

doReentrantDDL(s, CreateMDLView)
}

func writeOOMAction(s sessiontypes.Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down

0 comments on commit 0a9ed8a

Please sign in to comment.