From c08c1a70bb5ef56a91c89e1e7514fa6a1b634180 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 18 Feb 2020 22:44:42 +0800 Subject: [PATCH 01/12] init Signed-off-by: crazycs --- executor/builder.go | 1 + executor/slow_query.go | 283 +++++++++++++++++-- executor/slow_query_test.go | 77 ++++- planner/core/logical_plan_builder.go | 2 + planner/core/memtable_predicate_extractor.go | 62 ++++ 5 files changed, 381 insertions(+), 44 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 1192f110f4bab..0f1ed88b5c1d4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1351,6 +1351,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo retriever: &SlowQueryRetriever{ table: v.Table, outputCols: v.Columns, + extractor: v.Extractor.(*plannercore.SlowQueryExtractor), }, } } diff --git a/executor/slow_query.go b/executor/slow_query.go index b3f94ce0f555e..8f15b9c9806ef 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -16,8 +16,13 @@ package executor import ( "bufio" "context" + "fmt" + "github.com/pingcap/parser/auth" + plannercore "github.com/pingcap/tidb/planner/core" "io" "os" + "path/filepath" + "sort" "strconv" "strings" "time" @@ -42,9 +47,16 @@ type SlowQueryRetriever struct { table *model.TableInfo outputCols []*model.ColumnInfo retrieved bool + extractor *plannercore.SlowQueryExtractor initialized bool - file *os.File + files []logFile + fileIdx int fileLine int + checker *slowLogChecker +} + +func NewSlowQueryRetrieverForTest(extractor *plannercore.SlowQueryExtractor) *SlowQueryRetriever { + return &SlowQueryRetriever{extractor: extractor} } func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { @@ -52,13 +64,16 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte return nil, nil } if !e.initialized { - var err error - e.file, err = os.Open(sctx.GetSessionVars().SlowQueryFile) + err := e.initialize(sctx) if err != nil { return nil, err } - e.initialized = true } + if len(e.files) == 0 || e.fileIdx >= len(e.files) { + e.retrieved = true + return nil, nil + } + rows, err := e.dataForSlowLog(sctx) if err != nil { return nil, err @@ -77,9 +92,37 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte return retRows, nil } +func (e *SlowQueryRetriever) initialize(sctx sessionctx.Context) error { + var err error + e.initialized = true + e.fileIdx = 0 + e.files, err = e.GetAllFiles(sctx.GetSessionVars().SlowQueryFile) + if err != nil { + return err + } + var hasProcessPriv bool + if pm := privilege.GetPrivilegeManager(sctx); pm != nil { + if pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { + hasProcessPriv = true + } + } + + e.checker = &slowLogChecker{ + hasProcessPriv: hasProcessPriv, + user: sctx.GetSessionVars().User, + } + if e.extractor != nil { + e.checker.enableTimeCheck = e.extractor.Enable + e.checker.startTime = e.extractor.StartTime + e.checker.endTime = e.extractor.EndTime + + } + return nil +} + func (e *SlowQueryRetriever) close() error { - if e.file != nil { - err := e.file.Close() + for _, f := range e.files { + err := f.file.Close() if err != nil { logutil.BgLogger().Error("close slow log file failed.", zap.Error(err)) } @@ -88,23 +131,15 @@ func (e *SlowQueryRetriever) close() error { } func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) { - var hasProcessPriv bool - if pm := 
privilege.GetPrivilegeManager(ctx); pm != nil { - if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { - hasProcessPriv = true - } - } - user := ctx.GetSessionVars().User - checkValid := func(userName string) bool { - if !hasProcessPriv && user != nil && userName != user.Username { - return false - } - return true - } - rows, fileLine, err := ParseSlowLog(ctx, bufio.NewReader(e.file), e.fileLine, 1024, checkValid) + reader := bufio.NewReader(e.files[e.fileIdx].file) + rows, fileLine, err := ParseSlowLog(ctx, reader, e.fileLine, 1024, e.checker) if err != nil { if err == io.EOF { - e.retrieved = true + e.fileIdx++ + e.fileLine = 0 + if e.fileIdx >= len(e.files) { + e.retrieved = true + } } else { return nil, err } @@ -116,11 +151,33 @@ func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.D return rows, nil } -type checkValidFunc func(string) bool +type slowLogChecker struct { + // Below fields is used to check privilege. + hasProcessPriv bool + user *auth.UserIdentity + // Below fields is used to check slow log time valid. + enableTimeCheck bool + startTime time.Time + endTime time.Time +} + +func (sc *slowLogChecker) hasPrivilege(userName string) bool { + if !sc.hasProcessPriv && sc.user != nil && userName != sc.user.Username { + return false + } + return true +} + +func (sc *slowLogChecker) isTimeValid(t time.Time) bool { + if sc.enableTimeCheck && (t.Before(sc.startTime) || t.After(sc.endTime)) { + return false + } + return true +} // ParseSlowLog exports for testing. // TODO: optimize for parse huge log-file. -func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow int, checkValid checkValidFunc) ([][]types.Datum, int, error) { +func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow int, checker *slowLogChecker) ([][]types.Datum, int, error) { var rows [][]types.Datum startFlag := false lineNum := fileLine @@ -139,7 +196,7 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow // Check slow log entry start flag. if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { st = &slowQueryTuple{} - valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, checkValid) + valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -163,7 +220,7 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow if strings.HasSuffix(field, ":") { field = field[:len(field)-1] } - valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, checkValid) + valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -175,12 +232,12 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow } } else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) { // Get the sql string, and mark the start flag to false. 
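The `slowLogChecker` introduced in this hunk replaces the old ad-hoc `checkValidFunc` closure: one value now carries both the privilege filter and the time-range filter that `ParseSlowLog` consults per entry. Below is a minimal, self-contained sketch of the same two rules; the real struct holds an `*auth.UserIdentity`, so the plain `user` string (empty meaning "no session user") is a stand-in for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// slowLogChecker, condensed: an entry survives only if the reader may see
// this user's queries and its timestamp falls inside the requested window.
type slowLogChecker struct {
	hasProcessPriv  bool
	user            string // stand-in for *auth.UserIdentity; "" means nil
	enableTimeCheck bool
	startTime       time.Time
	endTime         time.Time
}

// With the PROCESS privilege (or no session user) every entry is visible;
// otherwise only the session user's own entries are.
func (sc *slowLogChecker) hasPrivilege(userName string) bool {
	return sc.hasProcessPriv || sc.user == "" || userName == sc.user
}

// Once a time range has been pushed down, entries outside
// [startTime, endTime] are rejected.
func (sc *slowLogChecker) isTimeValid(t time.Time) bool {
	return !sc.enableTimeCheck || (!t.Before(sc.startTime) && !t.After(sc.endTime))
}

func main() {
	now := time.Now()
	sc := &slowLogChecker{
		user:            "u1",
		enableTimeCheck: true,
		startTime:       now.Add(-time.Hour),
		endTime:         now,
	}
	fmt.Println(sc.hasPrivilege("u1"), sc.isTimeValid(now.Add(-30*time.Minute))) // true true
	fmt.Println(sc.hasPrivilege("u2"), sc.isTimeValid(now.Add(-2*time.Hour)))    // false false
}
```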
- _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, checkValid) + _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } - if checkValid == nil || checkValid(st.user) { + if checker == nil || checker.hasPrivilege(st.user) { rows = append(rows, st.convertToDatumRow()) } startFlag = false @@ -271,7 +328,7 @@ type slowQueryTuple struct { planDigest string } -func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checkValid checkValidFunc) (valid bool, err error) { +func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checker *slowLogChecker) (valid bool, err error) { valid = true switch field { case variable.SlowLogTimeStr: @@ -282,6 +339,9 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, if st.time.Location() != tz { st.time = st.time.In(tz) } + if checker != nil { + valid = checker.isTimeValid(st.time) + } case variable.SlowLogTxnStartTSStr: st.txnStartTs, err = strconv.ParseUint(value, 10, 64) case variable.SlowLogUserStr: @@ -292,8 +352,8 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, if len(field) > 1 { st.host = fields[1] } - if checkValid != nil { - valid = checkValid(st.user) + if checker != nil { + valid = checker.hasPrivilege(st.user) } case variable.SlowLogConnIDStr: st.connID, err = strconv.ParseUint(value, 10, 64) @@ -465,3 +525,166 @@ func ParseTime(s string) (time.Time, error) { } return t, err } + +type logFile struct { + file *os.File // The opened file handle + begin, end time.Time // The start/end time of the log file +} + +func (l *logFile) File() *os.File { + return l.file +} + +func (l *logFile) BeginTime() time.Time { + return l.begin +} + +func (l *logFile) EndTime() time.Time { + return l.end +} + +// GetAllFiles is used to get all slow-log need to parse, it is export for test. 
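`GetAllFiles`, whose body starts in the next hunk lines, keeps a candidate file only when its `[begin, end]` span can intersect the queried window: it bails out early on `fileBeginTime.After(EndTime)` and again on `fileEndTime.Before(StartTime)`. Reduced to a single predicate, that is the classic interval-overlap test; a sketch with a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// overlaps reports whether a log file spanning [fileBegin, fileEnd] can
// contain entries inside the queried window. The two early returns in
// GetAllFiles implement exactly the two halves of this conjunction.
func overlaps(fileBegin, fileEnd, queryStart, queryEnd time.Time) bool {
	return !fileBegin.After(queryEnd) && !fileEnd.Before(queryStart)
}

func main() {
	at := func(s string) time.Time {
		t, err := time.Parse(time.RFC3339, s)
		if err != nil {
			panic(err)
		}
		return t
	}
	fileBegin, fileEnd := at("2020-02-15T18:00:01+08:00"), at("2020-02-15T19:00:05+08:00")
	fmt.Println(overlaps(fileBegin, fileEnd, at("2020-02-15T18:30:00+08:00"), at("2020-02-16T00:00:00+08:00"))) // true
	fmt.Println(overlaps(fileBegin, fileEnd, at("2020-02-16T00:00:00+08:00"), at("2020-02-17T00:00:00+08:00"))) // false
}
```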
+func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) { + if e.extractor == nil || !e.extractor.Enable { + file, err := os.Open(logFilePath) + if err != nil { + return nil, err + } + return []logFile{{file: file}}, nil + + } + var logFiles []logFile + logDir := filepath.Dir(logFilePath) + ext := filepath.Ext(logFilePath) + prefix := logFilePath[:len(logFilePath)-len(ext)] + fmt.Printf("dir: %v, prefix: %v, ext: %v\n", logDir, prefix, ext) + err := filepath.Walk(logDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + //All rotated log files have the same prefix with the original file + if !strings.HasPrefix(path, prefix) { + return nil + } + fmt.Printf("path: %v, info: %v\n", path, info.Name()) + file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) + if err != nil { + return err + } + skip := false + defer func() { + if !skip { + fmt.Printf("close file: %v-------------\n", file.Name()) + _ = file.Close() + } + }() + now := time.Now() + fileBeginTime, err := e.getFileStartTime(file) + if err != nil { + return err + } + fmt.Printf("get start time: %v, %v---------\n\n", time.Since(now), fileBeginTime) + + if fileBeginTime.After(e.extractor.EndTime) { + return nil + } + + now = time.Now() + fileEndTime, err := e.getFileEndTime(file) + if err != nil { + return err + } + fmt.Printf("get end time: %v, %v---------\n\n", time.Since(now), fileEndTime) + if fileEndTime.Before(e.extractor.StartTime) { + return nil + } + _, err = file.Seek(0, io.SeekStart) + if err != nil { + return err + } + logFiles = append(logFiles, logFile{ + file: file, + begin: fileBeginTime, + end: fileEndTime, + }) + skip = true + return nil + }) + // Sort by start time + sort.Slice(logFiles, func(i, j int) bool { + return logFiles[i].begin.Before(logFiles[j].begin) + }) + for _, f := range logFiles { + fmt.Printf("valid file: %v ----\n", f.file.Name()) + } + return logFiles, err +} + +func (e *SlowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) { + var t time.Time + _, err := file.Seek(0, io.SeekStart) + if err != nil { + return t, err + } + reader := bufio.NewReader(file) + maxNum := 128 + for { + lineByte, err := getOneLine(reader) + if err != nil { + return t, err + } + line := string(lineByte) + if strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { + return ParseTime(line[len(variable.SlowLogStartPrefixStr):]) + } + maxNum -= 1 + if maxNum <= 0 { + break + } + } + return t, errors.Errorf("can't found start time") +} +func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { + var t time.Time + stat, err := file.Stat() + if err != nil { + return t, err + } + fileSize := stat.Size() + cursor := int64(0) + line := make([]byte, 0, 64) + maxNum := 128 + for { + cursor -= 1 + _, err := file.Seek(cursor, io.SeekEnd) + if err != nil { + return t, err + } + + char := make([]byte, 1) + _, err = file.Read(char) + if err != nil { + return t, err + } + // If find a line. 
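The condition on the following line treats both LF (10) and CR (13) as line terminators, and the `cursor != -1` guard keeps a trailing newline at the very end of the file from ending the scan immediately. The byte-by-byte backward scan generalizes to any `io.ReadSeeker`; a runnable sketch of the technique (the helper name is ours):

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// lastLine scans backwards from the end, one Seek/Read round trip per byte,
// until it crosses a line break -- the cursor-from-io.SeekEnd technique
// getFileEndTime uses so the trailing "# Time:" header is found without
// reading a potentially huge file from the front.
func lastLine(r io.ReadSeeker, size int64) (string, error) {
	var buf []byte
	for cursor := int64(-1); cursor >= -size; cursor-- {
		if _, err := r.Seek(cursor, io.SeekEnd); err != nil {
			return "", err
		}
		b := make([]byte, 1)
		if _, err := r.Read(b); err != nil {
			return "", err
		}
		// A terminator at the very last byte (cursor == -1) does not end
		// the scan; only an earlier line break does.
		if (b[0] == '\n' || b[0] == '\r') && cursor != -1 {
			break
		}
		buf = append(buf, b[0])
	}
	// Bytes were collected back to front; reverse them in place.
	for i, j := 0, len(buf)-1; i < j; i, j = i+1, j-1 {
		buf[i], buf[j] = buf[j], buf[i]
	}
	return string(buf), nil
}

func main() {
	data := "# Time: 2020-02-17T18:00:05.000000+08:00\nselect 6;"
	line, err := lastLine(strings.NewReader(data), int64(len(data)))
	fmt.Printf("%q %v\n", line, err) // "select 6;" <nil>
}
```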
+ if cursor != -1 && (char[0] == 10 || char[0] == 13) { + for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 { + line[i], line[j] = line[j], line[i] + } + lineStr := string(line) + lineStr = strings.TrimSpace(lineStr) + if strings.HasPrefix(lineStr, variable.SlowLogStartPrefixStr) { + return ParseTime(lineStr[len(variable.SlowLogStartPrefixStr):]) + } + line = line[:0] + maxNum -= 1 + } + line = append(line, char[0]) + if cursor == -fileSize || maxNum <= 0 { + return t, errors.Errorf("can't found end time") + } + } +} diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 07c654e868ffe..8580c62eeb506 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -16,7 +16,10 @@ package executor_test import ( "bufio" "bytes" + "fmt" + plannercore "github.com/pingcap/tidb/planner/core" "io" + "os" "strings" "time" @@ -29,8 +32,8 @@ import ( "github.com/pingcap/tidb/util/mock" ) -func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, checkValid func(string) bool) ([][]types.Datum, int, error) { - rows, lineNum, err := executor.ParseSlowLog(ctx, reader, 0, 1024, checkValid) +func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, int, error) { + rows, lineNum, err := executor.ParseSlowLog(ctx, reader, 0, 1024, nil) if err == io.EOF { err = nil } @@ -58,11 +61,7 @@ select * from t;` c.Assert(err, IsNil) s.ctx = mock.NewContext() s.ctx.GetSessionVars().TimeZone = loc - rows, _, err := parseSlowLog(s.ctx, reader, func(_ string) bool { return false }) - c.Assert(err, IsNil) - c.Assert(len(rows), Equals, 0) - reader = bufio.NewReader(bytes.NewBufferString(slowLogStr)) - rows, _, err = parseSlowLog(s.ctx, reader, nil) + rows, _, err := parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) c.Assert(len(rows), Equals, 1) recordString := "" @@ -92,7 +91,7 @@ select a# from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader, nil) + _, _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) // test for time format compatibility. @@ -103,7 +102,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - rows, _, err = parseSlowLog(s.ctx, reader, nil) + rows, _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) c.Assert(len(rows) == 2, IsTrue) t0Str, err := rows[0][0].ToString() @@ -124,13 +123,13 @@ select * from t; sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1)) slowLog.WriteString(sql) reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader, nil) + _, _, err = parseSlowLog(s.ctx, reader) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536") variable.MaxOfMaxAllowedPacket = originValue reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader, nil) + _, _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) // Add parse error check. @@ -140,7 +139,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader, nil) + _, _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings() c.Assert(warnings, HasLen, 1) @@ -194,7 +193,7 @@ select * from t;`) c.Assert(err, IsNil) s.ctx = mock.NewContext() s.ctx.GetSessionVars().TimeZone = loc - _, _, err = parseSlowLog(s.ctx, scanner, nil) + _, _, err = parseSlowLog(s.ctx, scanner) c.Assert(err, IsNil) // Test parser error. 
@@ -204,10 +203,60 @@ select * from t;`) `) scanner = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, scanner, nil) + _, _, err = parseSlowLog(s.ctx, scanner) c.Assert(err, IsNil) warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings() c.Assert(warnings, HasLen, 1) c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Txn_start_ts`, error: strconv.ParseUint: parsing \"405888132465033227#\": invalid syntax") } + +func (s *testSuite1) TestSlowQueryLocateFiles(c *C) { + writeFile := func(file string, data string) { + f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) + c.Assert(err, IsNil) + _, err = f.Write([]byte(data)) + c.Assert(f.Close(), IsNil) + c.Assert(err, IsNil) + } + + logData1 := `# Time: 2020-02-15T18:00:01.000000+08:00 +select * from t; +# Time: 2020-02-15T18:00:01.000000+08:00 +select * from t;` + logData2 := `# Time: 2020-02-16T18:00:01.000000+08:00 +select * from t; +# Time: 2020-02-16T18:00:01.000000+08:00 +select * from t;` + logData3 := `# Time: 2020-02-16T18:00:02.000000+08:00 +select * from t; +# Time: 2020-02-15T18:00:03.000000+08:00 +select * from t;` + writeFile("tidb-slow-2020-02-15T19-04-05.01.log", logData1) + writeFile("tidb-slow-2020-02-16T19-04-05.01.log", logData2) + writeFile("tidb-slow.log", logData3) + defer func() { + os.Remove("tidb-slow-2020-02-15T19-04-05.01.log") + os.Remove("tidb-slow-2020-02-16T19-04-05.01.log") + os.Remove("tidb-slow.log") + }() + + startTime, err := executor.ParseTime("2020-01-14T00:15:27.438708708+08:00") + c.Assert(err, IsNil) + endTime, err := executor.ParseTime("2020-02-20T00:10:28.488427431+08:00") + c.Assert(err, IsNil) + extractor := &plannercore.SlowQueryExtractor{ + Enable: true, + StartTime: startTime, + EndTime: endTime, + } + retriever := executor.NewSlowQueryRetrieverForTest(extractor) + files, err := retriever.GetAllFiles("tidb-slow.log") + c.Assert(err, IsNil) + c.Assert(files, HasLen, 0) + for _, f := range files { + fmt.Printf("file: %v, start: %v, end: %v\n", f.File().Name(), f.BeginTime().Format(logutil.SlowLogTimeFormat), f.EndTime().Format(logutil.SlowLogTimeFormat)) + err = f.File().Close() + c.Assert(err, IsNil) + } +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 0e15289700aec..81aa2ec8afaff 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2795,6 +2795,8 @@ func (b *PlanBuilder) buildMemTable(ctx context.Context, dbName model.CIStr, tab p.Extractor = &InspectionResultTableExtractor{} case infoschema.TableMetricSummary, infoschema.TableMetricSummaryByLabel: p.Extractor = newMetricTableExtractor() + case infoschema.TableSlowQuery: + p.Extractor = &SlowQueryExtractor{} } } return p, nil diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go index 3d2828546e8c0..f9b7fcfc71715 100644 --- a/planner/core/memtable_predicate_extractor.go +++ b/planner/core/memtable_predicate_extractor.go @@ -608,3 +608,65 @@ func (e *InspectionResultTableExtractor) Extract( e.Items = items return remained } + +// SlowQueryExtractor is used to extract some predicates of `slow_query` +type SlowQueryExtractor struct { + extractHelper + + SkipRequest bool + StartTime time.Time + EndTime time.Time + Enable bool +} + +// Extract implements the MemTablePredicateExtractor Extract interface +func (e *SlowQueryExtractor) Extract( + ctx sessionctx.Context, + schema *expression.Schema, + names []*types.FieldName, + predicates []expression.Expression, 
+) []expression.Expression { + remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, predicates, "time", ctx.GetSessionVars().StmtCtx.TimeZone) + e.setTimeRange(startTime, endTime) + e.SkipRequest = e.Enable && e.StartTime.After(e.EndTime) + if e.SkipRequest { + return nil + } + return remained +} + +func (e *SlowQueryExtractor) setTimeRange(start, end int64) { + const defaultSlowQueryDuration = 24 * time.Hour + var startTime, endTime time.Time + if start == 0 && end == 0 { + return + } + if start != 0 { + startTime = e.convertToTime(start) + } + if end != 0 { + endTime = e.convertToTime(end) + } + if start == 0 { + startTime = endTime.Add(-defaultSlowQueryDuration) + } + if end == 0 { + endTime = startTime.Add(defaultSlowQueryDuration) + } + e.StartTime, e.EndTime = startTime, endTime + e.Enable = true +} + +func (e *SlowQueryExtractor) CheckTimeInvalid(t time.Time) bool { + if !e.Enable || t.Before(e.StartTime) || t.After(e.EndTime) { + return true + } + return false +} + +func (e *SlowQueryExtractor) convertToTime(t int64) time.Time { + if t == 0 || t == math.MaxInt64 { + return time.Now() + } + return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)) +} From 9920298540ee2a2fbfda203ae5f2bdbd2935fdf0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 19 Feb 2020 11:34:05 +0800 Subject: [PATCH 02/12] add more test Signed-off-by: crazycs --- executor/slow_query.go | 78 ++++----- executor/slow_query_test.go | 165 +++++++++++++++---- planner/core/memtable_predicate_extractor.go | 7 - 3 files changed, 166 insertions(+), 84 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index 8f15b9c9806ef..0a41783786062 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -16,9 +16,6 @@ package executor import ( "bufio" "context" - "fmt" - "github.com/pingcap/parser/auth" - plannercore "github.com/pingcap/tidb/planner/core" "io" "os" "path/filepath" @@ -28,9 +25,12 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/infoschema" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -55,16 +55,12 @@ type SlowQueryRetriever struct { checker *slowLogChecker } -func NewSlowQueryRetrieverForTest(extractor *plannercore.SlowQueryExtractor) *SlowQueryRetriever { - return &SlowQueryRetriever{extractor: extractor} -} - func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if e.retrieved { return nil, nil } if !e.initialized { - err := e.initialize(sctx) + err := e.Initialize(sctx) if err != nil { return nil, err } @@ -92,7 +88,8 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte return retRows, nil } -func (e *SlowQueryRetriever) initialize(sctx sessionctx.Context) error { +// Initialize is exported for test. 
+func (e *SlowQueryRetriever) Initialize(sctx sessionctx.Context) error { var err error e.initialized = true e.fileIdx = 0 @@ -132,17 +129,9 @@ func (e *SlowQueryRetriever) close() error { func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) { reader := bufio.NewReader(e.files[e.fileIdx].file) - rows, fileLine, err := ParseSlowLog(ctx, reader, e.fileLine, 1024, e.checker) + rows, fileLine, err := e.ParseSlowLog(ctx, reader, 1024) if err != nil { - if err == io.EOF { - e.fileIdx++ - e.fileLine = 0 - if e.fileIdx >= len(e.files) { - e.retrieved = true - } - } else { - return nil, err - } + return nil, err } e.fileLine = fileLine if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) { @@ -177,10 +166,10 @@ func (sc *slowLogChecker) isTimeValid(t time.Time) bool { // ParseSlowLog exports for testing. // TODO: optimize for parse huge log-file. -func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow int, checker *slowLogChecker) ([][]types.Datum, int, error) { +func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, int, error) { var rows [][]types.Datum startFlag := false - lineNum := fileLine + lineNum := e.fileLine tz := ctx.GetSessionVars().Location() var st *slowQueryTuple for { @@ -190,13 +179,23 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow lineNum++ lineByte, err := getOneLine(reader) if err != nil { + if err == io.EOF { + e.fileIdx++ + e.fileLine = 0 + if e.fileIdx >= len(e.files) { + e.retrieved = true + return rows, lineNum, nil + } + reader = bufio.NewReader(e.files[e.fileIdx].file) + continue + } return rows, lineNum, err } line := string(hack.String(lineByte)) // Check slow log entry start flag. if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { st = &slowQueryTuple{} - valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, checker) + valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -220,7 +219,7 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow if strings.HasSuffix(field, ":") { field = field[:len(field)-1] } - valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, checker) + valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -232,12 +231,12 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow } } else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) { // Get the sql string, and mark the start flag to false. 
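With `ParseSlowLog` now a method, hitting `io.EOF` no longer terminates the scan: the parser advances `e.fileIdx`, wraps the next file in a fresh `bufio.Reader`, and keeps going, so one logical pass covers the whole rotated sequence. Conceptually this is `io.MultiReader`; the patch does it by hand because the per-file line counter also has to reset at each boundary. A sketch of the chained-stream idea:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func main() {
	// Each rotated slow-log file becomes one reader; chaining them lets a
	// single scanner see the files as one continuous stream, mirroring the
	// fileIdx++/new-bufio.Reader handover in the EOF branch above.
	files := []io.Reader{
		strings.NewReader("# Time: 2020-02-15T18:00:01.000000+08:00\nselect 1;\n"),
		strings.NewReader("# Time: 2020-02-16T18:00:01.000000+08:00\nselect 3;\n"),
	}
	scanner := bufio.NewScanner(io.MultiReader(files...))
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}
```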
- _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, checker) + _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } - if checker == nil || checker.hasPrivilege(st.user) { + if e.checker == nil || e.checker.hasPrivilege(st.user) { rows = append(rows, st.convertToDatumRow()) } startFlag = false @@ -535,14 +534,6 @@ func (l *logFile) File() *os.File { return l.file } -func (l *logFile) BeginTime() time.Time { - return l.begin -} - -func (l *logFile) EndTime() time.Time { - return l.end -} - // GetAllFiles is used to get all slow-log need to parse, it is export for test. func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) { if e.extractor == nil || !e.extractor.Enable { @@ -551,13 +542,11 @@ func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) return nil, err } return []logFile{{file: file}}, nil - } var logFiles []logFile logDir := filepath.Dir(logFilePath) ext := filepath.Ext(logFilePath) prefix := logFilePath[:len(logFilePath)-len(ext)] - fmt.Printf("dir: %v, prefix: %v, ext: %v\n", logDir, prefix, ext) err := filepath.Walk(logDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -569,7 +558,6 @@ func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) if !strings.HasPrefix(path, prefix) { return nil } - fmt.Printf("path: %v, info: %v\n", path, info.Name()) file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) if err != nil { return err @@ -577,27 +565,23 @@ func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) skip := false defer func() { if !skip { - fmt.Printf("close file: %v-------------\n", file.Name()) - _ = file.Close() + terror.Log(file.Close()) } }() - now := time.Now() + // Get the file start time. fileBeginTime, err := e.getFileStartTime(file) if err != nil { return err } - fmt.Printf("get start time: %v, %v---------\n\n", time.Since(now), fileBeginTime) - if fileBeginTime.After(e.extractor.EndTime) { return nil } - now = time.Now() + // Get the file end time. fileEndTime, err := e.getFileEndTime(file) if err != nil { return err } - fmt.Printf("get end time: %v, %v---------\n\n", time.Since(now), fileEndTime) if fileEndTime.Before(e.extractor.StartTime) { return nil } @@ -617,9 +601,6 @@ func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) sort.Slice(logFiles, func(i, j int) bool { return logFiles[i].begin.Before(logFiles[j].begin) }) - for _, f := range logFiles { - fmt.Printf("valid file: %v ----\n", f.file.Name()) - } return logFiles, err } @@ -688,3 +669,8 @@ func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { } } } + +// NewSlowQueryRetrieverForTest was only used in test. +func NewSlowQueryRetrieverForTest(extractor *plannercore.SlowQueryExtractor, files []logFile) *SlowQueryRetriever { + return &SlowQueryRetriever{extractor: extractor, files: files} +} diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 8580c62eeb506..9a357c3ecf7b0 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -16,8 +16,6 @@ package executor_test import ( "bufio" "bytes" - "fmt" - plannercore "github.com/pingcap/tidb/planner/core" "io" "os" "strings" @@ -25,6 +23,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/tidb/executor" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" @@ -33,7 +32,9 @@ import ( ) func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, int, error) { - rows, lineNum, err := executor.ParseSlowLog(ctx, reader, 0, 1024, nil) + extractor := &plannercore.SlowQueryExtractor{Enable: false} + retriever := executor.NewSlowQueryRetrieverForTest(extractor, nil) + rows, lineNum, err := retriever.ParseSlowLog(ctx, reader, 1024) if err == io.EOF { err = nil } @@ -211,7 +212,7 @@ select * from t;`) } -func (s *testSuite1) TestSlowQueryLocateFiles(c *C) { +func (s *testSuite) TestSlowQueryRetriever(c *C) { writeFile := func(file string, data string) { f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) c.Assert(err, IsNil) @@ -220,43 +221,145 @@ func (s *testSuite1) TestSlowQueryLocateFiles(c *C) { c.Assert(err, IsNil) } - logData1 := `# Time: 2020-02-15T18:00:01.000000+08:00 -select * from t; + logData1 := ` # Time: 2020-02-15T18:00:01.000000+08:00 -select * from t;` - logData2 := `# Time: 2020-02-16T18:00:01.000000+08:00 -select * from t; +select 1; +# Time: 2020-02-15T19:00:05.000000+08:00 +select 2;` + logData2 := ` # Time: 2020-02-16T18:00:01.000000+08:00 -select * from t;` - logData3 := `# Time: 2020-02-16T18:00:02.000000+08:00 -select * from t; -# Time: 2020-02-15T18:00:03.000000+08:00 -select * from t;` - writeFile("tidb-slow-2020-02-15T19-04-05.01.log", logData1) - writeFile("tidb-slow-2020-02-16T19-04-05.01.log", logData2) - writeFile("tidb-slow.log", logData3) +select 3; +# Time: 2020-02-16T18:00:05.000000+08:00 +select 4;` + logData3 := ` +# Time: 2020-02-16T19:00:00.000000+08:00 +select 5; +# Time: 2020-02-17T18:00:05.000000+08:00 +select 6;` + + fileName1 := "tidb-slow-2020-02-15T19-04-05.01.log" + fileName2 := "tidb-slow-2020-02-16T19-04-05.01.log" + fileName3 := "tidb-slow.log" + writeFile(fileName1, logData1) + writeFile(fileName2, logData2) + writeFile(fileName3, logData3) defer func() { os.Remove("tidb-slow-2020-02-15T19-04-05.01.log") os.Remove("tidb-slow-2020-02-16T19-04-05.01.log") os.Remove("tidb-slow.log") }() - startTime, err := executor.ParseTime("2020-01-14T00:15:27.438708708+08:00") - c.Assert(err, IsNil) - endTime, err := executor.ParseTime("2020-02-20T00:10:28.488427431+08:00") - c.Assert(err, IsNil) - extractor := &plannercore.SlowQueryExtractor{ - Enable: true, - StartTime: startTime, - EndTime: endTime, + cases := []struct { + startTime string + endTime string + files []string + querys []string + }{ + { + startTime: "2020-02-15T18:00:00.000000+08:00", + endTime: "2020-02-17T20:00:00.000000+08:00", + files: []string{fileName1, fileName2, fileName3}, + querys: []string{ + "select 1;", + "select 2;", + "select 3;", + "select 4;", + "select 5;", + "select 6;", + }, + }, + { + startTime: "2020-02-15T18:00:02.000000+08:00", + endTime: "2020-02-16T20:00:00.000000+08:00", + files: []string{fileName1, fileName2, fileName3}, + querys: []string{ + "select 2;", + "select 3;", + "select 4;", + "select 5;", + }, + }, + { + startTime: "2020-02-16T18:00:03.000000+08:00", + endTime: "2020-02-16T18:59:00.000000+08:00", + files: []string{fileName2}, + querys: []string{ + "select 4;", + }, + }, + { + startTime: "2020-02-16T18:00:03.000000+08:00", + endTime: "2020-02-16T20:00:00.000000+08:00", + files: []string{fileName2, fileName3}, + querys: []string{ + "select 
4;", + "select 5;", + }, + }, + { + startTime: "2020-02-16T19:00:00.000000+08:00", + endTime: "2020-02-17T17:00:00.000000+08:00", + files: []string{fileName3}, + querys: []string{ + "select 5;", + }, + }, + { + startTime: "2010-01-01T00:00:00.000000+08:00", + endTime: "2010-01-01T01:00:00.000000+08:00", + files: []string{}, + }, + { + startTime: "2020-03-01T00:00:00.000000+08:00", + endTime: "2010-03-01T01:00:00.000000+08:00", + files: []string{}, + }, + { + startTime: "", + endTime: "", + files: []string{fileName3}, + querys: []string{ + "select 5;", + "select 6;", + }, + }, } - retriever := executor.NewSlowQueryRetrieverForTest(extractor) - files, err := retriever.GetAllFiles("tidb-slow.log") + + loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) - c.Assert(files, HasLen, 0) - for _, f := range files { - fmt.Printf("file: %v, start: %v, end: %v\n", f.File().Name(), f.BeginTime().Format(logutil.SlowLogTimeFormat), f.EndTime().Format(logutil.SlowLogTimeFormat)) - err = f.File().Close() + s.ctx = mock.NewContext() + s.ctx.GetSessionVars().TimeZone = loc + for i, cas := range cases { + extractor := &plannercore.SlowQueryExtractor{Enable: (len(cas.startTime) > 0 && len(cas.endTime) > 0)} + if extractor.Enable { + startTime, err := executor.ParseTime(cas.startTime) + c.Assert(err, IsNil) + endTime, err := executor.ParseTime(cas.endTime) + c.Assert(err, IsNil) + extractor.StartTime = startTime + extractor.EndTime = endTime + + } + retriever := executor.NewSlowQueryRetrieverForTest(extractor, nil) + files, err := retriever.GetAllFiles("tidb-slow.log") c.Assert(err, IsNil) + comment := Commentf("case id: %v", i) + c.Assert(files, HasLen, len(cas.files), comment) + if len(files) > 0 { + retriever := executor.NewSlowQueryRetrieverForTest(extractor, files) + err = retriever.Initialize(s.ctx) + c.Assert(err, IsNil) + rows, _, err := retriever.ParseSlowLog(s.ctx, bufio.NewReader(files[0].File()), 1024) + c.Assert(err, IsNil) + c.Assert(len(rows), Equals, len(cas.querys), comment) + for i, row := range rows { + c.Assert(row[len(row)-1].GetString(), Equals, cas.querys[i], comment) + } + } + + for i, file := range files { + c.Assert(file.File().Name(), Equals, cas.files[i]) + c.Assert(file.File().Close(), IsNil) + } } } diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go index f9b7fcfc71715..70ba6d5cf8450 100644 --- a/planner/core/memtable_predicate_extractor.go +++ b/planner/core/memtable_predicate_extractor.go @@ -657,13 +657,6 @@ func (e *SlowQueryExtractor) setTimeRange(start, end int64) { e.Enable = true } -func (e *SlowQueryExtractor) CheckTimeInvalid(t time.Time) bool { - if !e.Enable || t.Before(e.StartTime) || t.After(e.EndTime) { - return true - } - return false -} - func (e *SlowQueryExtractor) convertToTime(t int64) time.Time { if t == 0 || t == math.MaxInt64 { return time.Now() From d24c1340f78d5785d9b475f81acda762384acba7 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 19 Feb 2020 12:01:35 +0800 Subject: [PATCH 03/12] refine code Signed-off-by: crazycs --- executor/slow_query.go | 42 ++++++++++++++++++++----------------- executor/slow_query_test.go | 36 +++++++++++++++++-------------- 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index 0a41783786062..04979826e109a 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -92,8 +92,7 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte func (e *SlowQueryRetriever) 
Initialize(sctx sessionctx.Context) error { var err error e.initialized = true - e.fileIdx = 0 - e.files, err = e.GetAllFiles(sctx.GetSessionVars().SlowQueryFile) + e.files, err = e.GetAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile) if err != nil { return err } @@ -103,7 +102,6 @@ func (e *SlowQueryRetriever) Initialize(sctx sessionctx.Context) error { hasProcessPriv = true } } - e.checker = &slowLogChecker{ hasProcessPriv: hasProcessPriv, user: sctx.GetSessionVars().User, @@ -112,7 +110,6 @@ func (e *SlowQueryRetriever) Initialize(sctx sessionctx.Context) error { e.checker.enableTimeCheck = e.extractor.Enable e.checker.startTime = e.extractor.StartTime e.checker.endTime = e.extractor.EndTime - } return nil } @@ -129,11 +126,10 @@ func (e *SlowQueryRetriever) close() error { func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) { reader := bufio.NewReader(e.files[e.fileIdx].file) - rows, fileLine, err := e.ParseSlowLog(ctx, reader, 1024) + rows, err := e.ParseSlowLog(ctx, reader, 1024) if err != nil { return nil, err } - e.fileLine = fileLine if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) { return infoschema.AppendHostInfoToRows(rows) } @@ -166,17 +162,16 @@ func (sc *slowLogChecker) isTimeValid(t time.Time) bool { // ParseSlowLog exports for testing. // TODO: optimize for parse huge log-file. -func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, int, error) { +func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) { var rows [][]types.Datum + var st *slowQueryTuple startFlag := false - lineNum := e.fileLine tz := ctx.GetSessionVars().Location() - var st *slowQueryTuple for { if len(rows) >= maxRow { - return rows, lineNum, nil + return rows, nil } - lineNum++ + e.fileLine++ lineByte, err := getOneLine(reader) if err != nil { if err == io.EOF { @@ -184,18 +179,18 @@ func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio. e.fileLine = 0 if e.fileIdx >= len(e.files) { e.retrieved = true - return rows, lineNum, nil + return rows, nil } reader = bufio.NewReader(e.files[e.fileIdx].file) continue } - return rows, lineNum, err + return rows, err } line := string(hack.String(lineByte)) // Check slow log entry start flag. if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { st = &slowQueryTuple{} - valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, e.checker) + valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], e.fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -219,7 +214,7 @@ func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio. if strings.HasSuffix(field, ":") { field = field[:len(field)-1] } - valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, e.checker) + valid, err := st.setFieldValue(tz, field, fieldValues[i+1], e.fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -231,7 +226,7 @@ func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio. } } else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) { // Get the sql string, and mark the start flag to false. 
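After this refactor the line counter lives on the retriever (`e.fileLine`), and `retrieve` keeps pulling batches of at most `maxRow` (1024) rows until the `retrieved` flag flips, which bounds memory no matter how large the log files are. The pull contract, reduced to a toy with illustrative names:

```go
package main

import "fmt"

// batcher yields items in fixed-size batches, mirroring how retrieve() calls
// ParseSlowLog with maxRow = 1024 until the stream is exhausted.
type batcher struct {
	items []int
	pos   int
}

func (b *batcher) next(maxRow int) (batch []int, done bool) {
	end := b.pos + maxRow
	if end >= len(b.items) {
		end = len(b.items)
		done = true
	}
	batch = b.items[b.pos:end]
	b.pos = end
	return batch, done
}

func main() {
	b := &batcher{items: []int{1, 2, 3, 4, 5}}
	for {
		batch, done := b.next(2)
		fmt.Println(batch) // [1 2], then [3 4], then [5]
		if done {
			break
		}
	}
}
```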
- _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, e.checker) + _, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.fileLine, e.checker) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue @@ -530,12 +525,13 @@ type logFile struct { begin, end time.Time // The start/end time of the log file } +// File is exported for tests. func (l *logFile) File() *os.File { return l.file } // GetAllFiles is used to get all slow-log need to parse, it is export for test. -func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) { +func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) { if e.extractor == nil || !e.extractor.Enable { file, err := os.Open(logFilePath) if err != nil { @@ -571,7 +567,11 @@ func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) // Get the file start time. fileBeginTime, err := e.getFileStartTime(file) if err != nil { - return err + if err != io.EOF { + err = errors.Errorf("get slow-log file %v start time error: %v", file.Name(), err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) + } + return nil } if fileBeginTime.After(e.extractor.EndTime) { return nil @@ -580,7 +580,11 @@ func (e *SlowQueryRetriever) GetAllFiles(logFilePath string) ([]logFile, error) // Get the file end time. fileEndTime, err := e.getFileEndTime(file) if err != nil { - return err + if err != io.EOF { + err = errors.Errorf("get slow-log file %v end time error: %v", file.Name(), err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) + } + return nil } if fileEndTime.Before(e.extractor.StartTime) { return nil diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 9a357c3ecf7b0..a5e9e6952b513 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -31,14 +31,14 @@ import ( "github.com/pingcap/tidb/util/mock" ) -func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, int, error) { +func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { extractor := &plannercore.SlowQueryExtractor{Enable: false} retriever := executor.NewSlowQueryRetrieverForTest(extractor, nil) - rows, lineNum, err := retriever.ParseSlowLog(ctx, reader, 1024) + rows, err := retriever.ParseSlowLog(ctx, reader, 1024) if err == io.EOF { err = nil } - return rows, lineNum, err + return rows, err } func (s *testSuite) TestParseSlowLogFile(c *C) { @@ -62,7 +62,7 @@ select * from t;` c.Assert(err, IsNil) s.ctx = mock.NewContext() s.ctx.GetSessionVars().TimeZone = loc - rows, _, err := parseSlowLog(s.ctx, reader) + rows, err := parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) c.Assert(len(rows), Equals, 1) recordString := "" @@ -92,7 +92,7 @@ select a# from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) // test for time format compatibility. 
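The compatibility test below exists because slow logs written by older versions use a different `# Time:` layout than current ones, so the parser must try more than one layout. A sketch of that try-each-layout fallback; the two layout strings are stand-ins modeled on the samples in this test file, not TiDB's exported constants:

```go
package main

import (
	"fmt"
	"time"
)

// parseSlowLogTime tries the RFC3339-style layout first and falls back to a
// legacy compact layout, returning the first error when neither matches.
func parseSlowLogTime(s string) (time.Time, error) {
	layouts := []string{
		"2006-01-02T15:04:05.999999999-07:00", // current slow-log format (assumed)
		"060102 15:04:05",                     // legacy format, e.g. "190428 15:24:05" (assumed)
	}
	var firstErr error
	for _, layout := range layouts {
		t, err := time.Parse(layout, s)
		if err == nil {
			return t, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	return time.Time{}, firstErr
}

func main() {
	for _, s := range []string{"2019-04-28T15:24:04.309074+08:00", "190428 15:24:05"} {
		t, err := parseSlowLogTime(s)
		fmt.Println(t, err)
	}
}
```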
@@ -103,7 +103,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - rows, _, err = parseSlowLog(s.ctx, reader) + rows, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) c.Assert(len(rows) == 2, IsTrue) t0Str, err := rows[0][0].ToString() @@ -124,13 +124,13 @@ select * from t; sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1)) slowLog.WriteString(sql) reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(s.ctx, reader) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536") variable.MaxOfMaxAllowedPacket = originValue reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) // Add parse error check. @@ -140,7 +140,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(s.ctx, reader) c.Assert(err, IsNil) warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings() c.Assert(warnings, HasLen, 1) @@ -194,7 +194,7 @@ select * from t;`) c.Assert(err, IsNil) s.ctx = mock.NewContext() s.ctx.GetSessionVars().TimeZone = loc - _, _, err = parseSlowLog(s.ctx, scanner) + _, err = parseSlowLog(s.ctx, scanner) c.Assert(err, IsNil) // Test parser error. @@ -204,7 +204,7 @@ select * from t;`) `) scanner = bufio.NewReader(slowLog) - _, _, err = parseSlowLog(s.ctx, scanner) + _, err = parseSlowLog(s.ctx, scanner) c.Assert(err, IsNil) warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings() c.Assert(warnings, HasLen, 1) @@ -221,6 +221,7 @@ func (s *testSuite) TestSlowQueryRetriever(c *C) { c.Assert(err, IsNil) } + logData0 := "" logData1 := ` # Time: 2020-02-15T18:00:01.000000+08:00 select 1; @@ -237,16 +238,19 @@ select 5; # Time: 2020-02-17T18:00:05.000000+08:00 select 6;` + fileName0 := "tidb-slow-2020-02-14T19-04-05.01.log" fileName1 := "tidb-slow-2020-02-15T19-04-05.01.log" fileName2 := "tidb-slow-2020-02-16T19-04-05.01.log" fileName3 := "tidb-slow.log" + writeFile(fileName0, logData0) writeFile(fileName1, logData1) writeFile(fileName2, logData2) writeFile(fileName3, logData3) defer func() { - os.Remove("tidb-slow-2020-02-15T19-04-05.01.log") - os.Remove("tidb-slow-2020-02-16T19-04-05.01.log") - os.Remove("tidb-slow.log") + os.Remove(fileName0) + os.Remove(fileName1) + os.Remove(fileName2) + os.Remove(fileName3) }() cases := []struct { @@ -341,7 +345,7 @@ select 6;` } retriever := executor.NewSlowQueryRetrieverForTest(extractor, nil) - files, err := retriever.GetAllFiles("tidb-slow.log") + files, err := retriever.GetAllFiles(s.ctx, "tidb-slow.log") c.Assert(err, IsNil) comment := Commentf("case id: %v", i) c.Assert(files, HasLen, len(cas.files), comment) @@ -349,7 +353,7 @@ select 6;` retriever := executor.NewSlowQueryRetrieverForTest(extractor, files) err = retriever.Initialize(s.ctx) c.Assert(err, IsNil) - rows, _, err := retriever.ParseSlowLog(s.ctx, bufio.NewReader(files[0].File()), 1024) + rows, err := retriever.ParseSlowLog(s.ctx, bufio.NewReader(files[0].File()), 1024) c.Assert(err, IsNil) c.Assert(len(rows), Equals, len(cas.querys), comment) for i, row := range rows { From 0551d14495cfa8d3209bdeeb4116bf274a84ff0a Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 19 Feb 2020 12:26:11 +0800 Subject: [PATCH 04/12] fix test Signed-off-by: crazycs --- executor/builder.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 
b351dcea29ad8..eb6b412958e6f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1354,13 +1354,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo }, } case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog): + retriever := &SlowQueryRetriever{ + table: v.Table, + outputCols: v.Columns, + } + if v.Extractor != nil { + retriever.extractor = v.Extractor.(*plannercore.SlowQueryExtractor) + } return &MemTableReaderExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - retriever: &SlowQueryRetriever{ - table: v.Table, - outputCols: v.Columns, - extractor: v.Extractor.(*plannercore.SlowQueryExtractor), - }, + retriever: retriever, } } } From 62665400934154a357531c4eda5ecfa34f83da4b Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 19 Feb 2020 12:52:32 +0800 Subject: [PATCH 05/12] add ignore error Signed-off-by: crazycs --- executor/slow_query.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index 04979826e109a..d7c1af60ac485 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -543,9 +543,16 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st logDir := filepath.Dir(logFilePath) ext := filepath.Ext(logFilePath) prefix := logFilePath[:len(logFilePath)-len(ext)] + handleErr := func(err error) error { + // Ignore the error and append warning for usability. + if err != io.EOF { + sctx.GetSessionVars().StmtCtx.AppendWarning(err) + } + return nil + } err := filepath.Walk(logDir, func(path string, info os.FileInfo, err error) error { if err != nil { - return err + return handleErr(err) } if info.IsDir() { return nil @@ -556,7 +563,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st } file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) if err != nil { - return err + return handleErr(err) } skip := false defer func() { @@ -567,11 +574,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st // Get the file start time. fileBeginTime, err := e.getFileStartTime(file) if err != nil { - if err != io.EOF { - err = errors.Errorf("get slow-log file %v start time error: %v", file.Name(), err) - sctx.GetSessionVars().StmtCtx.AppendWarning(err) - } - return nil + return handleErr(err) } if fileBeginTime.After(e.extractor.EndTime) { return nil @@ -580,18 +583,14 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st // Get the file end time. 
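The `handleErr` closure added in this commit downgrades per-file failures to statement warnings: one unreadable or truncated rotated file should skip itself, not fail the whole `SELECT ... FROM slow_query`, and an empty file (plain `io.EOF`) is not even worth a warning. The shape of that policy in isolation (the helper and probe names here are hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// scanCandidates mimics the handleErr pattern: failures become warnings and
// the scan continues, exactly as returning nil keeps filepath.Walk going in
// the real code.
func scanCandidates(paths []string, probe func(string) error) (ok []string, warnings []error) {
	handleErr := func(err error) error {
		if err != io.EOF { // an empty file yields EOF; silently skipped
			warnings = append(warnings, err)
		}
		return nil // swallow the error so the remaining files still get scanned
	}
	for _, p := range paths {
		if err := probe(p); err != nil {
			_ = handleErr(err)
			continue
		}
		ok = append(ok, p)
	}
	return ok, warnings
}

func main() {
	probe := func(p string) error {
		switch p {
		case "tidb-slow-corrupt.log":
			return errors.New("malformed header")
		case "tidb-slow-empty.log":
			return io.EOF
		}
		return nil
	}
	ok, warnings := scanCandidates([]string{"tidb-slow.log", "tidb-slow-corrupt.log", "tidb-slow-empty.log"}, probe)
	fmt.Println(ok, warnings) // [tidb-slow.log] [malformed header]
}
```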
fileEndTime, err := e.getFileEndTime(file) if err != nil { - if err != io.EOF { - err = errors.Errorf("get slow-log file %v end time error: %v", file.Name(), err) - sctx.GetSessionVars().StmtCtx.AppendWarning(err) - } - return nil + return handleErr(err) } if fileEndTime.Before(e.extractor.StartTime) { return nil } _, err = file.Seek(0, io.SeekStart) if err != nil { - return err + return handleErr(err) } logFiles = append(logFiles, logFile{ file: file, From cd4ea4c4c193c2e2c02f912d55f0c97efe58d184 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 19 Feb 2020 22:01:34 +0800 Subject: [PATCH 06/12] address comment Signed-off-by: crazycs --- executor/builder.go | 2 +- executor/slow_query.go | 68 +++++++++----------- executor/slow_query_test.go | 22 +++---- planner/core/memtable_predicate_extractor.go | 26 ++++---- 4 files changed, 53 insertions(+), 65 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index eb6b412958e6f..0c62763b8eec3 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1359,7 +1359,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo outputCols: v.Columns, } if v.Extractor != nil { - retriever.extractor = v.Extractor.(*plannercore.SlowQueryExtractor) + retriever.Extractor = v.Extractor.(*plannercore.SlowQueryExtractor) } return &MemTableReaderExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), diff --git a/executor/slow_query.go b/executor/slow_query.go index d7c1af60ac485..cd5bfa1e6360c 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -47,12 +47,14 @@ type SlowQueryRetriever struct { table *model.TableInfo outputCols []*model.ColumnInfo retrieved bool - extractor *plannercore.SlowQueryExtractor initialized bool - files []logFile - fileIdx int - fileLine int - checker *slowLogChecker + // Extractor is exported for test. + Extractor *plannercore.SlowQueryExtractor + // Files is exported for test. + Files []logFile + fileIdx int + fileLine int + checker *slowLogChecker } func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { @@ -65,7 +67,7 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte return nil, err } } - if len(e.files) == 0 || e.fileIdx >= len(e.files) { + if len(e.Files) == 0 || e.fileIdx >= len(e.Files) { e.retrieved = true return nil, nil } @@ -91,31 +93,26 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte // Initialize is exported for test. 
func (e *SlowQueryRetriever) Initialize(sctx sessionctx.Context) error { var err error - e.initialized = true - e.files, err = e.GetAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile) - if err != nil { - return err - } var hasProcessPriv bool if pm := privilege.GetPrivilegeManager(sctx); pm != nil { - if pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { - hasProcessPriv = true - } + hasProcessPriv = pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) } e.checker = &slowLogChecker{ hasProcessPriv: hasProcessPriv, user: sctx.GetSessionVars().User, } - if e.extractor != nil { - e.checker.enableTimeCheck = e.extractor.Enable - e.checker.startTime = e.extractor.StartTime - e.checker.endTime = e.extractor.EndTime + if e.Extractor != nil { + e.checker.enableTimeCheck = e.Extractor.Enable + e.checker.startTime = e.Extractor.StartTime + e.checker.endTime = e.Extractor.EndTime } - return nil + e.initialized = true + e.Files, err = e.GetAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile) + return err } func (e *SlowQueryRetriever) close() error { - for _, f := range e.files { + for _, f := range e.Files { err := f.file.Close() if err != nil { logutil.BgLogger().Error("close slow log file failed.", zap.Error(err)) @@ -125,7 +122,7 @@ func (e *SlowQueryRetriever) close() error { } func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) { - reader := bufio.NewReader(e.files[e.fileIdx].file) + reader := bufio.NewReader(e.Files[e.fileIdx].file) rows, err := e.ParseSlowLog(ctx, reader, 1024) if err != nil { return nil, err @@ -177,11 +174,11 @@ func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio. if err == io.EOF { e.fileIdx++ e.fileLine = 0 - if e.fileIdx >= len(e.files) { + if e.fileIdx >= len(e.Files) { e.retrieved = true return rows, nil } - reader = bufio.NewReader(e.files[e.fileIdx].file) + reader = bufio.NewReader(e.Files[e.fileIdx].file) continue } return rows, err @@ -231,7 +228,7 @@ func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio. ctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } - if e.checker == nil || e.checker.hasPrivilege(st.user) { + if e.checker.hasPrivilege(st.user) { rows = append(rows, st.convertToDatumRow()) } startFlag = false @@ -532,7 +529,7 @@ func (l *logFile) File() *os.File { // GetAllFiles is used to get all slow-log need to parse, it is export for test. 
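`GetAllFiles`, continued below, discovers rotation siblings purely by name: every rotated file shares the configured log path minus its extension (`tidb-slow.log`, `tidb-slow-2020-02-15T19-04-05.01.log`, ...). That discovery step in isolation, as a sketch (the helper name is ours; the prefix computation follows the patch):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// findRotatedLogs walks the log directory and keeps every regular file whose
// path shares the configured log file's prefix, i.e. its name minus the
// extension -- the same convention GetAllFiles relies on.
func findRotatedLogs(logFilePath string) ([]string, error) {
	logDir := filepath.Dir(logFilePath)
	ext := filepath.Ext(logFilePath)
	prefix := logFilePath[:len(logFilePath)-len(ext)]
	var files []string
	err := filepath.Walk(logDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || !strings.HasPrefix(path, prefix) {
			return nil
		}
		files = append(files, path)
		return nil
	})
	return files, err
}

func main() {
	files, err := findRotatedLogs("tidb-slow.log")
	fmt.Println(files, err)
}
```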
func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) { - if e.extractor == nil || !e.extractor.Enable { + if e.Extractor == nil || !e.Extractor.Enable { file, err := os.Open(logFilePath) if err != nil { return nil, err @@ -576,7 +573,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st if err != nil { return handleErr(err) } - if fileBeginTime.After(e.extractor.EndTime) { + if fileBeginTime.After(e.Extractor.EndTime) { return nil } @@ -585,7 +582,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st if err != nil { return handleErr(err) } - if fileEndTime.Before(e.extractor.StartTime) { + if fileEndTime.Before(e.Extractor.StartTime) { return nil } _, err = file.Seek(0, io.SeekStart) @@ -629,7 +626,7 @@ func (e *SlowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) break } } - return t, errors.Errorf("can't found start time") + return t, errors.Errorf("malform slow query file %v", file.Name()) } func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { var t time.Time @@ -640,7 +637,7 @@ func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { fileSize := stat.Size() cursor := int64(0) line := make([]byte, 0, 64) - maxNum := 128 + maxLineNum := 128 for { cursor -= 1 _, err := file.Seek(cursor, io.SeekEnd) @@ -654,7 +651,7 @@ func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { return t, err } // If find a line. - if cursor != -1 && (char[0] == 10 || char[0] == 13) { + if cursor != -1 && (char[0] == '\n' || char[0] == '\r') { for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 { line[i], line[j] = line[j], line[i] } @@ -664,16 +661,11 @@ func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { return ParseTime(lineStr[len(variable.SlowLogStartPrefixStr):]) } line = line[:0] - maxNum -= 1 + maxLineNum -= 1 } line = append(line, char[0]) - if cursor == -fileSize || maxNum <= 0 { - return t, errors.Errorf("can't found end time") + if cursor == -fileSize || maxLineNum <= 0 { + return t, errors.Errorf("malform slow query file %v", file.Name()) } } } - -// NewSlowQueryRetrieverForTest was only used in test. -func NewSlowQueryRetrieverForTest(extractor *plannercore.SlowQueryExtractor, files []logFile) *SlowQueryRetriever { - return &SlowQueryRetriever{extractor: extractor, files: files} -} diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index a5e9e6952b513..20a06ffa36930 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -22,6 +22,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/executor" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" @@ -32,8 +33,9 @@ import ( ) func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { - extractor := &plannercore.SlowQueryExtractor{Enable: false} - retriever := executor.NewSlowQueryRetrieverForTest(extractor, nil) + retriever := &executor.SlowQueryRetriever{} + // Ignore the error is ok for test. 
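Once candidates survive the time-range pruning, `GetAllFiles` orders them with a `sort.Slice` on their begin times so rows come back chronologically across rotated files. A compact illustration (the field names mirror the unexported `logFile` struct):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// logFile, reduced to the two fields the sort needs: the file name and the
// timestamp of its first "# Time:" entry.
type logFile struct {
	name  string
	begin time.Time
}

func main() {
	at := func(s string) time.Time {
		t, err := time.Parse(time.RFC3339, s)
		if err != nil {
			panic(err)
		}
		return t
	}
	files := []logFile{
		{name: "tidb-slow.log", begin: at("2020-02-16T19:00:00+08:00")},
		{name: "tidb-slow-2020-02-15T19-04-05.01.log", begin: at("2020-02-15T18:00:01+08:00")},
	}
	sort.Slice(files, func(i, j int) bool { return files[i].begin.Before(files[j].begin) })
	for _, f := range files {
		fmt.Println(f.name) // rotated 02-15 file first, current file second
	}
}
```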
+ terror.Log(retriever.Initialize(ctx)) rows, err := retriever.ParseSlowLog(ctx, reader, 1024) if err == io.EOF { err = nil @@ -333,6 +335,7 @@ select 6;` c.Assert(err, IsNil) s.ctx = mock.NewContext() s.ctx.GetSessionVars().TimeZone = loc + s.ctx.GetSessionVars().SlowQueryFile = fileName3 for i, cas := range cases { extractor := &plannercore.SlowQueryExtractor{Enable: (len(cas.startTime) > 0 && len(cas.endTime) > 0)} if extractor.Enable { @@ -344,16 +347,13 @@ select 6;` extractor.EndTime = endTime } - retriever := executor.NewSlowQueryRetrieverForTest(extractor, nil) - files, err := retriever.GetAllFiles(s.ctx, "tidb-slow.log") + retriever := &executor.SlowQueryRetriever{Extractor: extractor} + err := retriever.Initialize(s.ctx) c.Assert(err, IsNil) comment := Commentf("case id: %v", i) - c.Assert(files, HasLen, len(cas.files), comment) - if len(files) > 0 { - retriever := executor.NewSlowQueryRetrieverForTest(extractor, files) - err = retriever.Initialize(s.ctx) - c.Assert(err, IsNil) - rows, err := retriever.ParseSlowLog(s.ctx, bufio.NewReader(files[0].File()), 1024) + c.Assert(retriever.Files, HasLen, len(cas.files), comment) + if len(retriever.Files) > 0 { + rows, err := retriever.ParseSlowLog(s.ctx, bufio.NewReader(retriever.Files[0].File()), 1024) c.Assert(err, IsNil) c.Assert(len(rows), Equals, len(cas.querys), comment) for i, row := range rows { @@ -361,7 +361,7 @@ select 6;` } } - for i, file := range files { + for i, file := range retriever.Files { c.Assert(file.File().Name(), Equals, cas.files[i]) c.Assert(file.File().Close(), IsNil) } diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go index f52911513067e..8e74f8dd02d00 100644 --- a/planner/core/memtable_predicate_extractor.go +++ b/planner/core/memtable_predicate_extractor.go @@ -382,6 +382,13 @@ func (helper extractHelper) parseQuantiles(quantileSet set.StringSet) []float64 return quantiles } +func (helper extractHelper) convertToTime(t int64) time.Time { + if t == 0 || t == math.MaxInt64 { + return time.Now() + } + return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)) +} + // ClusterTableExtractor is used to extract some predicates of cluster table. type ClusterTableExtractor struct { extractHelper @@ -573,13 +580,6 @@ func (e *MetricTableExtractor) getTimeRange(start, end int64) (time.Time, time.T return startTime, endTime } -func (e *MetricTableExtractor) convertToTime(t int64) time.Time { - if t == 0 || t == math.MaxInt64 { - return time.Now() - } - return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)) -} - // InspectionResultTableExtractor is used to extract some predicates of `inspection_result` type InspectionResultTableExtractor struct { extractHelper @@ -648,7 +648,10 @@ type SlowQueryExtractor struct { SkipRequest bool StartTime time.Time EndTime time.Time - Enable bool + // Enable is true means the executor should use the time range to locate the slow-log file that need to be parsed. + // Enable is false, means the executor should keep the behavior compatible with before, which is only parse the + // current slow-log file. 
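The `Enable` field documented by this comment is set by `setTimeRange` (commit 01): when only one bound is pushed down from the `WHERE` clause, the other is derived from a 24-hour default window; when neither bound is present, `Enable` stays false and the executor parses only the current slow-log file. A sketch of that defaulting rule, using `time.Time` zero values in place of the extractor's millisecond timestamps:

```go
package main

import (
	"fmt"
	"time"
)

const defaultSlowQueryDuration = 24 * time.Hour

// setTimeRange mirrors the extractor's defaulting: no bounds means the
// time-based file location stays disabled; a single bound implies the other
// via a 24h window.
func setTimeRange(start, end time.Time) (s, e time.Time, enable bool) {
	if start.IsZero() && end.IsZero() {
		return start, end, false
	}
	if start.IsZero() {
		start = end.Add(-defaultSlowQueryDuration)
	}
	if end.IsZero() {
		end = start.Add(defaultSlowQueryDuration)
	}
	return start, end, true
}

func main() {
	t, _ := time.Parse(time.RFC3339, "2020-02-16T19:00:00+08:00")
	s, e, on := setTimeRange(t, time.Time{})
	fmt.Println(s, e, on) // window extends 24h past the given start; enabled
}
```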
+ Enable bool } // Extract implements the MemTablePredicateExtractor Extract interface @@ -688,10 +691,3 @@ func (e *SlowQueryExtractor) setTimeRange(start, end int64) { e.StartTime, e.EndTime = startTime, endTime e.Enable = true } - -func (e *SlowQueryExtractor) convertToTime(t int64) time.Time { - if t == 0 || t == math.MaxInt64 { - return time.Now() - } - return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)) -} From b003e3bf3feb7fba3596b3446929ee316eec4a9b Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 20 Feb 2020 10:32:40 +0800 Subject: [PATCH 07/12] address comment Signed-off-by: crazycs --- executor/slow_query.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index cd5bfa1e6360c..63471618da970 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -144,10 +144,7 @@ type slowLogChecker struct { } func (sc *slowLogChecker) hasPrivilege(userName string) bool { - if !sc.hasProcessPriv && sc.user != nil && userName != sc.user.Username { - return false - } - return true + return sc.hasProcessPriv || sc.user == nil || userName == sc.user.Username } func (sc *slowLogChecker) isTimeValid(t time.Time) bool { From e856c9b0acbe3cb0477a63881bd69ee6b75742e4 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 20 Feb 2020 11:44:39 +0800 Subject: [PATCH 08/12] address comment Signed-off-by: crazycs --- executor/builder.go | 4 +- executor/slow_query.go | 69 ++++++++++++++++------------------- executor/slow_query_test.go | 73 ++++++++++++++++++------------------- 3 files changed, 69 insertions(+), 77 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 0c62763b8eec3..258e26018141a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1354,12 +1354,12 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo }, } case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog): - retriever := &SlowQueryRetriever{ + retriever := &slowQueryRetriever{ table: v.Table, outputCols: v.Columns, } if v.Extractor != nil { - retriever.Extractor = v.Extractor.(*plannercore.SlowQueryExtractor) + retriever.extractor = v.Extractor.(*plannercore.SlowQueryExtractor) } return &MemTableReaderExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), diff --git a/executor/slow_query.go b/executor/slow_query.go index 63471618da970..a4699451d1307 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -42,32 +42,30 @@ import ( "go.uber.org/zap" ) -//SlowQueryRetriever is used to read slow log data. -type SlowQueryRetriever struct { +//slowQueryRetriever is used to read slow log data. +type slowQueryRetriever struct { table *model.TableInfo outputCols []*model.ColumnInfo retrieved bool initialized bool - // Extractor is exported for test. - Extractor *plannercore.SlowQueryExtractor - // Files is exported for test. 
- Files []logFile - fileIdx int - fileLine int - checker *slowLogChecker + extractor *plannercore.SlowQueryExtractor + files []logFile + fileIdx int + fileLine int + checker *slowLogChecker } -func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { +func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if e.retrieved { return nil, nil } if !e.initialized { - err := e.Initialize(sctx) + err := e.initialize(sctx) if err != nil { return nil, err } } - if len(e.Files) == 0 || e.fileIdx >= len(e.Files) { + if len(e.files) == 0 || e.fileIdx >= len(e.files) { e.retrieved = true return nil, nil } @@ -91,7 +89,7 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte } // Initialize is exported for test. -func (e *SlowQueryRetriever) Initialize(sctx sessionctx.Context) error { +func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error { var err error var hasProcessPriv bool if pm := privilege.GetPrivilegeManager(sctx); pm != nil { @@ -101,18 +99,18 @@ func (e *SlowQueryRetriever) Initialize(sctx sessionctx.Context) error { hasProcessPriv: hasProcessPriv, user: sctx.GetSessionVars().User, } - if e.Extractor != nil { - e.checker.enableTimeCheck = e.Extractor.Enable - e.checker.startTime = e.Extractor.StartTime - e.checker.endTime = e.Extractor.EndTime + if e.extractor != nil { + e.checker.enableTimeCheck = e.extractor.Enable + e.checker.startTime = e.extractor.StartTime + e.checker.endTime = e.extractor.EndTime } e.initialized = true - e.Files, err = e.GetAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile) + e.files, err = e.getAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile) return err } -func (e *SlowQueryRetriever) close() error { - for _, f := range e.Files { +func (e *slowQueryRetriever) close() error { + for _, f := range e.files { err := f.file.Close() if err != nil { logutil.BgLogger().Error("close slow log file failed.", zap.Error(err)) @@ -121,9 +119,9 @@ func (e *SlowQueryRetriever) close() error { return nil } -func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) { - reader := bufio.NewReader(e.Files[e.fileIdx].file) - rows, err := e.ParseSlowLog(ctx, reader, 1024) +func (e *slowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) { + reader := bufio.NewReader(e.files[e.fileIdx].file) + rows, err := e.parseSlowLog(ctx, reader, 1024) if err != nil { return nil, err } @@ -156,7 +154,7 @@ func (sc *slowLogChecker) isTimeValid(t time.Time) bool { // ParseSlowLog exports for testing. // TODO: optimize for parse huge log-file. -func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) { +func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) { var rows [][]types.Datum var st *slowQueryTuple startFlag := false @@ -171,11 +169,11 @@ func (e *SlowQueryRetriever) ParseSlowLog(ctx sessionctx.Context, reader *bufio. if err == io.EOF { e.fileIdx++ e.fileLine = 0 - if e.fileIdx >= len(e.Files) { + if e.fileIdx >= len(e.files) { e.retrieved = true return rows, nil } - reader = bufio.NewReader(e.Files[e.fileIdx].file) + reader = bufio.NewReader(e.files[e.fileIdx].file) continue } return rows, err @@ -519,14 +517,9 @@ type logFile struct { begin, end time.Time // The start/end time of the log file } -// File is exported for tests. 
-func (l *logFile) File() *os.File {
-	return l.file
-}
-
-// GetAllFiles is used to get all slow-log need to parse, it is export for test.
-func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) {
-	if e.Extractor == nil || !e.Extractor.Enable {
+// getAllFiles is used to get all slow-log need to parse, it is export for test.
+func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) {
+	if e.extractor == nil || !e.extractor.Enable {
 		file, err := os.Open(logFilePath)
 		if err != nil {
 			return nil, err
@@ -570,7 +563,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st
 		if err != nil {
 			return handleErr(err)
 		}
-		if fileBeginTime.After(e.Extractor.EndTime) {
+		if fileBeginTime.After(e.extractor.EndTime) {
 			return nil
 		}
 
@@ -579,7 +572,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st
 		if err != nil {
 			return handleErr(err)
 		}
-		if fileEndTime.Before(e.Extractor.StartTime) {
+		if fileEndTime.Before(e.extractor.StartTime) {
 			return nil
 		}
 		_, err = file.Seek(0, io.SeekStart)
@@ -601,7 +594,7 @@ func (e *SlowQueryRetriever) GetAllFiles(sctx sessionctx.Context, logFilePath st
 	return logFiles, err
 }
 
-func (e *SlowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) {
+func (e *slowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) {
 	var t time.Time
 	_, err := file.Seek(0, io.SeekStart)
 	if err != nil {
@@ -625,7 +618,7 @@ func (e *SlowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error)
 	}
 	return t, errors.Errorf("malformed slow query file %v", file.Name())
 }
-func (e *SlowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
+func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
 	var t time.Time
 	stat, err := file.Stat()
 	if err != nil {
diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go
index 20a06ffa36930..cd02389897496 100644
--- a/executor/slow_query_test.go
+++ b/executor/slow_query_test.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package executor_test
+package executor
 
 import (
 	"bufio"
@@ -23,7 +23,6 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/parser/terror"
-	"github.com/pingcap/tidb/executor"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -33,17 +32,17 @@ import (
 )
 
 func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) {
-	retriever := &executor.SlowQueryRetriever{}
+	retriever := &slowQueryRetriever{}
 	// Ignoring the error is OK for tests.
- terror.Log(retriever.Initialize(ctx)) - rows, err := retriever.ParseSlowLog(ctx, reader, 1024) + terror.Log(retriever.initialize(ctx)) + rows, err := retriever.parseSlowLog(ctx, reader, 1024) if err == io.EOF { err = nil } return rows, err } -func (s *testSuite) TestParseSlowLogFile(c *C) { +func (s *testExecSuite) TestParseSlowLogFile(c *C) { slowLogStr := `# Time: 2019-04-28T15:24:04.309074+08:00 # Txn_start_ts: 405888132465033227 @@ -62,9 +61,9 @@ select * from t;` reader := bufio.NewReader(bytes.NewBufferString(slowLogStr)) loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) - s.ctx = mock.NewContext() - s.ctx.GetSessionVars().TimeZone = loc - rows, err := parseSlowLog(s.ctx, reader) + ctx := mock.NewContext() + ctx.GetSessionVars().TimeZone = loc + rows, err := parseSlowLog(ctx, reader) c.Assert(err, IsNil) c.Assert(len(rows), Equals, 1) recordString := "" @@ -94,7 +93,7 @@ select a# from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(ctx, reader) c.Assert(err, IsNil) // test for time format compatibility. @@ -105,7 +104,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - rows, err = parseSlowLog(s.ctx, reader) + rows, err = parseSlowLog(ctx, reader) c.Assert(err, IsNil) c.Assert(len(rows) == 2, IsTrue) t0Str, err := rows[0][0].ToString() @@ -126,13 +125,13 @@ select * from t; sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1)) slowLog.WriteString(sql) reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(ctx, reader) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536") variable.MaxOfMaxAllowedPacket = originValue reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(ctx, reader) c.Assert(err, IsNil) // Add parse error check. @@ -142,17 +141,17 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(s.ctx, reader) + _, err = parseSlowLog(ctx, reader) c.Assert(err, IsNil) - warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings() + warnings := ctx.GetSessionVars().StmtCtx.GetWarnings() c.Assert(warnings, HasLen, 1) c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. 
Field: `Succ`, error: strconv.ParseBool: parsing \"abc\": invalid syntax") } -func (s *testSuite) TestSlowLogParseTime(c *C) { +func (s *testExecSuite) TestSlowLogParseTime(c *C) { t1Str := "2019-01-24T22:32:29.313255+08:00" t2Str := "2019-01-24T22:32:29.313255" - t1, err := executor.ParseTime(t1Str) + t1, err := ParseTime(t1Str) c.Assert(err, IsNil) loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) @@ -166,7 +165,7 @@ func (s *testSuite) TestSlowLogParseTime(c *C) { // TestFixParseSlowLogFile bugfix // sql select * from INFORMATION_SCHEMA.SLOW_QUERY limit 1; // ERROR 1105 (HY000): string "2019-05-12-11:23:29.61474688" doesn't has a prefix that matches format "2006-01-02-15:04:05.999999999 -0700", err: parsing time "2019-05-12-11:23:29.61474688" as "2006-01-02-15:04:05.999999999 -0700": cannot parse "" as "-0700" -func (s *testSuite) TestFixParseSlowLogFile(c *C) { +func (s *testExecSuite) TestFixParseSlowLogFile(c *C) { slowLog := bytes.NewBufferString( `# Time: 2019-05-12-11:23:29.614327491 +0800 # Txn_start_ts: 405888132465033227 @@ -194,9 +193,9 @@ select * from t;`) scanner := bufio.NewReader(slowLog) loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) - s.ctx = mock.NewContext() - s.ctx.GetSessionVars().TimeZone = loc - _, err = parseSlowLog(s.ctx, scanner) + ctx := mock.NewContext() + ctx.GetSessionVars().TimeZone = loc + _, err = parseSlowLog(ctx, scanner) c.Assert(err, IsNil) // Test parser error. @@ -206,15 +205,15 @@ select * from t;`) `) scanner = bufio.NewReader(slowLog) - _, err = parseSlowLog(s.ctx, scanner) + _, err = parseSlowLog(ctx, scanner) c.Assert(err, IsNil) - warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings() + warnings := ctx.GetSessionVars().StmtCtx.GetWarnings() c.Assert(warnings, HasLen, 1) c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. 
Field: `Txn_start_ts`, error: strconv.ParseUint: parsing \"405888132465033227#\": invalid syntax") } -func (s *testSuite) TestSlowQueryRetriever(c *C) { +func (s *testExecSuite) TestSlowQueryRetriever(c *C) { writeFile := func(file string, data string) { f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) c.Assert(err, IsNil) @@ -333,27 +332,27 @@ select 6;` loc, err := time.LoadLocation("Asia/Shanghai") c.Assert(err, IsNil) - s.ctx = mock.NewContext() - s.ctx.GetSessionVars().TimeZone = loc - s.ctx.GetSessionVars().SlowQueryFile = fileName3 + ctx := mock.NewContext() + ctx.GetSessionVars().TimeZone = loc + ctx.GetSessionVars().SlowQueryFile = fileName3 for i, cas := range cases { extractor := &plannercore.SlowQueryExtractor{Enable: (len(cas.startTime) > 0 && len(cas.endTime) > 0)} if extractor.Enable { - startTime, err := executor.ParseTime(cas.startTime) + startTime, err := ParseTime(cas.startTime) c.Assert(err, IsNil) - endTime, err := executor.ParseTime(cas.endTime) + endTime, err := ParseTime(cas.endTime) c.Assert(err, IsNil) extractor.StartTime = startTime extractor.EndTime = endTime } - retriever := &executor.SlowQueryRetriever{Extractor: extractor} - err := retriever.Initialize(s.ctx) + retriever := &slowQueryRetriever{extractor: extractor} + err := retriever.initialize(ctx) c.Assert(err, IsNil) comment := Commentf("case id: %v", i) - c.Assert(retriever.Files, HasLen, len(cas.files), comment) - if len(retriever.Files) > 0 { - rows, err := retriever.ParseSlowLog(s.ctx, bufio.NewReader(retriever.Files[0].File()), 1024) + c.Assert(retriever.files, HasLen, len(cas.files), comment) + if len(retriever.files) > 0 { + rows, err := retriever.parseSlowLog(ctx, bufio.NewReader(retriever.files[0].file), 1024) c.Assert(err, IsNil) c.Assert(len(rows), Equals, len(cas.querys), comment) for i, row := range rows { @@ -361,9 +360,9 @@ select 6;` } } - for i, file := range retriever.Files { - c.Assert(file.File().Name(), Equals, cas.files[i]) - c.Assert(file.File().Close(), IsNil) + for i, file := range retriever.files { + c.Assert(file.file.Name(), Equals, cas.files[i]) + c.Assert(file.file.Close(), IsNil) } } } From 13321e2584f6b903237586f753669401c343a7c1 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 20 Feb 2020 11:47:05 +0800 Subject: [PATCH 09/12] update comment Signed-off-by: crazycs --- executor/slow_query.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index a4699451d1307..67a4c0fc0e782 100644 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -88,7 +88,6 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte return retRows, nil } -// Initialize is exported for test. func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error { var err error var hasProcessPriv bool @@ -152,7 +151,6 @@ func (sc *slowLogChecker) isTimeValid(t time.Time) bool { return true } -// ParseSlowLog exports for testing. // TODO: optimize for parse huge log-file. 
func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
 	var rows [][]types.Datum

From 0d06cc0d34c8ff8101ebf9544a4bc537d465fd71 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Thu, 20 Feb 2020 16:44:41 +0800
Subject: [PATCH 10/12] address comment

Signed-off-by: crazycs
---
 executor/slow_query.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/executor/slow_query.go b/executor/slow_query.go
index 67a4c0fc0e782..43a16ae7afd64 100644
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -512,7 +512,7 @@ func ParseTime(s string) (time.Time, error) {
 
 type logFile struct {
 	file       *os.File  // The opened file handle
-	begin, end time.Time // The start/end time of the log file
+	start, end time.Time // The start/end time of the log file
 }
 
 // getAllFiles is used to get all slow-log need to parse, it is export for test.
@@ -557,11 +557,11 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 		}
 	}()
 	// Get the file start time.
-	fileBeginTime, err := e.getFileStartTime(file)
+	fileStartTime, err := e.getFileStartTime(file)
 	if err != nil {
 		return handleErr(err)
 	}
-	if fileBeginTime.After(e.extractor.EndTime) {
+	if fileStartTime.After(e.extractor.EndTime) {
 		return nil
 	}
 
@@ -579,7 +579,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 	}
 	logFiles = append(logFiles, logFile{
 		file:  file,
-		begin: fileBeginTime,
+		start: fileStartTime,
 		end:   fileEndTime,
 	})
 	skip = true
@@ -587,7 +587,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 	})
 	// Sort by start time
 	sort.Slice(logFiles, func(i, j int) bool {
-		return logFiles[i].begin.Before(logFiles[j].begin)
+		return logFiles[i].start.Before(logFiles[j].start)
 	})
 	return logFiles, err
 }

From dab75e2f505bdebc2e6388a4c35c45556b4050e5 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Thu, 20 Feb 2020 16:45:23 +0800
Subject: [PATCH 11/12] Update executor/slow_query.go

Co-Authored-By: Arenatlx
---
 executor/slow_query.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executor/slow_query.go b/executor/slow_query.go
index 43a16ae7afd64..136a1648ef8a6 100644
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -515,7 +515,7 @@ type logFile struct {
 	start, end time.Time // The start/end time of the log file
 }
 
-// getAllFiles is used to get all slow-log need to parse, it is export for test.
+// getAllFiles is used to get all slow-log files that need to be parsed.
 func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) {
 	if e.extractor == nil || !e.extractor.Enable {
 		file, err := os.Open(logFilePath)

From 4b6d499b906c65e77e7b3b37320d4fdec305b398 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Thu, 20 Feb 2020 16:45:50 +0800
Subject: [PATCH 12/12] Update executor/slow_query.go

Co-Authored-By: Arenatlx
---
 executor/slow_query.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executor/slow_query.go b/executor/slow_query.go
index 136a1648ef8a6..2294cc5a00cf7 100644
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -542,7 +542,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 	if info.IsDir() {
 		return nil
 	}
-	//All rotated log files have the same prefix with the original file
+	// All rotated log files have the same prefix as the original file.
 	if !strings.HasPrefix(path, prefix) {
 		return nil
 	}
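
A note for readers on the time-range filtering implemented in getAllFiles above: a rotated slow-log file is parsed only when its [start, end] span overlaps the extractor's [StartTime, EndTime] window, which is exactly what the two early `return nil` checks encode. The following is a minimal, self-contained Go sketch of that overlap test; the `overlaps` helper is an invented name for illustration and is not part of the patch.

package main

import (
	"fmt"
	"time"
)

// overlaps reports whether the file span [fileStart, fileEnd] intersects
// the query window [start, end]. It mirrors the two early returns in
// getAllFiles: a file is skipped when it begins after the window ends,
// or ends before the window starts.
func overlaps(fileStart, fileEnd, start, end time.Time) bool {
	return !fileStart.After(end) && !fileEnd.Before(start)
}

func main() {
	parse := func(s string) time.Time {
		t, err := time.Parse(time.RFC3339, s)
		if err != nil {
			panic(err)
		}
		return t
	}
	start := parse("2020-02-17T00:00:00+08:00")
	end := parse("2020-02-18T00:00:00+08:00")
	// A rotated file that ended before the window starts is skipped.
	fmt.Println(overlaps(parse("2020-02-15T00:00:00+08:00"), parse("2020-02-16T00:00:00+08:00"), start, end)) // false
	// A file spanning the window boundary is kept.
	fmt.Println(overlaps(parse("2020-02-16T12:00:00+08:00"), parse("2020-02-17T12:00:00+08:00"), start, end)) // true
}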
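
getFileEndTime deliberately avoids reading a whole (possibly very large) log file: it seeks backwards from EOF one byte at a time, reassembling at most 128 trailing lines until one starts with the `# Time:` entry prefix. Below is a rough standalone sketch of the same backwards-scan technique; `lastTimeLine` and `startPrefix` are invented names, and the sketch returns the raw line rather than the parsed time.Time that the patch returns.

package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
)

const startPrefix = "# Time: " // marker that opens every slow-log entry

// lastTimeLine scans f backwards one byte at a time, accumulating bytes
// into a reversed buffer; whenever a newline is hit, the buffer is
// reversed back into reading order and checked for the "# Time: " prefix.
// At most maxLines trailing lines are inspected before giving up.
func lastTimeLine(f *os.File, maxLines int) (string, error) {
	stat, err := f.Stat()
	if err != nil {
		return "", err
	}
	size := stat.Size()
	line := make([]byte, 0, 64)
	char := make([]byte, 1)
	for cursor := int64(-1); -cursor <= size; cursor-- {
		if _, err := f.Seek(cursor, io.SeekEnd); err != nil {
			return "", err
		}
		if _, err := f.Read(char); err != nil {
			return "", err
		}
		if char[0] == '\n' || char[0] == '\r' {
			// Reverse the accumulated bytes back into reading order.
			for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 {
				line[i], line[j] = line[j], line[i]
			}
			if s := string(line); strings.HasPrefix(s, startPrefix) {
				return s, nil
			}
			line = line[:0]
			maxLines--
			if maxLines <= 0 {
				break
			}
			continue
		}
		line = append(line, char[0])
	}
	return "", errors.New("no \"# Time:\" line found near the end of the file")
}

func main() {
	f, err := os.CreateTemp("", "slow-*.log")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("# Time: 2020-02-17T18:00:05.01+08:00\nselect 1;\n# Time: 2020-02-17T19:00:00.0+08:00\nselect 2;\n")
	s, err := lastTimeLine(f, 128)
	fmt.Println(s, err) // # Time: 2020-02-17T19:00:00.0+08:00 <nil>
}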
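
Finally, the convertToTime helper hoisted from MetricTableExtractor into extractHelper interprets the planner's timestamps as milliseconds since the Unix epoch, with 0 and math.MaxInt64 meaning "unbounded" (substituted by time.Now()). A small runnable illustration of that conversion, mirroring the helper's body as it appears in the patch:

package main

import (
	"fmt"
	"math"
	"time"
)

// convertToTime mirrors the helper in memtable_predicate_extractor.go:
// t is milliseconds since the Unix epoch; 0 and math.MaxInt64 stand for
// "no bound", for which the current time is substituted.
func convertToTime(t int64) time.Time {
	if t == 0 || t == math.MaxInt64 {
		return time.Now()
	}
	// time.Unix takes (seconds, nanoseconds); the millisecond remainder
	// is scaled up to nanoseconds.
	return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
}

func main() {
	fmt.Println(convertToTime(1582000000000).UTC()) // 2020-02-18 04:26:40 +0000 UTC
}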