Skip to content

Commit

Permalink
Fix tail following on EOF (influxdata#7927)
Browse files Browse the repository at this point in the history
  • Loading branch information
reimda authored and idohalevi committed Sep 23, 2020
1 parent d785689 commit 917e798
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 6 deletions.
10 changes: 4 additions & 6 deletions plugins/common/encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"golang.org/x/text/encoding/unicode"
)

type Decoder = encoding.Decoder

// NewDecoder returns a x/text Decoder for the specified text encoding. The
// Decoder converts a character encoding into utf-8 bytes. If a BOM is found
// it will be converted into a utf-8 BOM, you can use
Expand All @@ -24,13 +22,13 @@ type Decoder = encoding.Decoder
func NewDecoder(enc string) (*Decoder, error) {
switch enc {
case "utf-8":
return unicode.UTF8.NewDecoder(), nil
return &Decoder{Transformer: unicode.UTF8.NewDecoder()}, nil
case "utf-16le":
return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder(), nil
return newDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil
case "utf-16be":
return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder(), nil
return newDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil
case "none", "":
return encoding.Nop.NewDecoder(), nil
return newDecoder(encoding.Nop.NewDecoder()), nil
}
return nil, errors.New("unknown character encoding")
}
171 changes: 171 additions & 0 deletions plugins/common/encoding/decoder_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package encoding

import (
"errors"
"io"

"golang.org/x/text/transform"
)

// Apart from Read() resetting r.err and r.transformComplete on entry (so that
// reading can resume after an earlier call returned EOF), this code was
// copied from x/text.

// newDecoder wraps the transformer t in a Decoder.
func newDecoder(t transform.Transformer) *Decoder {
	d := new(Decoder)
	d.Transformer = t
	return d
}

// A Decoder converts bytes to UTF-8. It implements transform.Transformer by
// embedding one.
//
// Transforming source bytes that are not of that encoding will not result in an
// error per se. Each byte that cannot be transcoded will be represented in the
// output by the UTF-8 encoding of '\uFFFD', the replacement rune.
type Decoder struct {
// Transformer performs the actual conversion; its methods are promoted
// onto Decoder.
transform.Transformer

// This forces external creators of Decoders to use names in struct
// initializers, allowing for future extensibility without having to break
// code.
_ struct{}
}

// Bytes runs b through the decoder and returns the resulting UTF-8 bytes.
// On failure the result is nil and the transform error is returned.
func (d *Decoder) Bytes(b []byte) ([]byte, error) {
	decoded, _, err := transform.Bytes(d, b)
	if err == nil {
		return decoded, nil
	}
	return nil, err
}

// String runs s through the decoder and returns the resulting UTF-8 string.
// On failure the result is "" and the transform error is returned.
func (d *Decoder) String(s string) (string, error) {
	decoded, _, err := transform.String(d, s)
	if err == nil {
		return decoded, nil
	}
	return "", err
}

// Reader returns an io.Reader that yields the bytes of r decoded by d.
//
// While the returned Reader is in use, the Decoder must not be used for any
// other operation.
func (d *Decoder) Reader(r io.Reader) io.Reader {
	decoding := NewReader(r, d)
	return decoding
}

// Reader wraps another io.Reader by transforming the bytes read.
//
// It keeps two buffers: src holds raw bytes read from the wrapped reader,
// and dst holds transformed bytes waiting to be handed out by Read.
type Reader struct {
// r is the wrapped reader that raw bytes are read from.
r io.Reader
// t is the transformer applied to the bytes read from r.
t transform.Transformer
// err is the most recent error from r.Read or, once the transformation
// is complete, possibly the transformer's error. Read clears it on entry.
err error

// dst[dst0:dst1] contains bytes that have been transformed by t but
// not yet copied out via Read.
dst []byte
dst0, dst1 int

// src[src0:src1] contains bytes that have been read from r but not
// yet transformed through t.
src []byte
src0, src1 int

// transformComplete is whether the transformation is complete,
// regardless of whether or not it was successful. Read clears it on
// entry.
transformComplete bool
}

var (
// ErrShortDst means that the destination buffer was too short to
// receive all of the transformed bytes.
ErrShortDst = errors.New("transform: short destination buffer")

// ErrShortSrc means that the source buffer has insufficient data to
// complete the transformation.
ErrShortSrc = errors.New("transform: short source buffer")

// errInconsistentByteCount means that Transform returned success (nil
// error) but also returned nSrc inconsistent with the src argument.
errInconsistentByteCount = errors.New("transform: inconsistent byte count returned")
)

