[637] Add support to index gzipped logs
This is a proposed approach to fix elastic#637. In this approach, we reuse the existing input type 'log' to transparently process gzipped files, so there is no configuration change for filebeat users other than ensuring that their filename patterns match gzipped file names. Filebeat recognizes gzipped files by the '.gz' extension; any other extension will not work.

Ruflin noted an alternative approach of introducing a brand-new input type to deal with gzipped files. I'm fine with either approach, but reusing the existing input type seemed more intuitive from the filebeat user's viewpoint, and it was easier for me to implement. I hope this change gives Ruflin a better view of what this approach looks like. If we still feel a new input type is better, we can certainly go down that path.

A few pending things that we can do once we agree this approach is acceptable:
- Test the case where a regular file gets log-rotated and compressed. The compressed file will have a different inode; today this works because filename patterns usually don't match .gz files, but with this support .gz files will typically be matched.
- Write new tests
- Go's compress/gzip package doesn't support seeking, so resuming a partially read gzip file is not supported. We need to decide whether to support this or to handle it gracefully; one way forward seeking could be emulated is sketched below.
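For the last point, here is a minimal sketch of how a forward "seek" could be faked by reading and discarding uncompressed bytes, which is roughly what the TODO in GZipFile.Seek alludes to. This is not part of the commit, and skipForward is a hypothetical helper:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
)

// skipForward emulates a forward seek on a gzip stream by reading and
// discarding n uncompressed bytes, since compress/gzip has no native Seek.
func skipForward(r *gzip.Reader, n int64) (int64, error) {
	return io.CopyN(ioutil.Discard, r, n)
}

func main() {
	// Build a small gzipped payload in memory.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("line one\nline two\n"))
	zw.Close()

	// "Resume" at uncompressed offset 9 by discarding the first 9 bytes.
	zr, _ := gzip.NewReader(&buf)
	skipped, _ := skipForward(zr, 9)
	rest, _ := ioutil.ReadAll(zr)
	fmt.Printf("skipped %d bytes, remaining: %q\n", skipped, rest)
}

Whether re-reading a large rotated log from the start like this is acceptable is exactly the open question above.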
Raghavendra Rachamadugu committed Aug 10, 2016
1 parent 32446ed commit c0ae186
Showing 4 changed files with 108 additions and 25 deletions.
36 changes: 22 additions & 14 deletions filebeat/harvester/log.go
@@ -159,16 +159,33 @@ func (h *Harvester) openFile() error {
 
 	harvesterOpenFiles.Add(1)
 
+	defer func() {
+		if err != nil {
+			f.Close()
+			harvesterOpenFiles.Add(-1)
+		}
+	}()
+
 	// Makes sure file handler is also closed on errors
 	err = h.validateFile(f)
 	if err != nil {
-		f.Close()
-		harvesterOpenFiles.Add(-1)
 		return err
 	}
 
-	h.file = source.File{f}
-	return nil
+	h.file, err = source.NewFile(f)
+	if err != nil {
+		return err
+	}
+
+	// get file offset. Only update offset if no error
+	offset, err := h.initFileOffset(h.file)
+	if err != nil {
+		return err
+	}
+
+	logp.Debug("harvester", "Setting offset for file: %s. Offset: %d ", h.state.Source, offset)
+	h.state.Offset = offset
+	return err
 }
 
 func (h *Harvester) validateFile(f *os.File) error {
@@ -198,19 +215,10 @@ func (h *Harvester) validateFile(f *os.File) error {
 		return err
 	}
 
-	// get file offset. Only update offset if no error
-	offset, err := h.initFileOffset(f)
-	if err != nil {
-		return err
-	}
-
-	logp.Debug("harvester", "Setting offset for file: %s. Offset: %d ", h.state.Source, offset)
-	h.state.Offset = offset
-
 	return nil
 }
 
-func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
+func (h *Harvester) initFileOffset(file source.FileSource) (int64, error) {
 
 	// continue from last known offset
 	if h.state.Offset > 0 {
68 changes: 66 additions & 2 deletions filebeat/harvester/source/file.go
@@ -1,9 +1,73 @@
 package source
 
-import "os"
+import (
+	"os"
+	"errors"
+	"path/filepath"
+	"compress/gzip"
+	"github.com/elastic/beats/libbeat/logp"
+)
 
 type File struct {
 	*os.File
 }
 
+func NewFile(osf *os.File) (FileSource, error) {
+	fileExt := filepath.Ext(osf.Name())
+	logp.Debug("harvester", "file extension is %s", fileExt)
+	switch fileExt {
+	case ".gz": {
+		logp.Debug("harvester", "reading compressed gzip file %s", osf.Name())
+		return newGZipFile(osf)
+	}
+	default: return File{osf}, nil
+	}
+}
+
 func (File) Continuable() bool { return true }
 
+type GZipFile struct {
+	*os.File
+	gzipReader *gzip.Reader
+}
+
+func newGZipFile(osf *os.File) (FileSource, error) {
+	gzipReader, err := gzip.NewReader(osf)
+	if err != nil {
+		return nil, err
+	} else {
+		return GZipFile{osf, gzipReader}, nil
+	}
+}
+
+func (GZipFile) Continuable() bool { return false }
+
+func (gf GZipFile) Close() error {
+	err1 := gf.gzipReader.Close()
+	err2 := gf.File.Close()
+	if err2 != nil {
+		return err2
+	} else {
+		return err1
+	}
+}
+
+func (gf GZipFile) Read(p []byte) (n int, err error) {
+	return gf.gzipReader.Read(p)
+}
+
+func (gf GZipFile) Seek(offset int64, whence int) (int64, error) {
+	// TODO: gzip package doesn't inherently support seeking, we may be able to fake seeking using read
+	err := errors.New("Seeking is not supported on gzip files. Offset: ")
+	switch whence {
+	case os.SEEK_SET:
+	case os.SEEK_CUR:
+		if (offset > 0) {
+			return 0, err
+		} else {
+			return offset, nil
+		}
+	}
+	return 0, err
+}
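Since "Write new tests" is still pending, here is one possible shape such a test could take. This is only a sketch, not part of this commit; it assumes a file_test.go next to this file in the source package and uses only the standard library:

package source

import (
	"compress/gzip"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
)

// TestNewFileGzip writes a small .gz file, opens it through NewFile, and
// checks that the content comes back decompressed and that the source
// reports itself as not continuable.
func TestNewFileGzip(t *testing.T) {
	dir, err := ioutil.TempDir("", "gzip-source")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "test.log.gz")
	f, err := os.Create(path)
	if err != nil {
		t.Fatal(err)
	}
	zw := gzip.NewWriter(f)
	zw.Write([]byte("hello gzip\n"))
	zw.Close()
	f.Close()

	osf, err := os.Open(path)
	if err != nil {
		t.Fatal(err)
	}
	src, err := NewFile(osf)
	if err != nil {
		t.Fatal(err)
	}
	defer src.Close()

	content, err := ioutil.ReadAll(src)
	if err != nil {
		t.Fatal(err)
	}
	if string(content) != "hello gzip\n" {
		t.Errorf("unexpected content: %q", content)
	}
	if src.Continuable() {
		t.Error("gzip sources should not be continuable")
	}
}

A companion test with a plain .log file would assert that Continuable() returns true and the bytes come back unmodified.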

14 changes: 12 additions & 2 deletions filebeat/harvester/source/pipe.go
@@ -1,15 +1,25 @@
 package source
 
-import "os"
+import (
+	"os"
+	"errors"
+)
 
 // restrict file to minimal interface of FileSource to prevent possible casts
 // to additional interfaces supported by underlying file
 type Pipe struct {
 	File *os.File
 }
 
 func (p Pipe) Read(b []byte) (int, error) { return p.File.Read(b) }
 func (p Pipe) Close() error { return p.File.Close() }
 func (p Pipe) Name() string { return p.File.Name() }
 func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() }
 func (p Pipe) Continuable() bool { return false }
+func (p Pipe) Seek(offset int64, whence int) (int64, error) {
+	if offset == 0 && (whence == os.SEEK_CUR || whence == os.SEEK_SET) {
+		return 0, nil
+	} else {
+		return 0, errors.New("Seek not supported on pipes")
+	}
+}
15 changes: 8 additions & 7 deletions filebeat/harvester/source/source.go
@@ -1,17 +1,18 @@
 package source
 
 import (
 	"io"
 	"os"
 )
 
 type LogSource interface {
 	io.ReadCloser
 	Name() string
+	io.Seeker
 }
 
 type FileSource interface {
 	LogSource
 	Stat() (os.FileInfo, error)
 	Continuable() bool // can we continue processing after EOF?
 }
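Not part of the commit, but since LogSource now embeds io.Seeker, a compile-time assertion along these lines could document that File, GZipFile, and Pipe all still satisfy FileSource:

package source

// Compile-time checks (a sketch, not in this change) that every source type
// implements the extended FileSource interface, including the new Seek method.
var (
	_ FileSource = File{}
	_ FileSource = GZipFile{}
	_ FileSource = Pipe{}
)

If any of the types ever drops a required method, the build fails at these lines instead of at a distant call site.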
