Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #20455 to 7.x: Make implementing Close required by reader.Reader #20581

Merged
merged 1 commit into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]
- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455]

==== Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,10 @@ field. You can revert this change by configuring tags for the module and omittin
- Add event.ingested for Suricata module {pull}20220[20220]
- Add support for custom header and headersecret for filebeat http_endpoint input {pull}20435[20435]
- Convert httpjson to v2 input {pull}20226[20226]
- Add event.ingested to all Filebeat modules. {pull}20386[20386]
- Return error when log harvester tries to open a named pipe. {issue}18682[18682] {pull}20450[20450]
- Avoid goroutine leaks in Filebeat readers. {issue}19193[19193] {pull}20455[20455]


*Heartbeat*

Expand Down
5 changes: 4 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ func (h *Harvester) Run() error {
}

h.stop()
h.log.Close()
err := h.reader.Close()
if err != nil {
logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err)
}
}(h.state.Source)

logp.Info("Harvester started for file: %s", h.state.Source)
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (f *Log) wait() {
}

// Close closes the done channel but no th the file handler
func (f *Log) Close() {
func (f *Log) Close() error {
close(f.done)
// Note: File reader is not closed here because that leads to race conditions
return nil
}
10 changes: 7 additions & 3 deletions libbeat/reader/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type CheckFunc func(offset int64, buf []byte) bool
// Is is useful is you want to detect if you have received garbage from a network volume.
type Reader struct {
log *logp.Logger
reader io.Reader
reader io.ReadCloser
buffer bytes.Buffer
minBufferSize int
maxFailures int
Expand All @@ -59,7 +59,7 @@ type Reader struct {
// NewReader returns a debug reader.
func NewReader(
log *logp.Logger,
reader io.Reader,
reader io.ReadCloser,
minBufferSize int,
maxFailures int,
predicate CheckFunc,
Expand Down Expand Up @@ -115,6 +115,10 @@ func (r *Reader) Read(p []byte) (int, error) {
return n, err
}

func (r *Reader) Close() error {
return r.reader.Close()
}

func makeNullCheck(log *logp.Logger, minSize int) CheckFunc {
// create a slice with null bytes to match on the buffer.
pattern := make([]byte, minSize, minSize)
Expand Down Expand Up @@ -159,7 +163,7 @@ func summarizeBufferInfo(idx int, buf []byte) (int, []byte) {

// AppendReaders look into the current enabled log selector and will add any debug reader that match
// the selectors.
func AppendReaders(reader io.Reader) (io.Reader, error) {
func AppendReaders(reader io.ReadCloser) (io.ReadCloser, error) {
var err error

if logp.HasSelector("detect_null_bytes") || logp.HasSelector("*") {
Expand Down
13 changes: 8 additions & 5 deletions libbeat/reader/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package debug
import (
"bytes"
"io"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -78,8 +79,9 @@ func testCheckContent(t *testing.T) {
s.WriteString("hello world")
s.WriteByte(0x00)
s.WriteString("hello world")
r := ioutil.NopCloser(&s)

reader, _ := NewReader(logp.L(), &s, 5, 3, check)
reader, _ := NewReader(logp.L(), r, 5, 3, check)

_, err := reader.Read(make([]byte, 20))
if !assert.NoError(t, err) {
Expand All @@ -91,7 +93,7 @@ func testCheckContent(t *testing.T) {

func testConsumeAll(t *testing.T) {
c, _ := common.RandomBytes(2000)
reader := bytes.NewReader(c)
reader := ioutil.NopCloser(bytes.NewReader(c))
var buf bytes.Buffer
consumed := 0
debug, _ := NewReader(logp.L(), reader, 8, 20, makeNullCheck(logp.L(), 1))
Expand All @@ -106,8 +108,8 @@ func testConsumeAll(t *testing.T) {
}

func testEmptyBuffer(t *testing.T) {
var buf bytes.Buffer
debug, _ := NewReader(logp.L(), &buf, 8, 20, makeNullCheck(logp.L(), 1))
buf := ioutil.NopCloser(&bytes.Buffer{})
debug, _ := NewReader(logp.L(), buf, 8, 20, makeNullCheck(logp.L(), 1))
data := make([]byte, 33)
n, err := debug.Read(data)
assert.Equal(t, io.EOF, err)
Expand All @@ -134,8 +136,9 @@ func testSilent(t *testing.T) {
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'})
r := ioutil.NopCloser(&b)

debug, _ := NewReader(logp.L(), &b, 3, 2, check)
debug, _ := NewReader(logp.L(), r, 3, 2, check)
consumed := 0
for consumed < b.Len() {
n, _ := debug.Read(make([]byte, 3))
Expand Down
11 changes: 11 additions & 0 deletions libbeat/reader/multiline/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package multiline

import (
"io"

"github.com/elastic/beats/v7/libbeat/reader"
)

Expand Down Expand Up @@ -131,3 +133,12 @@ func (cr *counterReader) resetState() {
func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) {
cr.state = next
}

func (cr *counterReader) Close() error {
cr.setState((*counterReader).readClosed)
return cr.reader.Close()
}

func (cr *counterReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}
3 changes: 2 additions & 1 deletion libbeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package multiline
import (
"bytes"
"errors"
"io/ioutil"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -333,7 +334,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, readfile.Config{
r, err = readfile.NewEncodeReader(ioutil.NopCloser(in), readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
Expand Down
10 changes: 10 additions & 0 deletions libbeat/reader/multiline/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package multiline
import (
"errors"
"fmt"
"io"
"time"

"github.com/elastic/beats/v7/libbeat/common/match"
Expand Down Expand Up @@ -254,6 +255,15 @@ func (pr *patternReader) setState(next func(pr *patternReader) (reader.Message,
pr.state = next
}

func (pr *patternReader) Close() error {
pr.setState((*patternReader).readClosed)
return pr.reader.Close()
}

func (pr *patternReader) readClosed() (reader.Message, error) {
return reader.Message{}, io.EOF
}

// matchers
func afterMatcher(pat match.Matcher) (matcher, error) {
return genPatternMatcher(pat, func(last, current []byte) []byte {
Expand Down
2 changes: 2 additions & 0 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package reader

import (
"errors"
"io"
)

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
io.Closer
Next() (Message, error)
}

Expand Down
6 changes: 5 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {

// New creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}
Expand All @@ -59,3 +59,7 @@ func (r EncoderReader) Next() (reader.Message, error) {
Bytes: sz,
}, err
}

func (r EncoderReader) Close() error {
return r.reader.Close()
}
3 changes: 2 additions & 1 deletion libbeat/reader/readfile/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestEncodeLines(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
r := bytes.NewReader(testCase.Input)
r := ioutil.NopCloser(bytes.NewReader(testCase.Input))
codec, err := encFactory(r)
assert.Nil(t, err, "failed to initialize encoding: %v", err)

Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
}
return message, err
}

func (r *LimitReader) Close() error {
return r.reader.Close()
}
2 changes: 2 additions & 0 deletions libbeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (m *mockReader) Next() (reader.Message, error) {
}, nil
}

func (m *mockReader) Close() error { return nil }

var limitTests = []struct {
line string
maxBytes int
Expand Down
8 changes: 6 additions & 2 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const unlimited = 0
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
reader io.ReadCloser
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
Expand All @@ -48,7 +48,7 @@ type LineReader struct {
}

// New creates a new reader object
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
Expand Down Expand Up @@ -271,3 +271,7 @@ func (r *LineReader) decode(end int) (int, error) {
r.byteCount += start
return start, err
}

func (r *LineReader) Close() error {
return r.reader.Close()
}
9 changes: 5 additions & 4 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/hex"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, LineFeed, unlimited})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestLineTerminators(t *testing.T) {
buffer.Write([]byte("this is my second line"))
buffer.Write(nl)

reader, err := NewLineReader(buffer, Config{codec, 1024, terminator, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited})
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -229,7 +230,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited})
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand Down Expand Up @@ -349,7 +350,7 @@ func TestMaxBytesLimit(t *testing.T) {
}

// Create line reader
reader, err := NewLineReader(strings.NewReader(input), Config{codec, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/reader/readfile/strip_newline.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (p *StripNewline) autoLineEndingChars(l []byte) int {
}
return 1
}

func (p *StripNewline) Close() error {
return p.reader.Close()
}
21 changes: 18 additions & 3 deletions libbeat/reader/readfile/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"errors"
"io"
"time"

"github.com/elastic/beats/v7/libbeat/reader"
Expand All @@ -36,6 +37,7 @@ type TimeoutReader struct {
signal error
running bool
ch chan lineMessage
done chan struct{}
}

type lineMessage struct {
Expand All @@ -54,6 +56,7 @@ func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *Time
signal: signal,
timeout: t,
ch: make(chan lineMessage, 1),
done: make(chan struct{}),
}
}

Expand All @@ -68,9 +71,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
go func() {
for {
message, err := r.reader.Next()
r.ch <- lineMessage{message, err}
if err != nil {
break
select {
case <-r.done:
return
case r.ch <- lineMessage{message, err}:
if err != nil {
return
}
}
}
}()
Expand All @@ -85,5 +92,13 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
return msg.line, msg.err
case <-timer.C:
return reader.Message{}, r.signal
case <-r.done:
return reader.Message{}, io.EOF
}
}

func (r *TimeoutReader) Close() error {
close(r.done)

return r.reader.Close()
}
4 changes: 4 additions & 0 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,7 @@ func stripNewLineWin(msg *reader.Message) {
return r == '\n' || r == '\r'
})
}

func (p *DockerJSONReader) Close() error {
return p.reader.Close()
}
Loading