diff --git a/go.mod b/go.mod
index 13d49ad..cdc2aa2 100644
--- a/go.mod
+++ b/go.mod
@@ -14,4 +14,5 @@ require (
 	golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 // indirect
 	google.golang.org/grpc v1.25.1
 	gopkg.in/yaml.v2 v2.2.4 // indirect
+	gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
 )
diff --git a/go.sum b/go.sum
index 051f1dd..cfc1428 100644
--- a/go.sum
+++ b/go.sum
@@ -137,8 +137,9 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo=
+gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
diff --git a/search_log.go b/search_log.go
index b8b19ef..92f3e80 100644
--- a/search_log.go
+++ b/search_log.go
@@ -16,11 +16,12 @@ package sysutil
 import (
 	"bufio"
+	"compress/gzip"
 	"context"
 	"errors"
 	"fmt"
 	"io"
-	"io/ioutil"
+	"math"
 	"os"
 	"path/filepath"
 	"regexp"
@@ -34,6 +35,7 @@ import (
 type logFile struct {
 	file       *os.File // The opened file handle
 	begin, end int64    // The timesteamp in millisecond of first line
+	compressed bool     // The file is compressed or not
 }
 
 func (l *logFile) BeginTime() int64 {
 	return l.begin
 }
@@ -44,6 +46,8 @@ func (l *logFile) EndTime() int64 {
 	return l.end
 }
 
+const compressSuffix = ".gz"
+
 func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime int64) ([]logFile, error) {
 	if logFilePath == "" {
 		return nil, errors.New("empty log file location configuration")
 	}
@@ -54,11 +58,11 @@
 	logDir := filepath.Dir(logFilePath)
 	ext := filepath.Ext(logFilePath)
 	filePrefix := logFilePath[:len(logFilePath)-len(ext)]
-	files, err := ioutil.ReadDir(logDir)
+	files, err := os.ReadDir(logDir)
 	if err != nil {
 		return nil, err
 	}
-	walkFn := func(path string, info os.FileInfo) error {
+	walkFn := func(path string, info os.DirEntry) error {
 		if info.IsDir() {
 			return nil
 		}
@@ -66,7 +70,8 @@
 		if !strings.HasPrefix(path, filePrefix) {
 			return nil
 		}
-		if !strings.HasSuffix(path, ext) {
+		compressed := strings.HasSuffix(path, compressSuffix)
+		if !strings.HasSuffix(path, ext) && !strings.HasSuffix(path, ext+compressSuffix) {
 			return nil
 		}
 		if isCtxDone(ctx) {
@@ -79,33 +84,51 @@
 		if err != nil {
 			return nil
 		}
-		reader := bufio.NewReader(file)
+		var reader *bufio.Reader
+		if !compressed {
+			reader = bufio.NewReader(file)
+		} else {
+			gr, err := gzip.NewReader(file)
+			if err != nil {
+				return nil
+			}
+			reader = bufio.NewReader(gr)
+		}
+
+		var firstItemTime, lastItemTime int64
 		firstItem, err := readFirstValidLog(ctx, reader, 10)
 		if err != nil {
 			skipFiles = append(skipFiles, file)
 			return nil
 		}
+		firstItemTime = firstItem.Time
-		lastItem, err := readLastValidLog(ctx, file, 10)
-		if err != nil {
-			skipFiles = append(skipFiles, file)
-			return nil
+		if !compressed {
+			lastItem, err := readLastValidLog(ctx, file, 10)
+			if err != nil {
+				skipFiles = append(skipFiles, file)
+				return nil
+			}
+			lastItemTime = lastItem.Time
+		} else {
+			// For a compressed file it is hard to read the last item without
+			// decompressing the whole file, so we assume its end time is `math.MaxInt64`.
+			lastItemTime = math.MaxInt64
 		}
-
 		// Reset position to the start and skip this file if cannot seek to start
 		if _, err := file.Seek(0, io.SeekStart); err != nil {
 			skipFiles = append(skipFiles, file)
 			return nil
 		}
-		if beginTime > lastItem.Time || endTime < firstItem.Time {
+		if beginTime > lastItemTime || endTime < firstItemTime {
 			skipFiles = append(skipFiles, file)
 		} else {
 			logFiles = append(logFiles, logFile{
-				file:  file,
-				begin: firstItem.Time,
-				end:   lastItem.Time,
+				file:       file,
+				begin:      firstItemTime,
+				end:        lastItemTime,
+				compressed: compressed,
 			})
 		}
 		return nil
@@ -127,7 +150,22 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in
 	sort.Slice(logFiles, func(i, j int) bool {
 		return logFiles[i].begin < logFiles[j].begin
 	})
-	return logFiles, err
+
+	// Assume the log files have no overlapping time ranges and drop leading files that cannot contain the search range.
+	// When logFiles[i-1].end < beginTime < logFiles[i].begin, one extra file (logFiles[i-1]) is still returned.
+	idx := 0
+	for i := range logFiles {
+		if i == 0 {
+			continue
+		}
+		if logFiles[i].begin < beginTime {
+			idx = i
+			skipFiles = append(skipFiles, logFiles[i-1].file)
+		} else {
+			break
+		}
+	}
+	return logFiles[idx:], err
 }
 
 func isCtxDone(ctx context.Context) bool {
@@ -347,15 +385,28 @@ type logIterator struct {
 	// inner state
 	fileIndex int
 	reader    *bufio.Reader
-	pending   []*os.File
+	pending   []logFile
 	preLog    *pb.LogMessage
 }
 
 // The Close method close all resources the iterator has.
 func (iter *logIterator) close() {
 	for _, f := range iter.pending {
-		_ = f.Close()
+		_ = f.file.Close()
+	}
+}
+
+func (iter *logIterator) updateToNextReader() error {
+	if !iter.pending[iter.fileIndex].compressed {
+		iter.reader = bufio.NewReader(iter.pending[iter.fileIndex].file)
+	} else {
+		gr, err := gzip.NewReader(iter.pending[iter.fileIndex].file)
+		if err != nil {
+			return err
+		}
+		iter.reader = bufio.NewReader(gr)
 	}
+	return nil
 }
 
 func (iter *logIterator) next(ctx context.Context) (*pb.LogMessage, error) {
@@ -364,7 +415,9 @@
 		if len(iter.pending) == 0 {
 			return nil, io.EOF
 		}
-		iter.reader = bufio.NewReader(iter.pending[iter.fileIndex])
+		if err := iter.updateToNextReader(); err != nil {
+			return nil, err
+		}
 	}
 
 nextLine:
@@ -379,7 +432,9 @@
 			if iter.fileIndex >= len(iter.pending) {
 				return nil, io.EOF
 			}
-			iter.reader.Reset(iter.pending[iter.fileIndex])
+			if err := iter.updateToNextReader(); err != nil {
+				return nil, err
+			}
 			continue
 		}
 		line = strings.TrimSpace(line)
@@ -401,6 +456,7 @@
 		} else {
 			iter.preLog = item
 		}
+		// It assumes no time range overlap for log files.
 		if item.Time > iter.end {
 			return nil, io.EOF
 		}
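Both `resolveFiles` and `updateToNextReader` above follow the same pattern: open the file once, wrap the handle in `gzip.NewReader` only when the name carries the `.gz` suffix, and read through a single `bufio.Reader` either way. A minimal standalone sketch of that pattern (the `openLogReader` helper and the file name are invented for illustration, not part of this package):

```go
package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"os"
	"strings"
)

// openLogReader returns a line-oriented reader for path, transparently
// decompressing when the file name ends in ".gz". The caller still owns
// (and must close) the returned *os.File.
func openLogReader(path string) (*os.File, *bufio.Reader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, err
	}
	if strings.HasSuffix(path, ".gz") {
		gr, err := gzip.NewReader(f)
		if err != nil {
			f.Close()
			return nil, nil, err
		}
		return f, bufio.NewReader(gr), nil
	}
	return f, bufio.NewReader(f), nil
}

func main() {
	// Works for a plain "tidb.log" as well; the suffix decides the wrapping.
	f, r, err := openLogReader("tidb.log.gz")
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer f.Close()
	line, _ := r.ReadString('\n') // e.g. peek at the first line to parse its timestamp
	fmt.Print(line)
}
```

A gzip stream is not seekable, so after peeking at the first line the code (like the diff) has to seek the underlying `*os.File` back to the start and build a fresh `gzip.NewReader` before iterating, which is what `updateToNextReader` does for each pending file.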
diff --git a/search_log_test.go b/search_log_test.go
index 370fb87..0520c00 100644
--- a/search_log_test.go
+++ b/search_log_test.go
@@ -15,6 +15,7 @@ package sysutil_test
 import (
+	"compress/gzip"
 	"context"
 	"fmt"
 	"io"
@@ -72,6 +73,15 @@ func (s *searchLogSuite) writeTmpFile(t testing.TB, filename string, lines []str
 	require.NoError(t, err, fmt.Sprintf("write tmp file %s failed", filename))
 }
 
+func (s *searchLogSuite) writeTmpGzipFile(t testing.TB, filename string, lines []string) {
+	gzf, err := os.OpenFile(filepath.Join(s.tmpDir, filename), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
+	require.NoError(t, err, fmt.Sprintf("write tmp gzip file %s failed", filename))
+	gz := gzip.NewWriter(gzf)
+	defer gz.Close()
+	_, err = gz.Write([]byte(strings.Join(lines, "\n")))
+	require.NoError(t, err, fmt.Sprintf("write tmp gzip file %s failed", filename))
+}
+
 func TestResolveFiles(t *testing.T) {
 	s, clean := createSearchLogSuite(t)
 	defer clean()
@@ -653,6 +663,168 @@ func TestReadAndAppendLogFile(t *testing.T) {
 	}
 }
 
+func TestCompressLog(t *testing.T) {
+	s, clean := createSearchLogSuite(t)
+	defer clean()
+
+	s.writeTmpGzipFile(t, "rpc.tidb-2.log.gz", []string{
+		`[2019/08/26 06:22:08.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:09.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+	})
+
+	s.writeTmpGzipFile(t, "rpc.tidb-1.log.gz", []string{
+		`[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+	})
+
+	s.writeTmpFile(t, "rpc.tidb.log", []string{
+		`[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:20.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:21.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+		`[2019/08/26 06:22:22.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+	})
+
+	type timeRange struct{ start, end string }
+	cases := []struct {
+		search        timeRange
+		expectFileNum int
+		expect        []string
+		levels        []pb.LogLevel
+		patterns      []string
+	}{
+		{
+			search:        timeRange{"2019/08/26 06:22:14.000 -04:00", "2019/08/26 06:22:16.000 -04:00"},
+			expectFileNum: 1,
+			levels:        []pb.LogLevel{pb.LogLevel_Info},
+			patterns:      []string{".*TiDB.*"},
+			expect: []string{
+				`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+			},
+		},
+		{
+			search:        timeRange{"2019/08/26 06:22:10.000 -04:00", "2019/08/26 06:22:16.000 -04:00"},
+			expectFileNum: 2,
+			levels:        []pb.LogLevel{pb.LogLevel_Info},
+			patterns:      []string{".*TiDB.*"},
+			expect: []string{
+				`[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+			},
+		},
+		{
+			search:        timeRange{"2019/08/26 06:22:14.000 -04:00", "2019/08/26 06:22:20.000 -04:00"},
+			expectFileNum: 2,
+			levels:        []pb.LogLevel{pb.LogLevel_Info},
+			patterns:      []string{".*TiDB.*"},
+			expect: []string{
+				`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+			},
+		},
+		{
+			search:        timeRange{"2019/08/26 06:22:10.000 -04:00", "2019/08/26 06:22:20.000 -04:00"},
+			expectFileNum: 3,
+			levels:        []pb.LogLevel{pb.LogLevel_Info},
+			patterns:      []string{".*TiDB.*"},
+			expect: []string{
+				`[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+			},
+		},
+		{
+			// When file1.endtime < search.start < file2.begintime,
+			// it will scan one more file, because we don't know the last item of a compressed file.
+			search:        timeRange{"2019/08/26 06:22:13.000 -04:00", "2019/08/26 06:22:20.000 -04:00"},
+			expectFileNum: 3,
+			levels:        []pb.LogLevel{pb.LogLevel_Info},
+			patterns:      []string{".*TiDB.*"},
+			expect: []string{
+				`[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+				`[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
+			},
+		},
+	}
+
+	// Set up a connection to the server.
+	conn, err := grpc.Dial(s.address, grpc.WithInsecure())
+	require.NoError(t, err)
+
+	for i, cas := range cases {
+		beginTime, err := sysutil.ParseTimeStamp(cas.search.start)
+		require.NoError(t, err)
+		endTime, err := sysutil.ParseTimeStamp(cas.search.end)
+		require.NoError(t, err)
+
+		logfile, err := sysutil.ResolveFiles(context.Background(), filepath.Join(s.tmpDir, "rpc.tidb.log"), beginTime, endTime)
+		require.NoError(t, err)
+		require.Len(t, logfile, cas.expectFileNum)
+
+		req := &pb.SearchLogRequest{
+			StartTime: beginTime,
+			EndTime:   endTime,
+			Levels:    cas.levels,
+			Patterns:  cas.patterns,
+		}
+		client := pb.NewDiagnosticsClient(conn)
+
+		// Contact the server and print out its response.
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+
+		stream, err := client.SearchLog(ctx, req)
+		require.NoError(t, err)
+
+		resp := &pb.SearchLogResponse{}
+		for {
+			res, err := stream.Recv()
+			if err != nil && err == io.EOF {
+				break
+			}
+			require.NoError(t, err)
+			require.NotNil(t, res)
+			resp.Messages = append(resp.Messages, res.Messages...)
+		}
+
+		var items []*pb.LogMessage
+		for _, s := range cas.expect {
+			item, err := sysutil.ParseLogItem(s)
+			require.NoError(t, err)
+			items = append(items, item)
+		}
+		require.Equal(t, len(items), len(resp.Messages), fmt.Sprintf("search log (index: %d) failed", i))
+		require.Equal(t, items, resp.Messages, fmt.Sprintf("search log (index: %d) failed", i))
+	}
+}
+
 func BenchmarkReadLastLines(b *testing.B) {
 	s, clean := createSearchLogSuite(b)
 	defer clean()
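For reference, a self-contained round trip of the fixture idea behind `writeTmpGzipFile`, runnable outside the test suite (the file name and log lines are made up for the example): write the lines through a `gzip.Writer`, close it so the stream is flushed, then read them back the same way `resolveFiles` would.

```go
package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	path := filepath.Join(os.TempDir(), "rpc.tidb-1.log.gz") // hypothetical fixture name

	lines := []string{
		`[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
		`[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`,
	}

	// Write the fixture: the gzip.Writer must be closed before the data is a complete stream.
	f, err := os.Create(path)
	if err != nil {
		panic(err)
	}
	gw := gzip.NewWriter(f)
	if _, err := gw.Write([]byte(strings.Join(lines, "\n"))); err != nil {
		panic(err)
	}
	if err := gw.Close(); err != nil {
		panic(err)
	}
	if err := f.Close(); err != nil {
		panic(err)
	}

	// Read it back the way resolveFiles would: gzip.NewReader wrapped in a buffered reader.
	rf, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	defer rf.Close()
	gr, err := gzip.NewReader(rf)
	if err != nil {
		panic(err)
	}
	scanner := bufio.NewScanner(gr)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}
```

One detail worth noting: closing the `gzip.Writer` is what completes the compressed stream; the sketch also closes the underlying file explicitly, which the `defer gz.Close()` in the test helper above leaves to the test process.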
diff --git a/service.go b/service.go
index 361a3ce..f6d3a4d 100644
--- a/service.go
+++ b/service.go
@@ -19,7 +19,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"os"
 	"regexp"
 	"runtime"
 	"sort"
@@ -62,11 +61,6 @@ func (d *DiagnosticsServer) SearchLog(req *pb.SearchLogRequest, stream pb.Diagno
 		return err
 	}
 
-	// Sort log files by start time
-	var searchFiles []*os.File
-	for _, f := range logFiles {
-		searchFiles = append(searchFiles, f.file)
-	}
 	var levelFlag int64
 	for _, l := range req.Levels {
 		levelFlag |= 1 << l
@@ -84,7 +78,7 @@ func (d *DiagnosticsServer) SearchLog(req *pb.SearchLogRequest, stream pb.Diagno
 		end:       endTime,
 		levelFlag: levelFlag,
 		patterns:  patterns,
-		pending:   searchFiles,
+		pending:   logFiles,
 	}
 	defer iter.close()
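With `resolveFiles` now returning `[]logFile`, the `service.go` hunk above simply hands that slice to the iterator instead of collecting bare `*os.File` handles. As a closing illustration of the trimming loop added to `resolveFiles`, here is the same pruning idea as a standalone sketch; the type, function name, and timestamps are invented for the example, and the real code additionally records the dropped files in `skipFiles` so they get closed.

```go
package main

import "fmt"

// logFileInfo is a pared-down stand-in for the package's logFile type.
type logFileInfo struct {
	name  string
	begin int64 // begin timestamp in milliseconds
}

// pruneLeading mirrors the trimming loop added to resolveFiles: assuming the
// files are sorted by begin time and do not overlap, keep the last file that
// begins before beginTime (its end time is unknown for compressed files) and
// everything after it. It ignores the end-time filter resolveFiles applies earlier.
func pruneLeading(files []logFileInfo, beginTime int64) []logFileInfo {
	idx := 0
	for i := range files {
		if i == 0 {
			continue
		}
		if files[i].begin < beginTime {
			idx = i
		} else {
			break
		}
	}
	return files[idx:]
}

func main() {
	files := []logFileInfo{
		{"rpc.tidb-2.log.gz", 1566814928011}, // first line at 06:22:08 -04:00
		{"rpc.tidb-1.log.gz", 1566814933011}, // first line at 06:22:13 -04:00
		{"rpc.tidb.log", 1566814938011},      // first line at 06:22:18 -04:00
	}
	// A search starting at 06:22:14 keeps rpc.tidb-1.log.gz (it may still
	// contain matching lines) and rpc.tidb.log, i.e. one extra compressed file.
	for _, f := range pruneLeading(files, 1566814934000) {
		fmt.Println(f.name)
	}
}
```

This is the behaviour the last test case documents: when the search start falls between two files, one extra compressed file is scanned because its end time is unknown.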