diff --git a/executor/slow_query.go b/executor/slow_query.go index 9ecdfcc840cd3..126f039f3b741 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -944,18 +944,6 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co e.stats.totalFileNum = totalFileNum }() } - if e.extractor == nil || !e.extractor.Enable { - totalFileNum = 1 - //nolint: gosec - file, err := os.Open(logFilePath) - if err != nil { - if os.IsNotExist(err) { - return nil, nil - } - return nil, err - } - return []logFile{{file: file}}, nil - } var logFiles []logFile logDir := filepath.Dir(logFilePath) ext := filepath.Ext(logFilePath) @@ -999,17 +987,20 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co return handleErr(err) } start := types.NewTime(types.FromGoTime(fileStartTime), mysql.TypeDatetime, types.MaxFsp) - notInAllTimeRanges := true - for _, tr := range e.checker.timeRanges { - if start.Compare(tr.endTime) <= 0 { - notInAllTimeRanges = false - break + if e.checker.enableTimeCheck { + notInAllTimeRanges := true + for _, tr := range e.checker.timeRanges { + if start.Compare(tr.endTime) <= 0 { + notInAllTimeRanges = false + break + } + } + if notInAllTimeRanges { + return nil } - } - if notInAllTimeRanges { - return nil } +<<<<<<< HEAD:executor/slow_query.go // Get the file end time. fileEndTime, err := e.getFileEndTime(ctx, file) if err != nil { @@ -1021,6 +1012,28 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) { inTimeRanges = true break +======= + // If we want to get the end time from a compressed file, + // we need uncompress the whole file which is very slow and consume a lot of memory. + if !compressed { + // Get the file end time. + fileEndTime, err := e.getFileEndTime(ctx, file) + if err != nil { + return handleErr(err) + } + if e.checker.enableTimeCheck { + end := types.NewTime(types.FromGoTime(fileEndTime), mysql.TypeDatetime, types.MaxFsp) + inTimeRanges := false + for _, tr := range e.checker.timeRanges { + if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) { + inTimeRanges = true + break + } + } + if !inTimeRanges { + return nil + } +>>>>>>> 484c1ae88ee (executor: fix issue that query slow_query table return wrong result (#56356)):pkg/executor/slow_query.go } } if !inTimeRanges { @@ -1048,7 +1061,32 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co slices.SortFunc(logFiles, func(i, j logFile) int { return cmp.Compare(i.start.UnixNano(), j.start.UnixNano()) }) +<<<<<<< HEAD:executor/slow_query.go return logFiles, err +======= + // Assume no time range overlap in log files and remove unnecessary log files for compressed files. + var ret []logFile + for i, file := range logFiles { + if i == len(logFiles)-1 || !file.compressed || !e.checker.enableTimeCheck { + ret = append(ret, file) + continue + } + start := types.NewTime(types.FromGoTime(logFiles[i].start), mysql.TypeDatetime, types.MaxFsp) + // use next file.start as endTime + end := types.NewTime(types.FromGoTime(logFiles[i+1].start), mysql.TypeDatetime, types.MaxFsp) + inTimeRanges := false + for _, tr := range e.checker.timeRanges { + if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) { + inTimeRanges = true + break + } + } + if inTimeRanges { + ret = append(ret, file) + } + } + return ret, err +>>>>>>> 484c1ae88ee (executor: fix issue that query slow_query table return wrong result (#56356)):pkg/executor/slow_query.go } func (e *slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File) (time.Time, error) { diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index f81fb5e35dbda..8dfeb5f5078ff 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -351,6 +351,7 @@ select 6; select 7;` logData := []string{logData0, logData1, logData2, logData3} +<<<<<<< HEAD:executor/slow_query_test.go fileName0 := "tidb-slow-2020-02-14T19-04-05.01.log" fileName1 := "tidb-slow-2020-02-15T19-04-05.01.log" fileName2 := "tidb-slow-2020-02-16T19-04-05.01.log" @@ -358,6 +359,166 @@ select 7;` fileNames := []string{fileName0, fileName1, fileName2, fileName3} prepareLogs(t, logData, fileNames) defer func() { +======= + fileName0 := "tidb-slow-retriever-2020-02-14T19-04-05.01.log" + fileName1 := "tidb-slow-retriever-2020-02-15T19-04-05.01.log" + fileName2 := "tidb-slow-retriever-2020-02-16T19-04-05.01.log" + fileName3 := "tidb-slow-retriever.log" + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.Log.SlowQueryFile = fileName3 + }) + for k := 0; k < 2; k++ { + // k = 0 for normal files + // k = 1 for compressed files + var fileNames []string + if k == 0 { + fileNames = []string{fileName0, fileName1, fileName2, fileName3} + } else { + fileNames = []string{fileName0 + ".gz", fileName1 + ".gz", fileName2 + ".gz", fileName3} + } + prepareLogs(t, logData, fileNames) + + cases := []struct { + startTime string + endTime string + files []string + querys []string + }{ + { + startTime: "2020-02-15T18:00:00.000000+08:00", + endTime: "2020-02-17T20:00:00.000000+08:00", + files: []string{fileName1, fileName2, fileName3}, + querys: []string{ + "select 1;", + "select 2;", + "select 3;", + "select 4;", + "select 5;", + "select 6;", + }, + }, + { + startTime: "2020-02-15T18:00:02.000000+08:00", + endTime: "2020-02-16T20:00:00.000000+08:00", + files: []string{fileName1, fileName2, fileName3}, + querys: []string{ + "select 2;", + "select 3;", + "select 4;", + "select 5;", + }, + }, + { + startTime: "2020-02-16T18:00:03.000000+08:00", + endTime: "2020-02-16T18:59:00.000000+08:00", + files: []string{fileName2}, + querys: []string{ + "select 4;", + }, + }, + { + startTime: "2020-02-16T18:00:03.000000+08:00", + endTime: "2020-02-16T20:00:00.000000+08:00", + files: []string{fileName2, fileName3}, + querys: []string{ + "select 4;", + "select 5;", + }, + }, + { + startTime: "2020-02-16T19:00:00.000000+08:00", + endTime: "2020-02-17T17:00:00.000000+08:00", + files: []string{fileName3}, + querys: []string{ + "select 5;", + }, + }, + { + startTime: "2010-01-01T00:00:00.000000+08:00", + endTime: "2010-01-01T01:00:00.000000+08:00", + files: []string{}, + }, + { + startTime: "2020-03-01T00:00:00.000000+08:00", + endTime: "2010-03-01T01:00:00.000000+08:00", + files: []string{}, + }, + { + startTime: "", + endTime: "", + files: []string{fileName1, fileName2, fileName3}, + querys: []string{ + "select 1;", + "select 2;", + "select 3;", + "select 4;", + "select 5;", + "select 6;", + "select 7;", + }, + }, + { + startTime: "2020-04-15T18:00:05.299063744+08:00", + endTime: "2020-04-15T18:00:05.299063744+08:00", + files: []string{fileName3}, + querys: []string{ + "select 7;", + }, + }, + } + + loc, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + sctx := mock.NewContext() + sctx.ResetSessionAndStmtTimeZone(loc) + sctx.GetSessionVars().SlowQueryFile = fileName3 + for i, cas := range cases { + extractor := &plannercore.SlowQueryExtractor{Enable: len(cas.startTime) > 0 && len(cas.endTime) > 0} + if extractor.Enable { + startTime, err := ParseTime(cas.startTime) + require.NoError(t, err) + endTime, err := ParseTime(cas.endTime) + require.NoError(t, err) + extractor.TimeRanges = []*plannercore.TimeRange{{StartTime: startTime, EndTime: endTime}} + } + retriever, err := newSlowQueryRetriever() + require.NoError(t, err) + retriever.extractor = extractor + err = retriever.initialize(context.Background(), sctx) + require.NoError(t, err) + comment := fmt.Sprintf("compressed: %v, case id: %v", k, i) + if len(retriever.files) > 0 { + var reader *bufio.Reader + reader, err := retriever.getNextReader() + require.NoError(t, err, comment) + rows, err := parseLog(retriever, sctx, reader) + require.NoError(t, err, comment) + require.Equal(t, len(rows), len(cas.querys), comment) + for i, row := range rows { + require.Equal(t, row[len(row)-1].GetString(), cas.querys[i], comment) + } + } + + if k == 0 { + require.Equal(t, len(retriever.files), len(cas.files), comment) + for i, file := range retriever.files { + require.Equal(t, file.file.Name(), cas.files[i], comment) + } + } else { + // for compressed file, sometimes it will contains one more file. + require.True(t, (len(retriever.files) == len(cas.files)) || (len(retriever.files) == len(cas.files)+1), comment) + var fileNames []string + for _, file := range retriever.files { + fileNames = append(fileNames, strings.TrimSuffix(file.file.Name(), ".gz")) + } + for _, file := range cas.files { + require.Contains(t, fileNames, file, comment) + } + } + require.NoError(t, retriever.close()) + } +>>>>>>> 484c1ae88ee (executor: fix issue that query slow_query table return wrong result (#56356)):pkg/executor/slow_query_test.go removeFiles(fileNames) }() diff --git a/pkg/executor/cluster_table_test.go b/pkg/executor/cluster_table_test.go new file mode 100644 index 0000000000000..62acd59b4bac5 --- /dev/null +++ b/pkg/executor/cluster_table_test.go @@ -0,0 +1,356 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "compress/gzip" + "context" + "fmt" + "net" + "os" + "strings" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/auth" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/server" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func createRPCServer(t *testing.T, dom *domain.Domain) *grpc.Server { + sm := &testkit.MockSessionManager{} + sm.PS = append(sm.PS, &util.ProcessInfo{ + ID: 1, + User: "root", + Host: "127.0.0.1", + Command: mysql.ComQuery, + }) + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv := server.NewRPCServer(config.GetGlobalConfig(), dom, sm) + port := lis.Addr().(*net.TCPAddr).Port + go func() { + err = srv.Serve(lis) + require.NoError(t, err) + }() + + config.UpdateGlobal(func(conf *config.Config) { + conf.Status.StatusPort = uint(port) + }) + + return srv +} + +func TestClusterTableSlowQuery(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + srv := createRPCServer(t, dom) + defer srv.Stop() + + logData0 := "" + logData1 := ` +# Time: 2020-02-15T18:00:01.000000+08:00 +select 1; +# Time: 2020-02-15T19:00:05.000000+08:00 +select 2;` + logData2 := ` +# Time: 2020-02-16T18:00:01.000000+08:00 +select 3; +# Time: 2020-02-16T18:00:05.000000+08:00 +select 4;` + logData3 := ` +# Time: 2020-02-16T19:00:00.000000+08:00 +select 5; +# Time: 2020-02-17T18:00:05.000000+08:00 +select 6;` + logData4 := ` +# Time: 2020-05-14T19:03:54.314615176+08:00 +select 7;` + logData := []string{logData0, logData1, logData2, logData3, logData4} + + fileName0 := "tidb-slow-query-2020-02-14T19-04-05.01.log" + fileName1 := "tidb-slow-query-2020-02-15T19-04-05.01.log" + fileName2 := "tidb-slow-query-2020-02-16T19-04-05.01.log" + fileName3 := "tidb-slow-query-2020-02-17T18-00-05.01.log" + fileName4 := "tidb-slow-query.log" + fileNames := []string{fileName0, fileName1, fileName2, fileName3, fileName4} + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.Log.SlowQueryFile = fileName4 + }) + + prepareLogs(t, logData, fileNames) + defer func() { + removeFiles(t, fileNames) + }() + tk := testkit.NewTestKit(t, store) + loc, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + tk.Session().GetSessionVars().TimeZone = loc + tk.MustExec("use information_schema") + cases := []struct { + prepareSQL string + sql string + result []string + }{ + { + sql: "select count(*),min(time),max(time) from %s where time > '2019-01-26 21:51:00' and time < now()", + result: []string{"7|2020-02-15 18:00:01.000000|2020-05-14 19:03:54.314615"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-15 19:00:00' and time < '2020-02-16 18:00:02'", + result: []string{"2|2020-02-15 19:00:05.000000|2020-02-16 18:00:01.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 18:00:02' and time < '2020-02-17 17:00:00'", + result: []string{"2|2020-02-16 18:00:05.000000|2020-02-16 19:00:00.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 18:00:02' and time < '2020-02-17 20:00:00'", + result: []string{"3|2020-02-16 18:00:05.000000|2020-02-17 18:00:05.000000"}, + }, + { + sql: "select count(*),min(time),max(time) from %s", + result: []string{"7|2020-02-15 18:00:01.000000|2020-05-14 19:03:54.314615"}, + }, + { + sql: "select count(*),min(time),max(time) from %s where time > '2020-02-16 20:00:00'", + result: []string{"2|2020-02-17 18:00:05.000000|2020-05-14 19:03:54.314615"}, + }, + { + sql: "select count(*) from %s where time > '2020-02-17 20:00:00'", + result: []string{"1"}, + }, + { + sql: "select count(*) from %s where time > '1980-01-11 00:00:00'", + result: []string{"7"}, + }, + { + sql: "select count(*) from %s where time < '2024-01-01 00:00:00'", + result: []string{"7"}, + }, + { + sql: "select query from %s where time > '2019-01-26 21:51:00' and time < now()", + result: []string{"select 1;", "select 2;", "select 3;", "select 4;", "select 5;", "select 6;", "select 7;"}, + }, + // Test for different timezone. + { + prepareSQL: "set @@time_zone = '+00:00'", + sql: "select time from %s where time = '2020-02-17 10:00:05.000000'", + result: []string{"2020-02-17 10:00:05.000000"}, + }, + { + prepareSQL: "set @@time_zone = '+02:00'", + sql: "select time from %s where time = '2020-02-17 12:00:05.000000'", + result: []string{"2020-02-17 12:00:05.000000"}, + }, + // Test for issue 17224 + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select time from %s where time = '2020-05-14 19:03:54.314615'", + result: []string{"2020-05-14 19:03:54.314615"}, + }, + } + for _, cas := range cases { + if len(cas.prepareSQL) > 0 { + tk.MustExec(cas.prepareSQL) + } + sql := fmt.Sprintf(cas.sql, "slow_query") + tk.MustQuery(sql).Check(testkit.RowsWithSep("|", cas.result...)) + sql = fmt.Sprintf(cas.sql, "cluster_slow_query") + tk.MustQuery(sql).Check(testkit.RowsWithSep("|", cas.result...)) + } +} + +func TestIssue20236(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + srv := createRPCServer(t, dom) + defer srv.Stop() + + logData0 := "" + logData1 := ` +# Time: 2020-02-15T18:00:01.000000+08:00 +select 1; +# Time: 2020-02-15T19:00:05.000000+08:00 +select 2; +# Time: 2020-02-15T20:00:05.000000+08:00` + logData2 := `select 3; +# Time: 2020-02-16T18:00:01.000000+08:00 +select 4; +# Time: 2020-02-16T18:00:05.000000+08:00 +select 5;` + logData3 := ` +# Time: 2020-02-16T19:00:00.000000+08:00 +select 6; +# Time: 2020-02-17T18:00:05.000000+08:00 +select 7; +# Time: 2020-02-17T19:00:00.000000+08:00` + logData4 := `select 8; +# Time: 2020-02-17T20:00:00.000000+08:00 +select 9 +# Time: 2020-05-14T19:03:54.314615176+08:00 +select 10;` + logData := []string{logData0, logData1, logData2, logData3, logData4} + + fileName0 := "tidb-slow-20236-2020-02-14T19-04-05.01.log" + fileName1 := "tidb-slow-20236-2020-02-15T19-04-05.01.log" + fileName2 := "tidb-slow-20236-2020-02-16T19-04-05.01.log" + fileName3 := "tidb-slow-20236-2020-02-17T18-00-05.01.log" + fileName4 := "tidb-slow-20236.log" + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.Log.SlowQueryFile = fileName4 + }) + for k := 0; k < 2; k++ { + // k = 0 for normal files + // k = 1 for compressed files + var fileNames []string + if k == 0 { + fileNames = []string{fileName0, fileName1, fileName2, fileName3, fileName4} + } else { + fileNames = []string{fileName0 + ".gz", fileName1 + ".gz", fileName2 + ".gz", fileName3 + ".gz", fileName4} + } + prepareLogs(t, logData, fileNames) + tk := testkit.NewTestKit(t, store) + loc, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + tk.Session().GetSessionVars().TimeZone = loc + tk.MustExec("use information_schema") + cases := []struct { + prepareSQL string + sql string + result []string + }{ + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select time from cluster_slow_query where time > '2020-02-17 12:00:05.000000' and time < '2020-05-14 20:00:00.000000'", + result: []string{"2020-02-17 18:00:05.000000", "2020-02-17 19:00:00.000000", "2020-05-14 19:03:54.314615"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select time from cluster_slow_query where time > '2020-02-17 12:00:05.000000' and time < '2020-05-14 20:00:00.000000' order by time desc", + result: []string{"2020-05-14 19:03:54.314615", "2020-02-17 19:00:00.000000", "2020-02-17 18:00:05.000000"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select time from cluster_slow_query where (time > '2020-02-15 18:00:00' and time < '2020-02-15 20:00:00') or (time > '2020-02-17 18:00:00' and time < '2020-05-14 20:00:00') order by time", + result: []string{"2020-02-15 18:00:01.000000", "2020-02-15 19:00:05.000000", "2020-02-17 18:00:05.000000", "2020-02-17 19:00:00.000000", "2020-05-14 19:03:54.314615"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select time from cluster_slow_query where (time > '2020-02-15 18:00:00' and time < '2020-02-15 20:00:00') or (time > '2020-02-17 18:00:00' and time < '2020-05-14 20:00:00') order by time desc", + result: []string{"2020-05-14 19:03:54.314615", "2020-02-17 19:00:00.000000", "2020-02-17 18:00:05.000000", "2020-02-15 19:00:05.000000", "2020-02-15 18:00:01.000000"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select count(*) from cluster_slow_query where time > '2020-02-15 18:00:00.000000' and time < '2020-05-14 20:00:00.000000' order by time desc", + result: []string{"9"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select count(*) from cluster_slow_query where (time > '2020-02-16 18:00:00' and time < '2020-05-14 20:00:00') or (time > '2020-02-17 18:00:00' and time < '2020-05-17 20:00:00')", + result: []string{"6"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select count(*) from cluster_slow_query where time > '2020-02-16 18:00:00.000000' and time < '2020-02-17 20:00:00.000000' order by time desc", + result: []string{"5"}, + }, + { + prepareSQL: "set @@time_zone = '+08:00'", + sql: "select time from cluster_slow_query where time > '2020-02-16 18:00:00.000000' and time < '2020-05-14 20:00:00.000000' order by time desc limit 3", + result: []string{"2020-05-14 19:03:54.314615", "2020-02-17 19:00:00.000000", "2020-02-17 18:00:05.000000"}, + }, + } + for _, cas := range cases { + if len(cas.prepareSQL) > 0 { + tk.MustExec(cas.prepareSQL) + } + tk.MustQuery(cas.sql).Check(testkit.RowsWithSep("|", cas.result...)) + } + removeFiles(t, fileNames) + } +} + +func TestSQLDigestTextRetriever(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + srv := createRPCServer(t, dom) + defer srv.Stop() + + tkInit := testkit.NewTestKit(t, store) + tkInit.MustExec("use test") + tkInit.MustExec("set global tidb_enable_stmt_summary = 1") + tkInit.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) + tkInit.MustExec("drop table if exists test_sql_digest_text_retriever") + tkInit.MustExec("create table test_sql_digest_text_retriever (id int primary key, v int)") + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + tk.MustExec("insert into test_sql_digest_text_retriever values (1, 1)") + + insertNormalized, insertDigest := parser.NormalizeDigest("insert into test_sql_digest_text_retriever values (1, 1)") + _, updateDigest := parser.NormalizeDigest("update test_sql_digest_text_retriever set v = v + 1 where id = 1") + r := &expression.SQLDigestTextRetriever{ + SQLDigestsMap: map[string]string{ + insertDigest.String(): "", + updateDigest.String(): "", + }, + } + + err := r.RetrieveLocal(context.Background(), tk.Session().GetRestrictedSQLExecutor()) + require.NoError(t, err) + require.Equal(t, insertNormalized, r.SQLDigestsMap[insertDigest.String()]) + require.Equal(t, "", r.SQLDigestsMap[updateDigest.String()]) +} + +func prepareLogs(t *testing.T, logData []string, fileNames []string) { + writeFile := func(file string, data string) { + if strings.HasSuffix(file, ".gz") { + f, err := os.Create(file) + require.NoError(t, err) + gz := gzip.NewWriter(f) + _, err = gz.Write([]byte(data)) + require.NoError(t, err) + require.NoError(t, gz.Close()) + require.NoError(t, f.Close()) + } else { + f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + require.NoError(t, err) + _, err = f.Write([]byte(data)) + require.NoError(t, err) + require.NoError(t, f.Close()) + } + } + + for i, log := range logData { + writeFile(fileNames[i], log) + } +} + +func removeFiles(t *testing.T, fileNames []string) { + for _, fileName := range fileNames { + require.NoError(t, os.Remove(fileName)) + } +} diff --git a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go new file mode 100644 index 0000000000000..08e58f5c613dd --- /dev/null +++ b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go @@ -0,0 +1,1974 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clustertablestest + +import ( + "context" + "fmt" + "math/rand" + "net" + "net/http/httptest" + "os" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/failpoint" + "github.com/pingcap/fn" + "github.com/pingcap/kvproto/pkg/deadlock" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/infoschema/internal" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/auth" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/privilege/privileges" + "github.com/pingcap/tidb/pkg/server" + "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/pingcap/tidb/pkg/store/mockstore/mockstorage" + "github.com/pingcap/tidb/pkg/store/mockstore/unistore" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/external" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/resourcegrouptag" + "github.com/pingcap/tidb/pkg/util/set" + "github.com/pingcap/tidb/pkg/util/stmtsummary" + "github.com/pingcap/tipb/go-tipb" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/testutils" + pd "github.com/tikv/pd/client/http" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type clusterTablesSuite struct { + store kv.Storage + dom *domain.Domain + rpcserver *grpc.Server + httpServer *httptest.Server + mockAddr string + listenAddr string + startTime time.Time +} + +func TestForClusterServerInfo(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + tk := testkit.NewTestKit(t, s.store) + instances := []string{ + strings.Join([]string{"tidb", s.listenAddr, s.listenAddr, "mock-version,mock-githash,1001"}, ","), + strings.Join([]string{"pd", s.listenAddr, s.listenAddr, "mock-version,mock-githash,0"}, ","), + strings.Join([]string{"tikv", s.listenAddr, s.listenAddr, "mock-version,mock-githash,0"}, ","), + } + + fpExpr := `return("` + strings.Join(instances, ";") + `")` + fpName := "github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo" + require.NoError(t, failpoint.Enable(fpName, fpExpr)) + defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + + cases := []struct { + sql string + types set.StringSet + addrs set.StringSet + names set.StringSet + skipOnDist set.StringSet + }{ + { + sql: "select * from information_schema.CLUSTER_LOAD;", + types: set.NewStringSet("tidb", "tikv", "pd"), + addrs: set.NewStringSet(s.listenAddr), + names: set.NewStringSet("cpu", "memory", "net"), + }, + { + sql: "select * from information_schema.CLUSTER_HARDWARE;", + types: set.NewStringSet("tidb", "tikv", "pd"), + addrs: set.NewStringSet(s.listenAddr), + names: set.NewStringSet("cpu", "memory", "net", "disk"), + // The sysutil package will filter out all disk don't have /dev prefix. + // gopsutil cpu.Info will fail on mac M1 + skipOnDist: set.NewStringSet("windows", "darwin/arm64"), + }, + { + sql: "select * from information_schema.CLUSTER_SYSTEMINFO;", + types: set.NewStringSet("tidb", "tikv", "pd"), + addrs: set.NewStringSet(s.listenAddr), + names: set.NewStringSet("system"), + // This test get empty result and fails on the windows platform. + // Because the underlying implementation use `sysctl` command to get the result + // and there is no such command on windows. + // https://github.com/pingcap/sysutil/blob/2bfa6dc40bcd4c103bf684fba528ae4279c7ec9f/system_info.go#L50 + skipOnDist: set.NewStringSet("windows"), + }, + } + + for _, cas := range cases { + if cas.skipOnDist.Exist(runtime.GOOS+"/"+runtime.GOARCH) || cas.skipOnDist.Exist(runtime.GOOS) { + continue + } + + result := tk.MustQuery(cas.sql) + rows := result.Rows() + require.Greater(t, len(rows), 0) + + gotTypes := set.StringSet{} + gotAddrs := set.StringSet{} + gotNames := set.StringSet{} + + for _, row := range rows { + gotTypes.Insert(row[0].(string)) + gotAddrs.Insert(row[1].(string)) + gotNames.Insert(row[2].(string)) + } + + require.Equalf(t, cas.types, gotTypes, "sql: %s", cas.sql) + require.Equalf(t, cas.addrs, gotAddrs, "sql: %s", cas.sql) + require.Equalf(t, cas.names, gotNames, "sql: %s", cas.sql) + } +} + +func TestTestDataLockWaits(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + _, digest1 := parser.NormalizeDigest("select * from test_data_lock_waits for update") + _, digest2 := parser.NormalizeDigest("update test_data_lock_waits set f1=1 where id=2") + s.store.(mockstorage.MockLockWaitSetter).SetMockLockWaits([]*deadlock.WaitForEntry{ + {Txn: 1, WaitForTxn: 2, Key: []byte("key1"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown)}, + {Txn: 3, WaitForTxn: 4, Key: []byte("key2"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown)}, + // Invalid digests + {Txn: 5, WaitForTxn: 6, Key: []byte("key3"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(nil, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown)}, + {Txn: 7, WaitForTxn: 8, Key: []byte("key4"), ResourceGroupTag: []byte("asdfghjkl")}, + }) + tk := s.newTestKitWithRoot(t) + + // Execute one of the query once, so it's stored into statements_summary. + tk.MustExec("create table test_data_lock_waits (id int primary key, f1 int)") + tk.MustExec("select * from test_data_lock_waits for update") + + tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS").Check(testkit.Rows( + "6B657931 1 2 "+digest1.String()+" select * from `test_data_lock_waits` for update", + "6B657932 3 4 "+digest2.String()+" ", + "6B657933 5 6 ", + "6B657934 7 8 ")) +} + +func TestDataLockWaitsPrivilege(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + dropUserTk := s.newTestKitWithRoot(t) + + tk := s.newTestKitWithRoot(t) + + tk.MustExec("create user 'testuser'@'localhost'") + defer dropUserTk.MustExec("drop user 'testuser'@'localhost'") + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{ + Username: "testuser", + Hostname: "localhost", + }, nil, nil, nil)) + err := tk.QueryToErr("select * from information_schema.DATA_LOCK_WAITS") + require.EqualError(t, err, "[planner:1227]Access denied; you need (at least one of) the PROCESS privilege(s) for this operation") + + tk = s.newTestKitWithRoot(t) + tk.MustExec("create user 'testuser2'@'localhost'") + defer dropUserTk.MustExec("drop user 'testuser2'@'localhost'") + tk.MustExec("grant process on *.* to 'testuser2'@'localhost'") + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{ + Username: "testuser2", + Hostname: "localhost", + }, nil, nil, nil)) + _ = tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS") +} + +func TestSelectClusterTable(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", &testkit.MockSessionManager{ + PS: []*util.ProcessInfo{ + { + ID: 1, + User: "root", + Host: "127.0.0.1", + Command: mysql.ComQuery, + SessionAlias: "alias456", + }, + }, + }) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + tk := s.newTestKitWithRoot(t) + slowLogFileName := "tidb-slow0.log" + internal.PrepareSlowLogfile(t, slowLogFileName) + defer func() { require.NoError(t, os.Remove(slowLogFileName)) }() + config.UpdateGlobal(func(conf *config.Config) { + conf.Log.SlowQueryFile = slowLogFileName + }) + + tk.MustExec("use information_schema") + tk.MustExec("set @@global.tidb_enable_stmt_summary=1") + tk.MustExec("set time_zone = '+08:00';") + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2")) + tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testkit.RowsWithSep("|", "2019-02-12 19:33:56.571953")) + tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1")) + // skip instance and host column because it now includes the TCP socket details (unstable) + tk.MustQuery("select id, user, db, command, time, state, info, digest, mem, disk, txnstart, session_alias from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf("1 root Query 9223372036 %s 0 0 alias456", ""))) + tk.MustQuery("select query_time, conn_id, session_alias from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6 ")) + tk.MustQuery("select query_time, conn_id, session_alias from `CLUSTER_SLOW_QUERY` order by time desc limit 1").Check(testkit.Rows("25.571605962 40507 alias123")) + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1")) + tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1")) + tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953", "16 2021-09-08 14:39:54.506967")) + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows()) + re := tk.MustQuery("select * from `CLUSTER_statements_summary`") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + re = tk.MustQuery("select * from `CLUSTER_statements_summary` where table_names REGEXP '\\binformation_schema\\.'") + require.NotNil(t, re) + require.Equal(t, len(re.Rows()), 0) + re = tk.MustQuery("select * from `CLUSTER_statements_summary` where table_names REGEXP 'information_schema\\.'") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + // Test for TiDB issue 14915. + re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + tk.MustQuery("select * from `CLUSTER_statements_summary_history`") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + tk.MustExec("set @@global.tidb_enable_stmt_summary=0") + re = tk.MustQuery("select * from `CLUSTER_statements_summary`") + require.NotNil(t, re) + require.Equal(t, 0, len(re.Rows())) + tk.MustQuery("select * from `CLUSTER_statements_summary_history`") + require.NotNil(t, re) + require.Equal(t, 0, len(re.Rows())) + + // Test for https://github.com/pingcap/tidb/issues/33974 + instanceAddr, err := infoschema.GetInstanceAddr(tk.Session()) + require.NoError(t, err) + tk.MustQuery("select instance from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testkit.Rows(instanceAddr)) +} + +func TestSelectClusterTablePrivilege(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + tk := testkit.NewTestKit(t, s.store) + slowLogFileName := "tidb-slow.log" + f, err := os.OpenFile(slowLogFileName, os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + _, err = f.Write([]byte( + `# Time: 2019-02-12T19:33:57.571953+08:00 +# User@Host: user2 [user2] @ 127.0.0.1 [127.0.0.1] +select * from t2; +# Time: 2019-02-12T19:33:56.571953+08:00 +# User@Host: user1 [user1] @ 127.0.0.1 [127.0.0.1] +select * from t1; +# Time: 2019-02-12T19:33:58.571953+08:00 +# User@Host: user2 [user2] @ 127.0.0.1 [127.0.0.1] +select * from t3; +# Time: 2019-02-12T19:33:59.571953+08:00 +select * from t3; +`)) + require.NoError(t, f.Close()) + require.NoError(t, err) + defer func() { require.NoError(t, os.Remove(slowLogFileName)) }() + tk.MustExec("use information_schema") + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("4")) + tk.MustQuery("select count(*) from `SLOW_QUERY`").Check(testkit.Rows("4")) + tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1")) + tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf( + ":10080 1 root 127.0.0.1 Query 9223372036 %s 0 0 0 0", ""))) + tk.MustExec("create user user1") + tk.MustExec("create user user2") + user1 := testkit.NewTestKit(t, s.store) + user1.MustExec("use information_schema") + require.NoError(t, user1.Session().Auth(&auth.UserIdentity{ + Username: "user1", + Hostname: "127.0.0.1", + }, nil, nil, nil)) + user1.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("1")) + user1.MustQuery("select count(*) from `SLOW_QUERY`").Check(testkit.Rows("1")) + user1.MustQuery("select user,query from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("user1 select * from t1;")) + + user2 := testkit.NewTestKit(t, s.store) + user2.MustExec("use information_schema") + require.NoError(t, user2.Session().Auth(&auth.UserIdentity{ + Username: "user2", + Hostname: "127.0.0.1", + }, nil, nil, nil)) + user2.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2")) + user2.MustQuery("select user,query from `CLUSTER_SLOW_QUERY` order by query").Check(testkit.Rows("user2 select * from t2;", "user2 select * from t3;")) +} + +func TestStmtSummaryEvictedCountTable(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + tk := s.newTestKitWithRoot(t) + // disable refreshing + tk.MustExec("set global tidb_stmt_summary_refresh_interval=9999") + // set information_schema.statements_summary's size to 2 + tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 2") + // no evict happened, no record in cluster evicted table. + tk.MustQuery("select count(*) from information_schema.cluster_statements_summary_evicted;").Check(testkit.Rows("0")) + tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 1") + // cleanup side effects + defer tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 100") + defer tk.MustExec("set global tidb_stmt_summary_refresh_interval = 1800") + // clear information_schema.statements_summary + tk.MustExec("set global tidb_enable_stmt_summary=0") + // statements_summary is off, statements_summary_evicted is empty. + tk.MustQuery("select count(*) from information_schema.cluster_statements_summary_evicted;").Check(testkit.Rows("0")) + tk.MustExec("set global tidb_enable_stmt_summary=1") + + // make a new session for test... + tk = s.newTestKitWithRoot(t) + // first sql + tk.MustExec("show databases;") + // second sql, evict former sql from stmt_summary + tk.MustQuery("select evicted_count from information_schema.cluster_statements_summary_evicted;"). + Check(testkit.Rows("1")) + // after executed the sql above + tk.MustQuery("select evicted_count from information_schema.cluster_statements_summary_evicted;"). + Check(testkit.Rows("2")) + // TODO: Add more tests. + + tk.MustExec("create user 'testuser'@'localhost'") + tk.MustExec("create user 'testuser2'@'localhost'") + tk.MustExec("grant process on *.* to 'testuser2'@'localhost'") + tk1 := s.newTestKitWithRoot(t) + defer tk1.MustExec("drop user 'testuser'@'localhost'") + defer tk1.MustExec("drop user 'testuser2'@'localhost'") + + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{ + Username: "testuser", + Hostname: "localhost", + }, nil, nil, nil)) + + err := tk.QueryToErr("select * from information_schema.CLUSTER_STATEMENTS_SUMMARY_EVICTED") + // This error is come from cop(TiDB) fetch from rpc server. + require.ErrorContains(t, err, "other error: [planner:1227]Access denied; you need (at least one of) the PROCESS privilege(s) for this operation") + + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{ + Username: "testuser2", + Hostname: "localhost", + }, nil, nil, nil)) + require.NoError(t, tk.QueryToErr("select * from information_schema.CLUSTER_STATEMENTS_SUMMARY_EVICTED")) +} + +func TestStmtSummaryIssue35340(t *testing.T) { + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + + tk := s.newTestKitWithRoot(t) + tk.MustExec("set global tidb_stmt_summary_refresh_interval=1800") + tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 3000") + for i := 0; i < 100; i++ { + user := "user" + strconv.Itoa(i) + tk.MustExec(fmt.Sprintf("create user '%v'@'localhost'", user)) + } + tk.MustExec("flush privileges") + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + tk := s.newTestKitWithRoot(t) + for j := 0; j < 100; j++ { + user := "user" + strconv.Itoa(j) + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{ + Username: user, + Hostname: "localhost", + }, nil, nil, nil)) + tk.MustQuery("select count(*) from information_schema.statements_summary;") + } + }() + } + wg.Wait() +} + +func TestStmtSummaryHistoryTableWithUserTimezone(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + tk := s.newTestKitWithRoot(t) + tk.MustExec("drop table if exists test_summary") + tk.MustExec("create table test_summary(a int, b varchar(10), key k(a))") + + tk.MustExec("set global tidb_enable_stmt_summary = 1") + tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) + + // Disable refreshing summary. + tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") + tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) + + // Create a new session to test. + tk = s.newTestKitWithRoot(t) + tk.MustExec("use test;") + tk.MustExec("set time_zone = '+08:00';") + tk.MustExec("select sleep(0.1);") + r := tk.MustQuery("select FIRST_SEEN, LAST_SEEN, SUMMARY_BEGIN_TIME, SUMMARY_END_TIME from INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY order by LAST_SEEN limit 1;") + date8First, err := time.Parse(time.DateTime, r.Rows()[0][0].(string)) + require.NoError(t, err) + date8Last, err := time.Parse(time.DateTime, r.Rows()[0][1].(string)) + require.NoError(t, err) + date8Begin, err := time.Parse(time.DateTime, r.Rows()[0][2].(string)) + require.NoError(t, err) + date8End, err := time.Parse(time.DateTime, r.Rows()[0][3].(string)) + require.NoError(t, err) + tk.MustExec("set time_zone = '+01:00';") + r = tk.MustQuery("select FIRST_SEEN, LAST_SEEN, SUMMARY_BEGIN_TIME, SUMMARY_END_TIME from INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY order by LAST_SEEN limit 1;") + date1First, err := time.Parse(time.DateTime, r.Rows()[0][0].(string)) + require.NoError(t, err) + date1Last, err := time.Parse(time.DateTime, r.Rows()[0][1].(string)) + require.NoError(t, err) + date1Begin, err := time.Parse(time.DateTime, r.Rows()[0][2].(string)) + require.NoError(t, err) + date1End, err := time.Parse(time.DateTime, r.Rows()[0][3].(string)) + require.NoError(t, err) + + require.Less(t, date1First.Unix(), date8First.Unix()) + require.Less(t, date1Last.Unix(), date8Last.Unix()) + require.Less(t, date1Begin.Unix(), date8Begin.Unix()) + require.Less(t, date1End.Unix(), date8End.Unix()) +} + +func TestStmtSummaryHistoryTable(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + tk := s.newTestKitWithRoot(t) + tk.MustExec("drop table if exists test_summary") + tk.MustExec("create table test_summary(a int, b varchar(10), key k(a))") + + tk.MustExec("set global tidb_enable_stmt_summary = 1") + tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) + + // Disable refreshing summary. + tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") + tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) + + // Create a new session to test. + tk = s.newTestKitWithRoot(t) + + // Test INSERT + tk.MustExec("insert into test_summary values(1, 'a')") + tk.MustExec("insert into test_summary values(2, 'b')") + tk.MustExec("insert into TEST_SUMMARY VALUES(3, 'c')") + tk.MustExec("/**/insert into test_summary values(4, 'd')") + + sql := "select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys," + + "max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions," + + "max_prewrite_regions, avg_affected_rows, query_sample_text " + + "from information_schema.statements_summary_history " + + "where digest_text like 'insert into `test_summary`%'" + tk.MustQuery(sql).Check(testkit.Rows("Insert test test.test_summary 4 0 0 0 0 0 2 2 1 1 1 insert into test_summary values(1, 'a')")) + + tk.MustExec("set global tidb_stmt_summary_history_size = 0") + tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys, + max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions, + max_prewrite_regions, avg_affected_rows, query_sample_text, plan + from information_schema.statements_summary_history`, + ).Check(testkit.Rows()) + + tk.MustExec("set global tidb_enable_stmt_summary = 0") + tk.MustExec("drop table if exists `table`") + tk.MustExec("set global tidb_stmt_summary_history_size = 1") + tk.MustExec("set global tidb_enable_stmt_summary = 1") + tk.MustExec("create table `table`(`insert` int)") + tk.MustExec("select `insert` from `table`") + + sql = "select digest_text from information_schema.statements_summary_history;" + tk.MustQuery(sql).Check(testkit.Rows( + "select `insert` from `table`", + "create table `table` ( `insert` int )", + "set global `tidb_enable_stmt_summary` = ?", + )) +} + +func TestIssue26379(t *testing.T) { + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + tk := s.newTestKitWithRoot(t) + + // Clear all statements. + tk.MustExec("set global tidb_enable_stmt_summary = 0") + tk.MustExec("set global tidb_enable_stmt_summary = 1") + tk.MustExec("set @@global.tidb_stmt_summary_max_stmt_count=10") + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10), c int, d int, key k(a))") + + _, digest1 := parser.NormalizeDigest("select * from t where a = 3") + _, digest2 := parser.NormalizeDigest("select * from t where b = 'b'") + _, digest3 := parser.NormalizeDigest("select * from t where c = 6") + _, digest4 := parser.NormalizeDigest("select * from t where d = 5") + fillStatementCache := func() { + tk.MustQuery("select * from t where a = 3") + tk.MustQuery("select * from t where b = 'b'") + tk.MustQuery("select * from t where c = 6") + tk.MustQuery("select * from t where d = 5") + } + fillStatementCache() + tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest1.String())).Check(testkit.Rows(digest1.String())) + tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest1.String())).Check(testkit.Rows(digest1.String())) + fillStatementCache() + tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest2.String())).Check(testkit.Rows(digest2.String())) + tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest2.String())).Check(testkit.Rows(digest2.String())) + fillStatementCache() + tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest3.String())).Check(testkit.Rows(digest3.String())) + tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest3.String())).Check(testkit.Rows(digest3.String())) + fillStatementCache() + tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest4.String())).Check(testkit.Rows(digest4.String())) + tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest4.String())).Check(testkit.Rows(digest4.String())) + fillStatementCache() + tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s' or digest = '%s'", digest1.String(), digest2.String())).Sort().Check(testkit.Rows(digest1.String(), digest2.String())) + tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s' or digest = '%s'", digest1.String(), digest2.String())).Sort().Check(testkit.Rows(digest1.String(), digest2.String())) + re := tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s' and digest = '%s'", digest1.String(), digest2.String())) + require.Equal(t, 0, len(re.Rows())) + re = tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s' and digest = '%s'", digest1.String(), digest2.String())) + require.Equal(t, 0, len(re.Rows())) + fillStatementCache() + tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest in ('%s', '%s', '%s', '%s')", digest1.String(), digest2.String(), digest3.String(), digest4.String())).Sort().Check(testkit.Rows(digest1.String(), digest4.String(), digest2.String(), digest3.String())) + tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest in ('%s', '%s', '%s', '%s')", digest1.String(), digest2.String(), digest3.String(), digest4.String())).Sort().Check(testkit.Rows(digest1.String(), digest4.String(), digest2.String(), digest3.String())) + fillStatementCache() + tk.MustQuery("select count(*) from information_schema.statements_summary where digest=''").Check(testkit.Rows("0")) + tk.MustQuery("select count(*) from information_schema.statements_summary where digest is null").Check(testkit.Rows("1")) + tk.MustQuery("select count(*) from information_schema.cluster_statements_summary where digest=''").Check(testkit.Rows("0")) + tk.MustQuery("select count(*) from information_schema.cluster_statements_summary where digest is null").Check(testkit.Rows("1")) +} + +func TestStmtSummaryResultRows(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + + tk := s.newTestKitWithRoot(t) + tk.MustExec("set global tidb_stmt_summary_refresh_interval=999999999") + tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 3000") + tk.MustExec("set global tidb_stmt_summary_history_size=24") + tk.MustExec("set global tidb_stmt_summary_max_sql_length=4096") + tk.MustExec("set global tidb_enable_stmt_summary=0") + tk.MustExec("set global tidb_enable_stmt_summary=1") + if !config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load() { + tk.MustExec("set @@tidb_enable_collect_execution_info=1") + defer tk.MustExec("set @@tidb_enable_collect_execution_info=0") + } + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + for i := 1; i <= 30; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%v)", i)) + } + + tk.MustQuery("select * from test.t limit 10;") + tk.MustQuery("select * from test.t limit 20;") + tk.MustQuery("select * from test.t limit 30;") + tk.MustQuery("select MIN_RESULT_ROWS,MAX_RESULT_ROWS,AVG_RESULT_ROWS from information_schema.statements_summary where query_sample_text like 'select%test.t limit%' and MAX_RESULT_ROWS > 10"). + Check(testkit.Rows("10 30 20")) + tk.MustQuery("select MIN_RESULT_ROWS,MAX_RESULT_ROWS,AVG_RESULT_ROWS from information_schema.cluster_statements_summary where query_sample_text like 'select%test.t limit%' and MAX_RESULT_ROWS > 10"). + Check(testkit.Rows("10 30 20")) +} + +func TestSlowQueryOOM(t *testing.T) { + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + tk := s.newTestKitWithRoot(t) + + f, err := os.CreateTemp("", "tidb-slow-*.log") + require.NoError(t, err) + _, err = f.WriteString(` +# Time: 2022-04-14T10:50:28.185954+08:00 +# Txn_start_ts: 432512598850928660 +# User@Host: root[root] @ 127.0.0.1 [127.0.0.1] +# Conn_ID: 465 +# Query_time: 0.000955269 +# Parse_time: 0 +# Compile_time: 0.000486719 +# Rewrite_time: 0.000142467 +# Optimize_time: 0.000312527 +# Wait_TS: 0.000004489 +# Cop_time: 0.000169235 Request_count: 2 +# DB: test +# Index_names: [t_normal_oltp:idx0] +# Is_internal: false +# Digest: dcb13f841a568ec94baec50c88d0679c533bbd65539ba8fee6deb2e39881acdd +# Stats: t_normal_oltp:432512598027796484 +# Num_cop_tasks: 2 +# Cop_proc_avg: 0 Cop_proc_p90: 0 Cop_proc_max: 0 Cop_proc_addr: store1 +# Cop_wait_avg: 0 Cop_wait_p90: 0 Cop_wait_max: 0 Cop_wait_addr: store1 +# Mem_max: 11372 +# Prepared: true +# Plan_from_cache: false +# Plan_from_binding: false +# Has_more_results: false +# KV_total: 0 +# PD_total: 0.000000671 +# Backoff_total: 0 +# Write_sql_response_total: 0.000000606 +# Result_rows: 1 +# Succ: true +# IsExplicitTxn: false +# Plan: tidb_decode_plan('lQeAMAk1XzEwCTAJMQlmdW5jczpzdW0oQ29sdW1uIzEyKS0+DQzwUjYJMQl0aW1lOjQyMS45wrVzLCBsb29wczoyCTEuNDUgS0IJTi9BCjEJM18yNwkwCTAJY2FzdChwbHVzKHRlc3QudF9ub3JtYWxfb2x0cC5hLCB0RhYAXGIpLCBkZWNpbWFsKDIwLDApIEJJTkFSWRmHDDEyCTERiQQxOC6HAGwsIENvbmN1cnJlbmN5Ok9GRgk3NjAgQnl0ZXMJAZoYMgkzMF8yNAWbGUQMMDkuNzZGAEhpbmRleF90YXNrOiB7dG90YWxfBfgUIDEwMS4yBSwsZmV0Y2hfaGFuZGxlARgIMC4xBRiAYnVpbGQ6IDQ2OW5zLCB3YWl0OiA1OTVuc30sIHRhYmxlTlcADDI1Ny4pUCBudW06IDEsIGMdyBwgNX0JOC45MTFgODMJNDdfMjIJMV8wCTAJdAFVADoyWgEALAnBwDppZHgwKGEpLCByYW5nZTpbNjY4Mzk4LDY2ODQwOF0sIGtlZXAgb3JkZXI6ZmFsc2U1Ewg5Ni4uWAEAMwGQAHARuRGjGG1heDogNzQF9kRwcm9jX2tleXM6IDAsIHJwY18RJgEMKTwENjMN5GRjb3ByX2NhY2hlX2hpdF9yYXRpbzogMC4wMCEkCGlrdglqAHsFNwA1LokAFDB9CU4vQQEEIQUMNV8yM24FAWbfAAA1LcVNvmrfAAAwot8ADDU5LjYFLbbfAAQzLlrhAA==') +# Plan_digest: e7b1a5789200cb6d91aaac8af3f5560af51870369bac2e247b84fe9b5e754cbe +select sum(a+b) from test.t_normal_oltp where a >= ? and a <= ? [arguments: (668398, 668408)]; +# Time: 2022-04-14T10:50:28.185987+08:00 +select * from t; +# Time: 2022-04-14T10:50:28.186028+08:00 +select * from t1; +`) + require.NoError(t, err) + require.NoError(t, f.Close()) + executor.ParseSlowLogBatchSize = 1 + originCfg := config.GetGlobalConfig() + newCfg := *originCfg + newCfg.Log.SlowQueryFile = f.Name() + config.StoreGlobalConfig(&newCfg) + defer func() { + executor.ParseSlowLogBatchSize = 64 + config.StoreGlobalConfig(originCfg) + require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile)) + }() + // The server default is CANCEL, but the testsuite defaults to LOG + tk.MustExec("set global tidb_mem_oom_action='CANCEL'") + defer tk.MustExec("set global tidb_mem_oom_action='LOG'") + tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name())) + // Align with the timezone in slow log files + tk.MustExec("set @@time_zone='+08:00'") + checkFn := func(quota int) { + tk.MustExec("set tidb_mem_quota_query=" + strconv.Itoa(quota)) // session + + err = tk.QueryToErr("select * from `information_schema`.`slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'") + require.Error(t, err, quota) + require.True(t, exeerrors.ErrMemoryExceedForQuery.Equal(err)) + } + memQuotas := []int{128, 512, 1024, 2048, 4096} + for _, quota := range memQuotas { + checkFn(quota) + } + for i := 0; i < 100; i++ { + quota := rand.Int()%8192 + 1 + checkFn(quota) + } + + newMemQuota := 1024 * 1024 * 1024 + tk.MustExec("set @@tidb_mem_quota_query=" + strconv.Itoa(newMemQuota)) + tk.MustQuery("select * from `information_schema`.`slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'") + mem := tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed() + require.Equal(t, mem, int64(0)) + tk.MustQuery("select * from `information_schema`.`cluster_slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'") +} + +func (s *clusterTablesSuite) setUpRPCService(t *testing.T, addr string, sm util.SessionManager) (*grpc.Server, string) { + lis, err := net.Listen("tcp", addr) + require.NoError(t, err) + // Fix issue 9836 + if sm == nil { + sm = &testkit.MockSessionManager{PS: make([]*util.ProcessInfo, 1)} + sm.(*testkit.MockSessionManager).PS[0] = &util.ProcessInfo{ + ID: 1, + User: "root", + Host: "127.0.0.1", + Command: mysql.ComQuery, + } + } + srv := server.NewRPCServer(config.GetGlobalConfig(), s.dom, sm) + port := lis.Addr().(*net.TCPAddr).Port + addr = fmt.Sprintf("127.0.0.1:%d", port) + go func() { + err = srv.Serve(lis) + require.NoError(t, err) + }() + config.UpdateGlobal(func(conf *config.Config) { + conf.Status.StatusPort = uint(port) + conf.AdvertiseAddress = "127.0.0.1" + }) + return srv, addr +} + +func (s *clusterTablesSuite) setUpMockPDHTTPServer() (*httptest.Server, string) { + // mock PD http server + router := mux.NewRouter() + srv := httptest.NewServer(router) + // mock store stats stat + mockAddr := strings.TrimPrefix(srv.URL, "http://") + router.Handle(pd.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { + return &pd.StoresInfo{ + Count: 1, + Stores: []pd.StoreInfo{ + { + Store: pd.MetaStore{ + ID: 1, + Address: "127.0.0.1:20160", + State: 0, + StateName: "Up", + Version: "4.0.0-alpha", + StatusAddress: mockAddr, + GitHash: "mock-tikv-githash", + StartTimestamp: s.startTime.Unix(), + }, + }, + }, + }, nil + })) + // mock PD API + router.Handle(pd.Status, fn.Wrap(func() (any, error) { + return struct { + Version string `json:"version"` + GitHash string `json:"git_hash"` + StartTimestamp int64 `json:"start_timestamp"` + }{ + Version: "4.0.0-alpha", + GitHash: "mock-pd-githash", + StartTimestamp: s.startTime.Unix(), + }, nil + })) + var mockConfig = func() (map[string]any, error) { + configuration := map[string]any{ + "key1": "value1", + "key2": map[string]string{ + "nest1": "n-value1", + "nest2": "n-value2", + }, + "key3": map[string]any{ + "nest1": "n-value1", + "nest2": "n-value2", + "key4": map[string]string{ + "nest3": "n-value4", + "nest4": "n-value5", + }, + }, + } + return configuration, nil + } + // pd config + router.Handle(pd.Config, fn.Wrap(mockConfig)) + // TiDB/TiKV config + router.Handle("/config", fn.Wrap(mockConfig)) + return srv, mockAddr +} + +func (s *clusterTablesSuite) newTestKitWithRoot(t *testing.T) *testkit.TestKit { + tk := testkit.NewTestKit(t, s.store) + tk.MustExec("use test") + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + return tk +} + +func TestMDLView(t *testing.T) { + testCases := []struct { + name string + createTable []string + ddl string + queryInTxn []string + sqlDigest string + }{ + {"add column", []string{"create table t(a int)"}, "alter table test.t add column b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"}, + {"change column in 1 step", []string{"create table t(a int)"}, "alter table test.t change column a b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"}, + {"rename tables", []string{"create table t(a int)", "create table t1(a int)"}, "rename table test.t to test.t2, test.t1 to test.t3", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"}, + {"err don't show rollbackdone ddl", []string{"create table t(a int)", "insert into t values (1);", "insert into t values (1);", "alter table t add unique idx(id);"}, "alter table test.t add column b int", []string{"select 1", "select * from t"}, "[\"begin\",\"select ?\",\"select * from `t`\"]"}, + } + save := privileges.SkipWithGrant + privileges.SkipWithGrant = true + defer func() { + privileges.SkipWithGrant = save + }() + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + // setup suite + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + + tk := s.newTestKitWithRoot(t) + tkDDL := s.newTestKitWithRoot(t) + tk3 := s.newTestKitWithRoot(t) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_metadata_lock=1") + for _, cr := range c.createTable { + if strings.Contains(c.name, "err") { + _, _ = tk.Exec(cr) + } else { + tk.MustExec(cr) + } + } + + tk.MustExec("begin") + for _, q := range c.queryInTxn { + tk.MustQuery(q) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + tkDDL.MustExec(c.ddl) + wg.Done() + }() + + time.Sleep(200 * time.Millisecond) + + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", tk3.Session().GetSessionManager()) + defer s.rpcserver.Stop() + + tk3.MustQuery("select DB_NAME, QUERY, SQL_DIGESTS from mysql.tidb_mdl_view").Check(testkit.Rows( + strings.Join([]string{ + "test", + c.ddl, + c.sqlDigest, + }, " "), + )) + + tk.MustExec("commit") + + wg.Wait() + }) + } +} + +func TestMDLViewWithNoPrivilege(t *testing.T) { + // It's with TestMDLViewWithPrivilege. Split to two tests just because it runs too much time. + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + tk.MustQuery("select * from mysql.tidb_mdl_view;").Check(testkit.Rows()) + tk.MustExec("create user 'test'@'%' identified by '';") + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "test", Hostname: "%"}, nil, nil, nil)) + _, err := tk.Exec("select * from mysql.tidb_mdl_view;") + require.ErrorContains(t, err, "view lack rights") +} + +func TestMDLViewWithPrivilege(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + // grant all privileges to test user. + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + tk.MustExec("create user 'test'@'%' identified by '';") + tk.MustExec("grant all privileges on *.* to 'test'@'%';") + tk.MustExec("flush privileges;") + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "test", Hostname: "%"}, nil, nil, nil)) + tk.MustQuery("select * from mysql.tidb_mdl_view;").Check(testkit.Rows()) +} + +func TestQuickBinding(t *testing.T) { + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + tk := s.newTestKitWithRoot(t) + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil, nil)) + + tk.MustExec("set tidb_opt_projection_push_down = 0") + tk.MustExec("use test") + tk.MustExec(`create table t1 (pk int, a int, b int, c int, primary key(pk), key k_a(a), key k_bc(b, c))`) + tk.MustExec(`create table t2 (a int, b int, c int, key k_a(a), key k_bc(b, c))`) // no primary key + tk.MustExec(`create table t3 (a int, b int, c int, key k_a(a), key k_bc(b, c))`) + tk.MustExec(`create table t4 (a int, b int, c int, key k_a(a), key k_bc(b, c))`) + + type testCase struct { + template string + expectedHint string + dmlAndSubqueryTemplates []string + } + subQueryTemp := []string{ + "select a from (?) tx where tx.c<100", + "select * from (?) tx1, (?) tx2 where tx1.c<100 and tx2.c<100", + } + testCases := []testCase{ + // access path selection with use_index / ignore_index + {`select /*+ use_index(t1, k_a) */ * from t1 where b=?`, "use_index(@`sel_1` `test`.`t1` `k_a`), no_order_index(@`sel_1` `test`.`t1` `k_a`)", subQueryTemp}, + {`select /*+ use_index(t1, k_bc) */ * from t1 where a=?`, "use_index(@`sel_1` `test`.`t1` `k_bc`), no_order_index(@`sel_1` `test`.`t1` `k_bc`)", subQueryTemp}, + {`select /*+ use_index(t1, primary) */ * from t1 where a=? and b=?`, "use_index(@`sel_1` `test`.`t1` ), no_order_index(@`sel_1` `test`.`t1` `primary`)", subQueryTemp}, + {`select /*+ ignore_index(t1, k_a, k_bc) */ * from t1 where a=? and b=?`, "use_index(@`sel_1` `test`.`t1` ), no_order_index(@`sel_1` `test`.`t1` `primary`), ignore_index(`t1` `k_a`, `k_bc`)", subQueryTemp}, + {`select /*+ use_index(t1) */ * from t1 where a=? and b=?`, "use_index(@`sel_1` `test`.`t1` ), no_order_index(@`sel_1` `test`.`t1` `primary`)", subQueryTemp}, + {`select /*+ use_index(t2) */ * from t2 where a=? and b=?`, "use_index(@`sel_1` `test`.`t2` )", subQueryTemp}, + + // aggregation + {`select /*+ hash_agg(), use_index(t1, primary), agg_to_cop() */ count(*) from t1 where a= 0 and id1 < 10") + // range scan 10-30 through t1 id2 + tk.MustQuery("select * from t1 use index(id2) where id2 >= 10 and id2 < 30") + // range scan 30-60 through t2 id1 + tk.MustQuery("select * from t2 use index(id1) where id1 >= 30 and id1 < 60") + // range scan 60-100 through t2 id2 + tk.MustQuery("select * from t2 use index(id2) where id2 >= 60 and id2 < 100") + tk.RefreshSession() + + require.Eventually(t, func() bool { + result := tk.MustQuery(fmt.Sprintf(`select + query_total, + rows_access_total, + percentage_access_0, + percentage_access_0_1, + percentage_access_1_10, + percentage_access_10_20, + percentage_access_20_50, + percentage_access_50_100, + percentage_access_100 + from information_schema.%s + where table_schema='test' and + (table_name='t1' or table_name='t2') and + (index_name='id1' or index_name='id2') and + last_access_time is not null + order by table_name, index_name;`, tableName)) + expectedResult := testkit.Rows( + "1 10 0 0 0 1 0 0 0", + "1 20 0 0 0 0 1 0 0", + "1 30 0 0 0 0 1 0 0", + "1 40 0 0 0 0 1 0 0") + if !result.Equal(expectedResult) { + logutil.BgLogger().Warn("result not equal", zap.Any("rows", result.Rows())) + return false + } + return true + }, time.Second*5, time.Millisecond*100) + + // use another less-privileged user to select + tk.MustExec("create user test_user") + tk.MustExec("grant all privileges on test.t1 to test_user") + tk.RefreshSession() + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{ + Username: "test_user", + Hostname: "127.0.0.1", + }, nil, nil, nil)) + // `test_user` cannot see table `t2`. + tk.MustQuery(fmt.Sprintf(`select + query_total, + rows_access_total, + percentage_access_0, + percentage_access_0_1, + percentage_access_1_10, + percentage_access_10_20, + percentage_access_20_50, + percentage_access_50_100, + percentage_access_100 + from information_schema.%s + where table_schema='test' and + (table_name='t1' or table_name='t2') and + (index_name='id1' or index_name='id2') and + last_access_time is not null + order by table_name, index_name;`, tableName)).Check(testkit.Rows( + "1 10 0 0 0 1 0 0 0", + "1 20 0 0 0 0 1 0 0")) +} + +func TestIndexUsageTable(t *testing.T) { + testIndexUsageTable(t, false) +} + +func TestClusterIndexUsageTable(t *testing.T) { + testIndexUsageTable(t, true) +} + +func TestUnusedIndexView(t *testing.T) { + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", nil) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + defer s.rpcserver.Stop() + tk := s.newTestKitWithRoot(t) + + tk.MustExec("use test") + tk.MustExec("create table t(id1 int unique, id2 int unique)") + for i := 0; i < 100; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tk.MustExec("analyze table t") + tk.RefreshSession() + tk.MustExec("use test") + // range scan 0-10 through t1 id1 + tk.MustQuery("select * from t use index(id1) where id1 >= 0 and id1 < 10") + tk.MustHavePlan("select * from t use index(id1) where id1 >= 0 and id1 < 10", "IndexLookUp") + tk.RefreshSession() + // the index `id2` is unused + require.Eventually(t, func() bool { + result := tk.MustQuery(`select * from sys.schema_unused_indexes where object_name = 't'`) + logutil.BgLogger().Info("select schema_unused_indexes", zap.Any("row", result.Rows())) + expectedResult := testkit.Rows("test t id2") + return result.Equal(expectedResult) + }, 5*time.Second, 100*time.Millisecond) +} + +func TestMDLViewIDConflict(t *testing.T) { + save := privileges.SkipWithGrant + privileges.SkipWithGrant = true + defer func() { + privileges.SkipWithGrant = save + }() + + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + tk := s.newTestKitWithRoot(t) + + tk.MustExec("use test") + tk.MustExec("create table t(a int);") + tbl, err := s.dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tk.MustExec("insert into t values (1)") + + bigID := tbl.Meta().ID * 10 + bigTableName := "" + // set a hard limitation on 10000 to avoid using too much resource + for i := 0; i < 10000; i++ { + bigTableName = fmt.Sprintf("t%d", i) + tk.MustExec(fmt.Sprintf("create table %s(a int);", bigTableName)) + + tbl, err := s.dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr(bigTableName)) + require.NoError(t, err) + + require.LessOrEqual(t, tbl.Meta().ID, bigID) + if tbl.Meta().ID == bigID { + break + } + } + tk.MustExec("insert into t1 values (1)") + tk.MustExec(fmt.Sprintf("insert into %s values (1)", bigTableName)) + + // Now we have two table: t and `bigTableName`. The later one's ID is 10 times the former one. + // Then create two session to run TXNs on these two tables + txnTK1 := s.newTestKitWithRoot(t) + txnTK2 := s.newTestKitWithRoot(t) + txnTK1.MustExec("use test") + txnTK1.MustExec("BEGIN") + // this transaction will query `t` and one another table. Then the `related_table_ids` is `smallID|anotherID` + txnTK1.MustQuery("SELECT * FROM t").Check(testkit.Rows("1")) + txnTK1.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1")) + txnTK2.MustExec("use test") + txnTK2.MustExec("BEGIN") + txnTK2.MustQuery("SELECT * FROM " + bigTableName).Check(testkit.Rows("1")) + + testTK := s.newTestKitWithRoot(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", testTK.Session().GetSessionManager()) + defer s.rpcserver.Stop() + testTK.MustQuery("select table_name from mysql.tidb_mdl_view").Check(testkit.Rows()) + + // run a DDL on the table with smallID + ddlTK1 := s.newTestKitWithRoot(t) + ddlTK1.MustExec("use test") + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + ddlTK1.MustExec("ALTER TABLE t ADD index(a);") + wg.Done() + }() + ddlTK2 := s.newTestKitWithRoot(t) + ddlTK2.MustExec("use test") + wg.Add(1) + go func() { + ddlTK2.MustExec(fmt.Sprintf("ALTER TABLE %s ADD index(a);", bigTableName)) + wg.Done() + }() + + require.Eventually(t, func() bool { + rows := testTK.MustQuery("select table_ids from mysql.tidb_mdl_info").Rows() + return len(rows) == 2 + }, time.Second*10, time.Second) + + // it only contains the table with smallID + require.Eventually(t, func() bool { + rows := testTK.MustQuery("select table_name, query, start_time from mysql.tidb_mdl_view order by table_name").Rows() + return len(rows) == 2 + }, time.Second*10, time.Second) + txnTK1.MustExec("COMMIT") + txnTK2.MustExec("COMMIT") + wg.Wait() +} diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go index 07ad325f92163..987d6222139f5 100644 --- a/planner/core/memtable_predicate_extractor.go +++ b/planner/core/memtable_predicate_extractor.go @@ -1191,22 +1191,19 @@ func (e *SlowQueryExtractor) Extract( } func (e *SlowQueryExtractor) setTimeRange(start, end int64) { - const defaultSlowQueryDuration = 24 * time.Hour - var startTime, endTime time.Time if start == 0 && end == 0 { return } + var startTime, endTime time.Time if start != 0 { startTime = e.convertToTime(start) + } else { + startTime, _ = types.MinDatetime.GoTime(time.UTC) } if end != 0 { endTime = e.convertToTime(end) - } - if start == 0 { - startTime = endTime.Add(-defaultSlowQueryDuration) - } - if end == 0 { - endTime = startTime.Add(defaultSlowQueryDuration) + } else { + endTime, _ = types.MaxDatetime.GoTime(time.UTC) } timeRange := &TimeRange{ StartTime: startTime,