Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start a new harvester when file was truncated #1882

Merged
merged 1 commit into from
Jun 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package harvester

import (
"errors"
"io"
"os"

"golang.org/x/text/transform"
Expand Down Expand Up @@ -64,17 +63,9 @@ func (h *Harvester) Harvest() {
ts, text, bytesRead, jsonFields, err := readLine(reader)
if err != nil {
if err == errFileTruncate {
seeker, ok := h.file.(io.Seeker)
if !ok {
logp.Err("can not seek source")
return
}

logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is info message enough? There is a chance we did loose lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point. We should make a WARN out of this and add a comment that this means that not all content was read potentially. WDYT?

But I suggest to make this in a second PR, ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso after our conversation yesterday about warn messages, I will leave it as info, ok?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we remove the requirement to seek most recently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in this PR. But the logging that the file was truncated is still needed.


h.SetOffset(0)
seeker.Seek(h.getOffset(), os.SEEK_SET)
continue
return
}

logp.Info("Read line error: %s", err)
Expand Down
22 changes: 4 additions & 18 deletions filebeat/harvester/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (
)

type logFileReader struct {
fs FileSource
offset int64
config logFileReaderConfig
truncated bool
fs FileSource
offset int64
config logFileReaderConfig

lastTimeRead time.Time
backoff time.Duration
Expand Down Expand Up @@ -60,18 +59,6 @@ func newLogFileReader(
}

func (r *logFileReader) Read(buf []byte) (int, error) {
if r.truncated {
var offset int64
if seeker, ok := r.fs.(io.Seeker); ok {
var err error
offset, err = seeker.Seek(0, os.SEEK_CUR)
if err != nil {
return 0, err
}
}
r.offset = offset
r.truncated = false
}

for {
select {
Expand Down Expand Up @@ -114,9 +101,8 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
// handle fails if file was truncated
if info.Size() < r.offset {
logp.Debug("harvester",
"File was truncated as offset (%s) > size (%s). Begin reading file from offset 0: %s",
"File was truncated as offset (%s) > size (%s): %s",
r.offset, info.Size(), r.fs.Name())
r.truncated = true
return n, errFileTruncate
}

Expand Down