From 5cd2b9c20b54ed51573380a87d3045e29641a044 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Fri, 23 Feb 2024 15:10:02 +0800 Subject: [PATCH 1/8] add test cases --- go.mod | 1 + go.sum | 6 +++++- search_log_test.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 13d49ad..cab695a 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/shirou/gopsutil/v3 v3.21.12 github.com/stretchr/testify v1.7.1 + github.com/tj/assert v0.0.3 go.uber.org/multierr v1.4.0 // indirect go.uber.org/zap v1.12.0 // indirect golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 // indirect diff --git a/go.sum b/go.sum index 051f1dd..13ca78a 100644 --- a/go.sum +++ b/go.sum @@ -52,9 +52,12 @@ github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= +github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ= @@ -137,8 +140,9 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= diff --git a/search_log_test.go b/search_log_test.go index 370fb87..19289e2 100644 --- a/search_log_test.go +++ b/search_log_test.go @@ -15,6 +15,7 @@ package sysutil_test import ( + "compress/gzip" "context" "fmt" "io" @@ -30,6 +31,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/sysutil" "github.com/stretchr/testify/require" + "github.com/tj/assert" "google.golang.org/grpc" ) @@ -653,6 +655,33 @@ func TestReadAndAppendLogFile(t *testing.T) { } } +func TestCompressLog(t *testing.T) { + tmpDir := t.TempDir() + + filename := "test.log" + gzf, err := os.OpenFile(filepath.Join(tmpDir, filename+".gz"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) + assert.NoError(t, err) + gz := gzip.NewWriter(gzf) + gz.Write([]byte(strings.Join([]string{ + `[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome 
to TiDB."]`, + `[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, "\n"))) + assert.NoError(t, err) + gz.Close() + + beginTime, err := sysutil.ParseTimeStamp("2019/08/26 06:22:14.000 -04:00") + assert.NoError(t, err) + endTime, err := sysutil.ParseTimeStamp("2019/08/26 06:22:16.000 -04:00") + assert.NoError(t, err) + + logfile, err := sysutil.ResolveFiles(context.Background(), filepath.Join(tmpDir, filename), beginTime, endTime) + assert.NoError(t, err) + assert.NotNil(t, logfile) +} + func BenchmarkReadLastLines(b *testing.B) { s, clean := createSearchLogSuite(b) defer clean() From 19e9bd33a4e938cdc8dff5b9c41887fa3657320e Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 7 Mar 2024 09:45:33 +0800 Subject: [PATCH 2/8] add test cases --- search_log.go | 72 ++++++++++++++++++++----- search_log_test.go | 129 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 175 insertions(+), 26 deletions(-) diff --git a/search_log.go b/search_log.go index b8b19ef..4af021a 100644 --- a/search_log.go +++ b/search_log.go @@ -16,11 +16,12 @@ package sysutil import ( "bufio" + "compress/gzip" "context" "errors" "fmt" "io" - "io/ioutil" + "math" "os" "path/filepath" "regexp" @@ -44,6 +45,8 @@ func (l *logFile) EndTime() int64 { return l.end } +const compressSuffix = ".gz" + func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime int64) ([]logFile, error) { if logFilePath == "" { return nil, errors.New("empty log file location configuration") @@ -54,11 +57,11 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in logDir := filepath.Dir(logFilePath) ext := filepath.Ext(logFilePath) filePrefix := logFilePath[:len(logFilePath)-len(ext)] - files, err := ioutil.ReadDir(logDir) + files, err := os.ReadDir(logDir) if err != nil { return nil, err } - walkFn := func(path string, info os.FileInfo) error { + walkFn := 
func(path string, info os.DirEntry) error { if info.IsDir() { return nil } @@ -66,6 +69,11 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in if !strings.HasPrefix(path, filePrefix) { return nil } + compressed := false + if strings.HasSuffix(path, compressSuffix) { + compressed = true + path = path[0 : len(path)-len(compressSuffix)] + } if !strings.HasSuffix(path, ext) { return nil } @@ -75,37 +83,58 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in // If we cannot open the file, we skip to search the file instead of returning // error and abort entire searching task. // TODO: do we need to return some warning to client? - file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) + var file *os.File + if !compressed { + file, err = os.OpenFile(path, os.O_RDONLY, os.ModePerm) + } else { + file, err = os.OpenFile(path+compressSuffix, os.O_RDONLY, os.ModePerm) + } if err != nil { return nil } - reader := bufio.NewReader(file) + var reader *bufio.Reader + if !compressed { + reader = bufio.NewReader(file) + } else { + gr, err := gzip.NewReader(file) + if err != nil { + return nil + } + reader = bufio.NewReader(gr) + } + var firstItemTime, lastItemTime int64 firstItem, err := readFirstValidLog(ctx, reader, 10) if err != nil { skipFiles = append(skipFiles, file) return nil } + firstItemTime = firstItem.Time - lastItem, err := readLastValidLog(ctx, file, 10) - if err != nil { - skipFiles = append(skipFiles, file) - return nil + if !compressed { + lastItem, err := readLastValidLog(ctx, file, 10) + if err != nil { + skipFiles = append(skipFiles, file) + return nil + } + lastItemTime = lastItem.Time + } else { + // for compressed file, to avoid decompression, we assume lastTime equals to inf. 
+ lastItemTime = math.MaxInt64 } - // Reset position to the start and skip this file if cannot seek to start if _, err := file.Seek(0, io.SeekStart); err != nil { skipFiles = append(skipFiles, file) return nil } - if beginTime > lastItem.Time || endTime < firstItem.Time { + if beginTime > lastItemTime || endTime < firstItemTime { skipFiles = append(skipFiles, file) } else { logFiles = append(logFiles, logFile{ file: file, - begin: firstItem.Time, - end: lastItem.Time, + begin: firstItemTime, + end: lastItemTime, }) } return nil @@ -127,7 +156,22 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in sort.Slice(logFiles, func(i, j int) bool { return logFiles[i].begin < logFiles[j].begin }) - return logFiles, err + + // Assume no time range overlap in log files and remove unnecessary log files for compressed files. + // When logFiles[i-1].end < begin < logFiles[i].begin, it will return one more logFiles[i-1]. + idx := 0 + for i := range logFiles { + if i == 0 { + continue + } + if logFiles[i].begin < beginTime { + idx = i + skipFiles = append(skipFiles, logFiles[i-1].file) + } else { + break + } + } + return logFiles[idx:], err } func isCtxDone(ctx context.Context) bool { diff --git a/search_log_test.go b/search_log_test.go index 19289e2..f049cfd 100644 --- a/search_log_test.go +++ b/search_log_test.go @@ -31,7 +31,6 @@ import ( pb "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/sysutil" "github.com/stretchr/testify/require" - "github.com/tj/assert" "google.golang.org/grpc" ) @@ -658,10 +657,21 @@ func TestReadAndAppendLogFile(t *testing.T) { func TestCompressLog(t *testing.T) { tmpDir := t.TempDir() - filename := "test.log" - gzf, err := os.OpenFile(filepath.Join(tmpDir, filename+".gz"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) - assert.NoError(t, err) + gzf, err := os.OpenFile(filepath.Join(tmpDir, "test-2.log.gz"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) + require.NoError(t, err) gz := 
gzip.NewWriter(gzf) + gz.Write([]byte(strings.Join([]string{ + `[2019/08/26 06:22:08.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:09.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, "\n"))) + gz.Close() + + gzf, err = os.OpenFile(filepath.Join(tmpDir, "test-1.log.gz"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) + require.NoError(t, err) + gz = gzip.NewWriter(gzf) gz.Write([]byte(strings.Join([]string{ `[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, @@ -669,17 +679,112 @@ func TestCompressLog(t *testing.T) { `[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, }, "\n"))) - assert.NoError(t, err) gz.Close() - beginTime, err := sysutil.ParseTimeStamp("2019/08/26 06:22:14.000 -04:00") - assert.NoError(t, err) - endTime, err := sysutil.ParseTimeStamp("2019/08/26 06:22:16.000 -04:00") - assert.NoError(t, err) + f, err := os.OpenFile(filepath.Join(tmpDir, "test.log"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) + require.NoError(t, err) + f.Write([]byte(strings.Join([]string{ + `[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:20.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:21.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:22.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, "\n"))) + f.Close() + + type timeRange struct{ start, end string } + cases 
:= []struct { + search timeRange + expectFileNum int + expect []string + levels []pb.LogLevel + patterns []string + }{ + { + search: timeRange{"2019/08/26 06:22:14.000 -04:00", "2019/08/26 06:22:16.000 -04:00"}, + expectFileNum: 1, + levels: []pb.LogLevel{pb.LogLevel_Info}, + patterns: []string{".*TiDB.*"}, + expect: []string{ + `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, + }, + { + search: timeRange{"2019/08/26 06:22:10.000 -04:00", "2019/08/26 06:22:16.000 -04:00"}, + expectFileNum: 2, + levels: []pb.LogLevel{pb.LogLevel_Info}, + patterns: []string{".*TiDB.*"}, + expect: []string{ + `[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, + }, + { + search: timeRange{"2019/08/26 06:22:14.000 -04:00", "2019/08/26 06:22:20.000 -04:00"}, + expectFileNum: 2, + levels: []pb.LogLevel{pb.LogLevel_Info}, + patterns: []string{".*TiDB.*"}, + expect: []string{ + `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, + }, + { + search: timeRange{"2019/08/26 06:22:10.000 -04:00", "2019/08/26 
06:22:20.000 -04:00"}, + expectFileNum: 3, + levels: []pb.LogLevel{pb.LogLevel_Info}, + patterns: []string{".*TiDB.*"}, + expect: []string{ + `[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, + }, + { + // When file1.endtime < search.start < file2.begintime, + // it will scan one more file, because we don't know the last item for a compressed file.
+ search: timeRange{"2019/08/26 06:22:13.000 -04:00", "2019/08/26 06:22:20.000 -04:00"}, + expectFileNum: 3, + levels: []pb.LogLevel{pb.LogLevel_Info}, + patterns: []string{".*TiDB.*"}, + expect: []string{ + `[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + `[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, + }, + }, + } + + for _, cas := range cases { + beginTime, err := sysutil.ParseTimeStamp(cas.search.start) + require.NoError(t, err) + endTime, err := sysutil.ParseTimeStamp(cas.search.end) + require.NoError(t, err) - logfile, err := sysutil.ResolveFiles(context.Background(), filepath.Join(tmpDir, filename), beginTime, endTime) - assert.NoError(t, err) - assert.NotNil(t, logfile) + logfile, err := sysutil.ResolveFiles(context.Background(), filepath.Join(tmpDir, "test.log"), beginTime, endTime) + require.NoError(t, err) + require.Len(t, logfile, cas.expectFileNum) + } } func BenchmarkReadLastLines(b *testing.B) { From 460aa73a9dc5696e6831168ee559157d4c76ede4 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 7 Mar 2024 13:26:53 +0800 Subject: [PATCH 3/8] update comments --- search_log.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/search_log.go b/search_log.go index 4af021a..99b9a5b 100644 --- a/search_log.go +++ b/search_log.go @@ -119,7 +119,8 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in } lastItemTime = lastItem.Time } else { - // for compressed file, to avoid decompression, we assume lastTime equals to inf. 
+ // For compressed file, it's hard to get last item, + // and to avoid decompression, we assume lastTime equals to `math.MaxInt64`. lastItemTime = math.MaxInt64 } // Reset position to the start and skip this file if cannot seek to start From 4d43c73a0afb0413a8b280f26cdc213ba98e5c0c Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 7 Mar 2024 14:25:54 +0800 Subject: [PATCH 4/8] update --- search_log.go | 32 ++++++++++++++----- search_log_test.go | 78 ++++++++++++++++++++++++++++++++++------------ service.go | 8 +---- 3 files changed, 84 insertions(+), 34 deletions(-) diff --git a/search_log.go b/search_log.go index 99b9a5b..4441b58 100644 --- a/search_log.go +++ b/search_log.go @@ -35,6 +35,7 @@ import ( type logFile struct { file *os.File // The opened file handle begin, end int64 // The timesteamp in millisecond of first line + compressed bool // The file is compressed or not } func (l *logFile) BeginTime() int64 { @@ -133,9 +134,10 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in skipFiles = append(skipFiles, file) } else { logFiles = append(logFiles, logFile{ - file: file, - begin: firstItemTime, - end: lastItemTime, + file: file, + begin: firstItemTime, + end: lastItemTime, + compressed: compressed, }) } return nil @@ -392,14 +394,14 @@ type logIterator struct { // inner state fileIndex int reader *bufio.Reader - pending []*os.File + pending []logFile preLog *pb.LogMessage } // The Close method close all resources the iterator has. 
func (iter *logIterator) close() { for _, f := range iter.pending { - _ = f.Close() + _ = f.file.Close() } } @@ -409,7 +411,15 @@ func (iter *logIterator) next(ctx context.Context) (*pb.LogMessage, error) { if len(iter.pending) == 0 { return nil, io.EOF } - iter.reader = bufio.NewReader(iter.pending[iter.fileIndex]) + if !iter.pending[iter.fileIndex].compressed { + iter.reader = bufio.NewReader(iter.pending[iter.fileIndex].file) + } else { + gr, err := gzip.NewReader(iter.pending[iter.fileIndex].file) + if err != nil { + return nil, err + } + iter.reader = bufio.NewReader(gr) + } } nextLine: @@ -424,7 +434,15 @@ nextLine: if iter.fileIndex >= len(iter.pending) { return nil, io.EOF } - iter.reader.Reset(iter.pending[iter.fileIndex]) + if !iter.pending[iter.fileIndex].compressed { + iter.reader.Reset(iter.pending[iter.fileIndex].file) + } else { + gr, err := gzip.NewReader(iter.pending[iter.fileIndex].file) + if err != nil { + return nil, err + } + iter.reader.Reset(gr) + } continue } line = strings.TrimSpace(line) diff --git a/search_log_test.go b/search_log_test.go index f049cfd..0520c00 100644 --- a/search_log_test.go +++ b/search_log_test.go @@ -73,6 +73,15 @@ func (s *searchLogSuite) writeTmpFile(t testing.TB, filename string, lines []str require.NoError(t, err, fmt.Sprintf("write tmp file %s failed", filename)) } +func (s *searchLogSuite) writeTmpGzipFile(t testing.TB, filename string, lines []string) { + gzf, err := os.OpenFile(filepath.Join(s.tmpDir, filename), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) + require.NoError(t, err, fmt.Sprintf("write tmp gzip file %s failed", filename)) + gz := gzip.NewWriter(gzf) + defer gz.Close() + _, err = gz.Write([]byte(strings.Join(lines, "\n"))) + require.NoError(t, err, fmt.Sprintf("write tmp gzip file %s failed", filename)) +} + func TestResolveFiles(t *testing.T) { s, clean := createSearchLogSuite(t) defer clean() @@ -655,42 +664,32 @@ func TestReadAndAppendLogFile(t *testing.T) { } func TestCompressLog(t 
*testing.T) { - tmpDir := t.TempDir() + s, clean := createSearchLogSuite(t) + defer clean() - gzf, err := os.OpenFile(filepath.Join(tmpDir, "test-2.log.gz"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) - require.NoError(t, err) - gz := gzip.NewWriter(gzf) - gz.Write([]byte(strings.Join([]string{ + s.writeTmpGzipFile(t, "rpc.tidb-2.log.gz", []string{ `[2019/08/26 06:22:08.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:09.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:10.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:11.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:12.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, - }, "\n"))) - gz.Close() + }) - gzf, err = os.OpenFile(filepath.Join(tmpDir, "test-1.log.gz"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) - require.NoError(t, err) - gz = gzip.NewWriter(gzf) - gz.Write([]byte(strings.Join([]string{ + s.writeTmpGzipFile(t, "rpc.tidb-1.log.gz", []string{ `[2019/08/26 06:22:13.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:14.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:15.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:16.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:17.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, - }, "\n"))) - gz.Close() + }) - f, err := os.OpenFile(filepath.Join(tmpDir, "test.log"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm) - require.NoError(t, err) - f.Write([]byte(strings.Join([]string{ + s.writeTmpFile(t, "rpc.tidb.log", []string{ `[2019/08/26 06:22:18.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:19.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:20.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, `[2019/08/26 06:22:21.011 -04:00] [INFO] [printer.go:41] 
["Welcome to TiDB."]`, `[2019/08/26 06:22:22.011 -04:00] [INFO] [printer.go:41] ["Welcome to TiDB."]`, - }, "\n"))) - f.Close() + }) type timeRange struct{ start, end string } cases := []struct { @@ -775,15 +774,54 @@ func TestCompressLog(t *testing.T) { }, } - for _, cas := range cases { + // Set up a connection to the server. + conn, err := grpc.Dial(s.address, grpc.WithInsecure()) + require.NoError(t, err) + + for i, cas := range cases { beginTime, err := sysutil.ParseTimeStamp(cas.search.start) require.NoError(t, err) endTime, err := sysutil.ParseTimeStamp(cas.search.end) require.NoError(t, err) - logfile, err := sysutil.ResolveFiles(context.Background(), filepath.Join(tmpDir, "test.log"), beginTime, endTime) + logfile, err := sysutil.ResolveFiles(context.Background(), filepath.Join(s.tmpDir, "rpc.tidb.log"), beginTime, endTime) require.NoError(t, err) require.Len(t, logfile, cas.expectFileNum) + + req := &pb.SearchLogRequest{ + StartTime: beginTime, + EndTime: endTime, + Levels: cas.levels, + Patterns: cas.patterns, + } + client := pb.NewDiagnosticsClient(conn) + + // Contact the server and print out its response. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + stream, err := client.SearchLog(ctx, req) + require.NoError(t, err) + + resp := &pb.SearchLogResponse{} + for { + res, err := stream.Recv() + if err != nil && err == io.EOF { + break + } + require.NoError(t, err) + require.NotNil(t, res) + resp.Messages = append(resp.Messages, res.Messages...) 
+ } + + var items []*pb.LogMessage + for _, s := range cas.expect { + item, err := sysutil.ParseLogItem(s) + require.NoError(t, err) + items = append(items, item) + } + require.Equal(t, len(items), len(resp.Messages), fmt.Sprintf("search log (index: %d) failed", i)) + require.Equal(t, items, resp.Messages, fmt.Sprintf("search log (index: %d) failed", i)) } } diff --git a/service.go b/service.go index 361a3ce..f6d3a4d 100644 --- a/service.go +++ b/service.go @@ -19,7 +19,6 @@ import ( "fmt" "io" "math" - "os" "regexp" "runtime" "sort" @@ -62,11 +61,6 @@ func (d *DiagnosticsServer) SearchLog(req *pb.SearchLogRequest, stream pb.Diagno return err } - // Sort log files by start time - var searchFiles []*os.File - for _, f := range logFiles { - searchFiles = append(searchFiles, f.file) - } var levelFlag int64 for _, l := range req.Levels { levelFlag |= 1 << l @@ -84,7 +78,7 @@ func (d *DiagnosticsServer) SearchLog(req *pb.SearchLogRequest, stream pb.Diagno end: endTime, levelFlag: levelFlag, patterns: patterns, - pending: searchFiles, + pending: logFiles, } defer iter.close() From 5d292a312665ad2a92336823d786463bb94a7777 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 7 Mar 2024 14:27:52 +0800 Subject: [PATCH 5/8] go mod tidy --- go.mod | 2 +- go.sum | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/go.mod b/go.mod index cab695a..cdc2aa2 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,10 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/shirou/gopsutil/v3 v3.21.12 github.com/stretchr/testify v1.7.1 - github.com/tj/assert v0.0.3 go.uber.org/multierr v1.4.0 // indirect go.uber.org/zap v1.12.0 // indirect golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 // indirect google.golang.org/grpc v1.25.1 gopkg.in/yaml.v2 v2.2.4 // indirect + gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect ) diff --git a/go.sum b/go.sum index 13ca78a..cfc1428 100644 --- a/go.sum +++ b/go.sum @@ -52,12 +52,9 @@ github.com/shirou/gopsutil/v3 
v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= -github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ= From a5b7172e11e53c7d69f347ce0313dd8c796843b6 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 7 Mar 2024 16:18:22 +0800 Subject: [PATCH 6/8] add comments --- search_log.go | 1 + 1 file changed, 1 insertion(+) diff --git a/search_log.go b/search_log.go index 4441b58..44ab31a 100644 --- a/search_log.go +++ b/search_log.go @@ -464,6 +464,7 @@ nextLine: } else { iter.preLog = item } + // It assumes no time range overlap for log files. 
if item.Time > iter.end { return nil, io.EOF } From 9bcce866e68b39d33d7b7d8ab4f7be9eb33f98de Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 7 Mar 2024 16:32:04 +0800 Subject: [PATCH 7/8] add function --- search_log.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/search_log.go b/search_log.go index 44ab31a..70bb895 100644 --- a/search_log.go +++ b/search_log.go @@ -405,20 +405,27 @@ func (iter *logIterator) close() { } } +func (iter *logIterator) updateToNextReader() error { + if !iter.pending[iter.fileIndex].compressed { + iter.reader = bufio.NewReader(iter.pending[iter.fileIndex].file) + } else { + gr, err := gzip.NewReader(iter.pending[iter.fileIndex].file) + if err != nil { + return err + } + iter.reader = bufio.NewReader(gr) + } + return nil +} + func (iter *logIterator) next(ctx context.Context) (*pb.LogMessage, error) { // initial state if iter.reader == nil { if len(iter.pending) == 0 { return nil, io.EOF } - if !iter.pending[iter.fileIndex].compressed { - iter.reader = bufio.NewReader(iter.pending[iter.fileIndex].file) - } else { - gr, err := gzip.NewReader(iter.pending[iter.fileIndex].file) - if err != nil { - return nil, err - } - iter.reader = bufio.NewReader(gr) + if err := iter.updateToNextReader(); err != nil { + return nil, err } } @@ -434,14 +441,8 @@ nextLine: if iter.fileIndex >= len(iter.pending) { return nil, io.EOF } - if !iter.pending[iter.fileIndex].compressed { - iter.reader.Reset(iter.pending[iter.fileIndex].file) - } else { - gr, err := gzip.NewReader(iter.pending[iter.fileIndex].file) - if err != nil { - return nil, err - } - iter.reader.Reset(gr) + if err := iter.updateToNextReader(); err != nil { + return nil, err } continue } From 64f13e8146a93d060fad36d7657b9b35f1653e72 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Fri, 8 Mar 2024 16:22:26 +0800 Subject: [PATCH 8/8] code clean --- search_log.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git 
a/search_log.go b/search_log.go index 70bb895..92f3e80 100644 --- a/search_log.go +++ b/search_log.go @@ -70,12 +70,8 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in if !strings.HasPrefix(path, filePrefix) { return nil } - compressed := false - if strings.HasSuffix(path, compressSuffix) { - compressed = true - path = path[0 : len(path)-len(compressSuffix)] - } - if !strings.HasSuffix(path, ext) { + compressed := strings.HasSuffix(path, compressSuffix) + if !strings.HasSuffix(path, ext) && !strings.HasSuffix(path, ext+compressSuffix) { return nil } if isCtxDone(ctx) { @@ -84,12 +80,7 @@ func resolveFiles(ctx context.Context, logFilePath string, beginTime, endTime in // If we cannot open the file, we skip to search the file instead of returning // error and abort entire searching task. // TODO: do we need to return some warning to client? - var file *os.File - if !compressed { - file, err = os.OpenFile(path, os.O_RDONLY, os.ModePerm) - } else { - file, err = os.OpenFile(path+compressSuffix, os.O_RDONLY, os.ModePerm) - } + file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) if err != nil { return nil }