Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl,mdl: fix the issue that the table_ids overlap regex is not correct #52879

Merged
merged 1 commit into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,5 +425,5 @@ func TestGetExistedUserDBs(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(196), session.CurrentBootstrapVersion)
require.Equal(t, int64(197), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
}
// Check if there is any running job works on the same table.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',(', REPLACE(t1.table_ids, ',', '|'), '),') != 0)"+
"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
rows, err := sess.Execute(d.ctx, sql, "check conflict jobs")
return len(rows) == 0, err
Expand Down
89 changes: 89 additions & 0 deletions pkg/infoschema/test/clustertablestest/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/privilege/privileges"
"github.com/pingcap/tidb/pkg/server"
Expand Down Expand Up @@ -1694,3 +1695,91 @@ func TestUnusedIndexView(t *testing.T) {
return result.Equal(expectedResult)
}, 5*time.Second, 100*time.Millisecond)
}

func TestMDLViewIDConflict(t *testing.T) {
save := privileges.SkipWithGrant
privileges.SkipWithGrant = true
defer func() {
privileges.SkipWithGrant = save
}()

s := new(clusterTablesSuite)
s.store, s.dom = testkit.CreateMockStoreAndDomain(t)
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()
tk := s.newTestKitWithRoot(t)

tk.MustExec("use test")
tk.MustExec("create table t(a int);")
tbl, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tk.MustExec("insert into t values (1)")

bigID := tbl.Meta().ID * 10
bigTableName := ""
// set a hard limitation on 10000 to avoid using too much resource
for i := 0; i < 10000; i++ {
bigTableName = fmt.Sprintf("t%d", i)
tk.MustExec(fmt.Sprintf("create table %s(a int);", bigTableName))

tbl, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(bigTableName))
require.NoError(t, err)

require.LessOrEqual(t, tbl.Meta().ID, bigID)
if tbl.Meta().ID == bigID {
break
}
}
tk.MustExec("insert into t1 values (1)")
tk.MustExec(fmt.Sprintf("insert into %s values (1)", bigTableName))

// Now we have two table: t and `bigTableName`. The later one's ID is 10 times the former one.
// Then create two session to run TXNs on these two tables
txnTK1 := s.newTestKitWithRoot(t)
txnTK2 := s.newTestKitWithRoot(t)
txnTK1.MustExec("use test")
txnTK1.MustExec("BEGIN")
// this transaction will query `t` and one another table. Then the `related_table_ids` is `smallID|anotherID`
txnTK1.MustQuery("SELECT * FROM t").Check(testkit.Rows("1"))
txnTK1.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
txnTK2.MustExec("use test")
txnTK2.MustExec("BEGIN")
txnTK2.MustQuery("SELECT * FROM " + bigTableName).Check(testkit.Rows("1"))

testTK := s.newTestKitWithRoot(t)
s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", testTK.Session().GetSessionManager())
defer s.rpcserver.Stop()
testTK.MustQuery("select table_name from mysql.tidb_mdl_view").Check(testkit.Rows())

// run a DDL on the table with smallID
ddlTK1 := s.newTestKitWithRoot(t)
ddlTK1.MustExec("use test")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
ddlTK1.MustExec("ALTER TABLE t ADD COLUMN b INT;")
wg.Done()
}()
ddlTK2 := s.newTestKitWithRoot(t)
ddlTK2.MustExec("use test")
wg.Add(1)
go func() {
ddlTK2.MustExec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN b INT;", bigTableName))
wg.Done()
}()

require.Eventually(t, func() bool {
rows := testTK.MustQuery("select table_ids from mysql.tidb_mdl_info").Rows()
return len(rows) == 2
}, time.Second*10, time.Second)

// it only contains the table with smallID
require.Eventually(t, func() bool {
rows := testTK.MustQuery("select table_name, query, start_time from mysql.tidb_mdl_view order by table_name").Rows()
return len(rows) == 2
}, time.Second*10, time.Second)
txnTK1.MustExec("COMMIT")
txnTK2.MustExec("COMMIT")
wg.Wait()
}
17 changes: 15 additions & 2 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ const (
mysql.tidb_mdl_info,
information_schema.cluster_tidb_trx
WHERE tidb_ddl_job.job_id=tidb_mdl_info.job_id
AND CONCAT(',', tidb_mdl_info.table_ids, ',') REGEXP CONCAT(',', REPLACE(cluster_tidb_trx.related_table_ids, ',', '|'), ',') != 0
AND CONCAT(',', tidb_mdl_info.table_ids, ',') REGEXP CONCAT(',(', REPLACE(cluster_tidb_trx.related_table_ids, ',', '|'), '),') != 0
);`

// CreatePlanReplayerStatusTable is a table about plan replayer status
Expand Down Expand Up @@ -1089,11 +1089,15 @@ const (
// add column `target_scope` for 'mysql.tidb_global_task` table
// add column `target_scope` for 'mysql.tidb_global_task_history` table
version196 = 196

// version 197
// replace `mysql.tidb_mdl_view` table
version197 = 197
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version196
var currentBootstrapVersion int64 = version197

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1255,6 +1259,7 @@ var (
upgradeToVer194,
upgradeToVer195,
upgradeToVer196,
upgradeToVer197,
}
)

Expand Down Expand Up @@ -3090,6 +3095,14 @@ func upgradeToVer196(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.tidb_global_task_history ADD COLUMN target_scope VARCHAR(256) DEFAULT '' AFTER `step`;", infoschema.ErrColumnExists)
}

func upgradeToVer197(s sessiontypes.Session, ver int64) {
if ver >= version197 {
return
}

doReentrantDDL(s, CreateMDLView)
}

func writeOOMAction(s sessiontypes.Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down
Loading