Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make implementing Close required by reader.Reader #20455

Merged
merged 11 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]
- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455]

==== Bugfixes

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for custom header and headersecret for filebeat http_endpoint input {pull}20435[20435]
- Add event.ingested to all Filebeat modules. {pull}20386[20386]
- Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450]
- Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455]


*Heartbeat*
Expand Down
5 changes: 4 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ func (h *Harvester) Run() error {
}

h.stop()
h.log.Close()
err := h.reader.Close()
if err != nil {
logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err)
}
}(h.state.Source)

logp.Info("Harvester started for file: %s", h.state.Source)
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (f *Log) wait() {
}

// Close closes the done channel but no th the file handler
func (f *Log) Close() {
func (f *Log) Close() error {
close(f.done)
// Note: File reader is not closed here because that leads to race conditions
return nil
}
10 changes: 7 additions & 3 deletions libbeat/reader/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type CheckFunc func(offset int64, buf []byte) bool
// Is is useful is you want to detect if you have received garbage from a network volume.
type Reader struct {
log *logp.Logger
reader io.Reader
reader io.ReadCloser
buffer bytes.Buffer
minBufferSize int
maxFailures int
Expand All @@ -59,7 +59,7 @@ type Reader struct {
// NewReader returns a debug reader.
func NewReader(
log *logp.Logger,
reader io.Reader,
reader io.ReadCloser,
minBufferSize int,
maxFailures int,
predicate CheckFunc,
Expand Down Expand Up @@ -115,6 +115,10 @@ func (r *Reader) Read(p []byte) (int, error) {
return n, err
}

func (r *Reader) Close() error {
return r.reader.Close()
}

func makeNullCheck(log *logp.Logger, minSize int) CheckFunc {
// create a slice with null bytes to match on the buffer.
pattern := make([]byte, minSize, minSize)
Expand Down Expand Up @@ -159,7 +163,7 @@ func summarizeBufferInfo(idx int, buf []byte) (int, []byte) {

// AppendReaders look into the current enabled log selector and will add any debug reader that match
// the selectors.
func AppendReaders(reader io.Reader) (io.Reader, error) {
func AppendReaders(reader io.ReadCloser) (io.ReadCloser, error) {
var err error

if logp.HasSelector("detect_null_bytes") || logp.HasSelector("*") {
Expand Down
13 changes: 8 additions & 5 deletions libbeat/reader/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package debug
import (
"bytes"
"io"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -78,8 +79,9 @@ func testCheckContent(t *testing.T) {
s.WriteString("hello world")
s.WriteByte(0x00)
s.WriteString("hello world")
r := ioutil.NopCloser(&s)

reader, _ := NewReader(logp.L(), &s, 5, 3, check)
reader, _ := NewReader(logp.L(), r, 5, 3, check)

_, err := reader.Read(make([]byte, 20))
if !assert.NoError(t, err) {
Expand All @@ -91,7 +93,7 @@ func testCheckContent(t *testing.T) {

func testConsumeAll(t *testing.T) {
c, _ := common.RandomBytes(2000)
reader := bytes.NewReader(c)
reader := ioutil.NopCloser(bytes.NewReader(c))
var buf bytes.Buffer
consumed := 0
debug, _ := NewReader(logp.L(), reader, 8, 20, makeNullCheck(logp.L(), 1))
Expand All @@ -106,8 +108,8 @@ func testConsumeAll(t *testing.T) {
}

func testEmptyBuffer(t *testing.T) {
var buf bytes.Buffer
debug, _ := NewReader(logp.L(), &buf, 8, 20, makeNullCheck(logp.L(), 1))
buf := ioutil.NopCloser(&bytes.Buffer{})
debug, _ := NewReader(logp.L(), buf, 8, 20, makeNullCheck(logp.L(), 1))
data := make([]byte, 33)
n, err := debug.Read(data)
assert.Equal(t, io.EOF, err)
Expand All @@ -134,8 +136,9 @@ func testSilent(t *testing.T) {
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
r := ioutil.NopCloser(&b)

debug, _ := NewReader(logp.L(), &b, 3, 2, check)
debug, _ := NewReader(logp.L(), r, 3, 2, check)
consumed := 0
for consumed < b.Len() {
n, _ := debug.Read(make([]byte, 3))
Expand Down
11 changes: 11 additions & 0 deletions libbeat/reader/multiline/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package multiline

import (
"io"

"github.com/elastic/beats/v7/libbeat/reader"
)

Expand Down Expand Up @@ -131,3 +133,12 @@ func (cr *counterReader) resetState() {
func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) {
cr.state = next
}

func (cr *counterReader) Close() error {
cr.setState((*counterReader).readClosed)
return cr.reader.Close()
}

func (cr *counterReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}
3 changes: 2 additions & 1 deletion libbeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package multiline
import (
"bytes"
"errors"
"io/ioutil"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -375,7 +376,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, readfile.Config{
r, err = readfile.NewEncodeReader(ioutil.NopCloser(in), readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
Expand Down
10 changes: 10 additions & 0 deletions libbeat/reader/multiline/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package multiline
import (
"errors"
"fmt"
"io"
"time"

"github.com/elastic/beats/v7/libbeat/common/match"
Expand Down Expand Up @@ -254,6 +255,15 @@ func (pr *patternReader) setState(next func(pr *patternReader) (reader.Message,
pr.state = next
}

func (pr *patternReader) Close() error {
pr.setState((*patternReader).readClosed)
return pr.reader.Close()
}

func (pr *patternReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}

// matchers
func afterMatcher(pat match.Matcher) (matcher, error) {
return genPatternMatcher(pat, func(last, current []byte) []byte {
Expand Down
11 changes: 11 additions & 0 deletions libbeat/reader/multiline/while.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package multiline

import (
"io"

"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
Expand Down Expand Up @@ -223,3 +225,12 @@ func negatedLineMatcher(m lineMatcherFunc) lineMatcherFunc {
return !m(content)
}
}

func (pr *whilePatternReader) Close() error {
pr.setState((*whilePatternReader).readClosed)
return pr.reader.Close()
}

func (pr *whilePatternReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}
2 changes: 2 additions & 0 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package reader

import (
"errors"
"io"
)

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
io.Closer
Next() (Message, error)
}

Expand Down
6 changes: 5 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {

// New creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}
Expand All @@ -59,3 +59,7 @@ func (r EncoderReader) Next() (reader.Message, error) {
Bytes: sz,
}, err
}

func (r EncoderReader) Close() error {
return r.reader.Close()
}
3 changes: 2 additions & 1 deletion libbeat/reader/readfile/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestEncodeLines(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
r := bytes.NewReader(testCase.Input)
r := ioutil.NopCloser(bytes.NewReader(testCase.Input))
codec, err := encFactory(r)
assert.Nil(t, err, "failed to initialize encoding: %v", err)

Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
}
return message, err
}

func (r *LimitReader) Close() error {
return r.reader.Close()
}
2 changes: 2 additions & 0 deletions libbeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (m *mockReader) Next() (reader.Message, error) {
}, nil
}

func (m *mockReader) Close() error { return nil }

var limitTests = []struct {
line string
maxBytes int
Expand Down
8 changes: 6 additions & 2 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const unlimited = 0
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
Expand All @@ -48,7 +48,7 @@ type LineReader struct {
}

// New creates a new reader object
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
Expand Down Expand Up @@ -271,3 +271,7 @@ func (r *LineReader) decode(end int) (int, error) {
r.byteCount += start
return start, err
}

func (r *LineReader) Close() error {
return r.reader.Close()
}
9 changes: 5 additions & 4 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/hex"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, LineFeed, unlimited})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestLineTerminators(t *testing.T) {
buffer.Write([]byte("this is my second line"))
buffer.Write(nl)

reader, err := NewLineReader(buffer, Config{codec, 1024, terminator, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited})
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -229,7 +230,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited})
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand Down Expand Up @@ -349,7 +350,7 @@ func TestMaxBytesLimit(t *testing.T) {
}

// Create line reader
reader, err := NewLineReader(strings.NewReader(input), Config{codec, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/strip_newline.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (p *StripNewline) autoLineEndingChars(l []byte) int {
}
return 1
}

func (p *StripNewline) Close() error {
return p.reader.Close()
}
Loading