// defaultBufSize is the length, in bytes, of the src and dst buffers that
// NewReader allocates.
const defaultBufSize = 4096

// NewReader builds a Reader that pulls bytes from r and passes them through
// t before handing them to the caller. t is Reset before use, and both
// internal buffers are allocated with defaultBufSize bytes.
func NewReader(r io.Reader, t transform.Transformer) *Reader {
	t.Reset()

	rd := new(Reader)
	rd.r, rd.t = r, t
	rd.dst = make([]byte, defaultBufSize)
	rd.src = make([]byte, defaultBufSize)
	return rd
}

// Read implements the io.Reader interface.
//
// Each call loops over three steps until it can return: hand out any
// already-transformed bytes in dst, otherwise transform pending bytes from
// src into dst, otherwise refill src from the wrapped reader. Because r.err
// and r.transformComplete are cleared on entry, a call made after an earlier
// one returned io.EOF will try the wrapped reader again.
func (r *Reader) Read(p []byte) (int, error) {
// Clear previous errors so a Read can be performed even if the last call
// returned EOF.
r.err = nil
r.transformComplete = false

n, err := 0, error(nil)
for {
// Copy out any transformed bytes and return the final error if we are done.
if r.dst0 != r.dst1 {
n = copy(p, r.dst[r.dst0:r.dst1])
r.dst0 += n
// Only report the stored error once dst is fully drained and no
// further transformation will happen.
if r.dst0 == r.dst1 && r.transformComplete {
return n, r.err
}
return n, nil
} else if r.transformComplete {
return 0, r.err
}

// Try to transform some source bytes, or to flush the transformer if we
// are out of source bytes. We do this even if r.r.Read returned an error.
// As the io.Reader documentation says, "process the n > 0 bytes returned
// before considering the error".
if r.src0 != r.src1 || r.err != nil {
r.dst0 = 0
// atEOF is passed as true only when the wrapped reader reported io.EOF.
r.dst1, n, err = r.t.Transform(r.dst, r.src[r.src0:r.src1], r.err == io.EOF)
r.src0 += n

// NOTE(review): the ErrShortDst / ErrShortSrc cases below compare with ==
// against this package's variables; they only match if those are the very
// values the transformer returns — confirm.
switch {
case err == nil:
if r.src0 != r.src1 {
r.err = errInconsistentByteCount
}
// The Transform call was successful; we are complete if we
// cannot read more bytes into src.
r.transformComplete = r.err != nil
continue
case err == ErrShortDst && (r.dst1 != 0 || n != 0):
// Make room in dst by copying out, and try again.
continue
case err == ErrShortSrc && r.src1-r.src0 != len(r.src) && r.err == nil:
// Read more bytes into src via the code below, and try again.
default:
r.transformComplete = true
// The reader error (r.err) takes precedence over the
// transformer error (err) unless r.err is nil or io.EOF.
if r.err == nil || r.err == io.EOF {
r.err = err
}
continue
}
}

// Move any untransformed source bytes to the start of the buffer
// and read more bytes.
if r.src0 != 0 {
r.src0, r.src1 = 0, copy(r.src, r.src[r.src0:r.src1])
}
// Append to whatever is still pending in src; the read error is kept in
// r.err and examined on the next loop iteration.
n, r.err = r.r.Read(r.src[r.src1:])
r.src1 += n
}
}
44 changes: 44 additions & 0 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,47 @@ func TestCharacterEncoding(t *testing.T) {
})
}
}

// TestTailEOF checks that the tail input keeps following a file after it has
// reached EOF: one line is written before the plugin starts, a second line is
// appended after the first metric has been collected, and both metrics must
// show up in the accumulator.
func TestTailEOF(t *testing.T) {
// Create a temp file holding the first line and flush it to disk.
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu usage_idle=100\r\n")
require.NoError(t, err)
err = tmpfile.Sync()
require.NoError(t, err)

// Configure the plugin to read the temp file from its beginning using
// the influx parser.
tt := NewTail()
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)

err = tt.Init()
require.NoError(t, err)

acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(1) // input hits eof

// Append a second line after the first metric has arrived.
_, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n")
require.NoError(t, err)
err = tmpfile.Sync()
require.NoError(t, err)

// Wait for the second metric, then verify both were collected.
acc.Wait(2)
require.NoError(t, acc.GatherError(tt.Gather))
acc.AssertContainsFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
})
acc.AssertContainsFields(t, "cpu2",
map[string]interface{}{
"usage_idle": float64(200),
})

err = tmpfile.Close()
require.NoError(t, err)
}

0 comments on commit 917e798

Please sign in to comment.