Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rotatingfile: add basic rotating file plugin #3922

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
_ "github.com/influxdata/telegraf/plugins/outputs/rotatingfile"
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
)
17 changes: 17 additions & 0 deletions plugins/outputs/rotatingfile/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# rotatingfile Output Plugin
This plugin works exactly the same as the file output plugin, but the file is rotated. This practical if you for example use something that grabs those files and moves them across a network boundary or similar.

# Configuration
```
[[outputs.rotating_file]]
## Files to write to, "stdout" is a specially handled file.
root = "/tmp"
filename_prefix = "metrics"
max_age = "1m"

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.m
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```
123 changes: 123 additions & 0 deletions plugins/outputs/rotatingfile/rotate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package rotatingfile

// Rotating things
import (
"errors"
"fmt"
"os"
"path"
"sync"
"time"
)

// RootPerm defines the permissions that Writer will use if it
// needs to create the root directory.
var RootPerm = os.FileMode(0755)

// FilePerm defines the permissions that Writer will use for all
// the files it creates.
var FilePerm = os.FileMode(0644)

// Writer implements the io.Writer interface and writes to the
// "current" file in the root directory. When current file age
// exceeds max, it is renamed and a new file is created.
type Writer struct {
root string
prefix string
current *os.File
expireTime time.Time
max time.Duration
sync.Mutex
}

// New creates a new Writer. The files will be created in the
// root directory. root will be created if necessary. The
// filenames will start with prefix.
func NewRotatingWriter(root, prefix string, maxAgeInput string) (*Writer, error) {
maxAge, err := time.ParseDuration(maxAgeInput)
if err != nil {
return nil, err
}
l := &Writer{root: root, prefix: prefix, max: maxAge}
if err := l.setup(); err != nil {
return nil, err
}
return l, nil
}

// Write writes p to the current file, then checks to see if
// rotation is necessary.
func (r *Writer) Write(p []byte) (n int, err error) {
r.Lock()
defer r.Unlock()
n, err = r.current.Write(p)
if err != nil {
return n, err
}
if time.Now().After(r.expireTime) {
if err := r.rotate(); err != nil {
return n, err
}
}
return n, nil
}

// Close closes the current file. Writer is unusable after this
// is called.
func (r *Writer) Close() error {
r.Lock()
defer r.Unlock()

// Rotate before closing
if err := r.rotate(); err != nil {
return err
}

if err := r.current.Close(); err != nil {
return err
}
r.current = nil
return nil
}

// setup creates the root directory if necessary, then opens the
// current file.
func (r *Writer) setup() error {
fi, err := os.Stat(r.root)
if err != nil && os.IsNotExist(err) {
err := os.MkdirAll(r.root, RootPerm)
if err != nil {
return err
}
} else if err != nil {
return err
} else if !fi.IsDir() {
return errors.New("root must be a directory")
}

// root exists, and it is a directory

return r.openCurrent()
}

func (r *Writer) openCurrent() error {
cp := path.Join(r.root, fmt.Sprintf("%s-current", r.prefix)) // It should be safe to use Sprintf here since path.Join() uses path.Clean() on the path afterwards
var err error
r.current, err = os.OpenFile(cp, os.O_RDWR|os.O_CREATE|os.O_APPEND, FilePerm)
r.expireTime = time.Now().Add(r.max)
if err != nil {
return err
}
return nil
}

func (r *Writer) rotate() error {
if err := r.current.Close(); err != nil {
return err
}
filename := fmt.Sprintf("%s-%d", r.prefix, time.Now().UnixNano()) // UnixNano should be unique enough for this (up until a point)
if err := os.Rename(path.Join(r.root, fmt.Sprintf("%s-current", r.prefix)), path.Join(r.root, filename)); err != nil {
return err
}
return r.openCurrent()
}
85 changes: 85 additions & 0 deletions plugins/outputs/rotatingfile/rotatingfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package rotatingfile

import (
"errors"
"fmt"
"io"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

type File struct {
Root string
FilenamePrefix string
MaxAge string

writer io.WriteCloser
serializer serializers.Serializer
}

var sampleConfig = `
## Path to write files into.
root = "/tmp"
filename_prefix = "metrics"
max_age = "1m"

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

func (f *File) SetSerializer(serializer serializers.Serializer) {
f.serializer = serializer
}

func (f *File) Connect() error {
if len(f.Root) == 0 {
return errors.New("we need a root path")
}

var err error
f.writer, err = NewRotatingWriter(f.Root, f.FilenamePrefix, f.MaxAge)
if err != nil {
return err
}
return nil
}

func (f *File) Close() error {
return f.writer.Close()
}

func (f *File) SampleConfig() string {
return sampleConfig
}

func (f *File) Description() string {
return "Send telegraf metrics to a rotating file"
}
func (f *File) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

for _, metric := range metrics {
b, err := f.serializer.Serialize(metric)
if err != nil {
return fmt.Errorf("failed to serialize message: %s", err)
}
_, err = f.writer.Write(b)
if err != nil {
return fmt.Errorf("failed to write message: %s, %s", b, err)
}
}
return nil
}

func init() {
outputs.Add("rotating_file", func() telegraf.Output {
return &File{}
})
}