-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support handle invalid logs #19
Changes from all commits
dab3e40
d2fbf9c
71a9f15
ca397ce
e1084ad
f53e8a8
b08939a
f386b97
b5ead57
2f47a32
b935fdb
f43ae93
b3464a1
650740f
f628718
56008cf
6921c86
35d98d7
82bb8b1
f25244b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ on: | |
- master | ||
|
||
jobs: | ||
release_packages: | ||
test: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,31 +73,25 @@ func resolveFiles(logFilePath string, beginTime, endTime int64) ([]logFile, erro | |
return nil | ||
} | ||
reader := bufio.NewReader(file) | ||
// Skip this file if cannot read the first line | ||
firstLine, err := readLine(reader) | ||
if err != nil && err != io.EOF { | ||
skipFiles = append(skipFiles, file) | ||
return nil | ||
} | ||
// Skip this file if the first line is not a valid log message | ||
firstItem, err := parseLogItem(firstLine) | ||
|
||
firstItem, err := readFirstValidLog(reader, 10) | ||
if err != nil { | ||
skipFiles = append(skipFiles, file) | ||
return nil | ||
} | ||
// Skip this file if cannot read the last line | ||
lastLine := readLastLine(file) | ||
// Skip this file if the last line is not a valid log message | ||
lastItem, err := parseLogItem(lastLine) | ||
|
||
lastItem, err := readLastValidLog(file, 10) | ||
if err != nil { | ||
skipFiles = append(skipFiles, file) | ||
return nil | ||
} | ||
|
||
// Reset position to the start and skip this file if cannot seek to start | ||
if _, err := file.Seek(0, io.SeekStart); err != nil { | ||
skipFiles = append(skipFiles, file) | ||
return nil | ||
} | ||
|
||
if beginTime > lastItem.Time || endTime < firstItem.Time { | ||
skipFiles = append(skipFiles, file) | ||
} else { | ||
|
@@ -109,18 +103,64 @@ func resolveFiles(logFilePath string, beginTime, endTime int64) ([]logFile, erro | |
} | ||
return nil | ||
}) | ||
|
||
defer func() { | ||
for _, f := range skipFiles { | ||
_ = f.Close() | ||
} | ||
}() | ||
|
||
// Sort by start time | ||
sort.Slice(logFiles, func(i, j int) bool { | ||
return logFiles[i].begin < logFiles[j].begin | ||
}) | ||
return logFiles, err | ||
} | ||
|
||
// readFirstValidLog reads lines from reader, one at a time via readLine, and | ||
// returns the first one that parseLogItem accepts. | ||
// | ||
// Any error from readLine is returned to the caller unchanged. If tryLines | ||
// lines have been read and none of them parsed, it gives up and returns a | ||
// "not a valid log file" error. | ||
func readFirstValidLog(reader *bufio.Reader, tryLines int64) (*pb.LogMessage, error) { | ||
// tried counts the lines that were read but failed to parse. | ||
var tried int64 | ||
for { | ||
line, err := readLine(reader) | ||
if err != nil { | ||
return nil, err | ||
} | ||
item, err := parseLogItem(line) | ||
if err == nil { | ||
return item, nil | ||
} | ||
tried++ | ||
// The limit is checked after the attempt, so at least one line is | ||
// always examined, even when tryLines is zero or negative. | ||
if tried >= tryLines { | ||
break | ||
} | ||
} | ||
return nil, errors.New("not a valid log file") | ||
} | ||
|
||
// readLastValidLog walks backwards from the end of file and returns the last | ||
// line that parseLogItem accepts. | ||
// | ||
// Lines are fetched in chunks through readLastLines, starting at the file size | ||
// and moving the end cursor back by the number of bytes each call reports. | ||
// Within a chunk the lines are tried from last to first. The search stops with | ||
// a "not a valid log file" error once readLastLines reports zero bytes read or | ||
// at least tryLines lines have been examined without success. | ||
func readLastValidLog(file *os.File, tryLines int) (*pb.LogMessage, error) { | ||
// tried counts the lines examined so far across all chunks. | ||
var tried int | ||
// NOTE(review): the error from Stat is discarded; if Stat fails, stat is | ||
// presumably nil and the Size call below would panic - confirm and handle. | ||
stat, _ := file.Stat() | ||
endCursor := stat.Size() | ||
for { | ||
lines, readBytes := readLastLines(file, endCursor) | ||
// Nothing more was read, so stop searching. | ||
if readBytes == 0 { | ||
break | ||
} | ||
// The next chunk ends where this one started. | ||
endCursor -= int64(readBytes) | ||
for i := len(lines) - 1; i >= 0; i-- { | ||
item, err := parseLogItem(lines[i]) | ||
if err == nil { | ||
return item, nil | ||
} | ||
} | ||
// The whole chunk is scanned before the limit is checked, so more than | ||
// tryLines lines may be examined in total. | ||
tried += len(lines) | ||
if tried >= tryLines { | ||
break | ||
} | ||
} | ||
return nil, errors.New("not a valid log file") | ||
} | ||
|
||
// Read a line from a reader. | ||
func readLine(reader *bufio.Reader) (string, error) { | ||
var line, b []byte | ||
|
@@ -136,34 +176,51 @@ func readLine(reader *bufio.Reader) (string, error) { | |
return string(line), nil | ||
} | ||
|
||
func readLastLine(file *os.File) string { | ||
var line []byte | ||
var cursor int64 | ||
stat, _ := file.Stat() | ||
filesize := stat.Size() | ||
// Read lines from the end of a file | ||
// endCursor initial value should be the filesize | ||
func readLastLines(file *os.File, endCursor int64) ([]string, int) { | ||
var lines []byte | ||
var firstNonNewlinePos int | ||
var cursor = endCursor | ||
for { | ||
cursor -= 1 | ||
file.Seek(cursor, io.SeekEnd) | ||
// stop if we are at the beginning | ||
// check it in the start to avoid read beyond the size | ||
if cursor <= 0 { | ||
break | ||
} | ||
|
||
var size int64 = 512 | ||
if cursor < size { | ||
size = cursor | ||
} | ||
cursor -= size | ||
baurine marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
char := make([]byte, 1) | ||
file.Read(char) | ||
file.Seek(cursor, io.SeekStart) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace |
||
chars := make([]byte, size) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can move this line out of the loop and reuse the buffer to improve performance There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea! update it soon. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried but found it doesn't work well, including keep size value not change. Because |
||
file.Read(chars) | ||
lines = append(chars, lines...) | ||
|
||
// stop if we find a line | ||
if cursor != -1 && (char[0] == 10 || char[0] == 13) { | ||
break | ||
// find first '\n' or '\r' | ||
for i := 0; i < len(chars); i++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only need to check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it works as you said, |
||
// reach the line end | ||
// the first newline may be in the line end at the first round | ||
if i >= len(lines)-1 { | ||
break | ||
} | ||
if (chars[i] == 10 || chars[i] == 13) && chars[i+1] != 10 && chars[i+1] != 13 { | ||
firstNonNewlinePos = i + 1 | ||
break | ||
} | ||
} | ||
line = append(line, char[0]) | ||
if cursor == -filesize { // stop if we are at the begining | ||
if firstNonNewlinePos > 0 { | ||
break | ||
} | ||
} | ||
for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 { | ||
line[i], line[j] = line[j], line[i] | ||
} | ||
return string(line) | ||
finalStr := string(lines[firstNonNewlinePos:]) | ||
return strings.Split(strings.ReplaceAll(finalStr, "\r\n", "\n"), "\n"), len(finalStr) | ||
} | ||
|
||
// Returns LogLevel from string and return LogLevel_Info if | ||
// ParseLogLevel returns LogLevel from string and return LogLevel_Info if | ||
// the string is an invalid level string | ||
func ParseLogLevel(s string) pb.LogLevel { | ||
switch s { | ||
|
@@ -214,22 +271,21 @@ func parseLogItem(s string) (*pb.LogMessage, error) { | |
return item, nil | ||
} | ||
|
||
const TimeStampLayout = "2006/01/02 15:04:05.000 -07:00" | ||
const ( | ||
timeStampLayout = "2006/01/02 15:04:05.000 -07:00" | ||
timeStampLayoutLen = len(timeStampLayout) | ||
) | ||
|
||
// TiDB / TiKV / PD unified log format | ||
// [2019/03/04 17:04:24.614 +08:00] ... | ||
func parseTimeStamp(s string) (int64, error) { | ||
t, err := time.Parse(TimeStampLayout, s) | ||
t, err := time.Parse(timeStampLayout, s) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return t.UnixNano() / int64(time.Millisecond), nil | ||
} | ||
|
||
// Only enable seek when position range is more than SEEK_THRESHOLD. | ||
// The suggested value of SEEK_THRESHOLD is the biggest log size. | ||
const SEEK_THRESHOLD = 1024 * 1024 | ||
|
||
// logIterator implements Iterator and IteratorWithPeek interface. | ||
// It's used for reading logs from log files one by one by their | ||
// time. | ||
|
@@ -244,6 +300,7 @@ type logIterator struct { | |
fileIndex int | ||
reader *bufio.Reader | ||
pending []*os.File | ||
preLog *pb.LogMessage | ||
} | ||
|
||
// The Close method close all resources the iterator has. | ||
|
@@ -274,21 +331,33 @@ nextLine: | |
iter.reader.Reset(iter.pending[iter.fileIndex]) | ||
continue | ||
} | ||
if len(line) < len(TimeStampLayout) { | ||
line = strings.TrimSpace(line) | ||
if iter.preLog == nil && len(line) < timeStampLayoutLen { | ||
continue | ||
} | ||
// Skip invalid log item | ||
item, err := parseLogItem(line) | ||
if err != nil { | ||
continue | ||
if iter.preLog == nil { | ||
continue | ||
} | ||
// handle invalid log | ||
// make whole line as log message with pre time and pre log_level | ||
item = &pb.LogMessage{ | ||
Time: iter.preLog.Time, | ||
Level: iter.preLog.Level, | ||
Message: line, | ||
} | ||
} else { | ||
iter.preLog = item | ||
} | ||
if item.Time > iter.end { | ||
return nil, io.EOF | ||
} | ||
if item.Time < iter.begin { | ||
continue | ||
} | ||
if iter.levelFlag != 0 && iter.levelFlag&(1<<item.Level) == 0 { | ||
// always keep unknown log_level | ||
if item.Level > pb.LogLevel_UNKNOWN && iter.levelFlag != 0 && iter.levelFlag&(1<<item.Level) == 0 { | ||
continue | ||
} | ||
if len(iter.patterns) > 0 { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read 512 bytes once instead of 1 byte.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this change very much, would you like to write a benchmark for this optimization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, let me try it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comparison:
Conclusion:
The new implementation is about 65x (126135/1920) as fast as the old one. (The result depends on the length of the last line; in this case the last line is 76 bytes long. If we double the length of the last line, the time taken by the old implementation doubles too; see details in PR #20.)