Skip to content

Commit

Permalink
Port four Harvester tests of log input to filestream in Golang (elast…
Browse files Browse the repository at this point in the history
…ic#24250)

(cherry picked from commit f323b36)
  • Loading branch information
kvch committed Apr 1, 2021
1 parent f2d4c16 commit 19918a5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
47 changes: 43 additions & 4 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package filestream
import (
"bytes"
"context"
"os"
"runtime"
"testing"

Expand Down Expand Up @@ -67,7 +68,6 @@ func TestFilestreamCloseRenamed(t *testing.T) {
newerTestlines := []byte("new first log line\nnew second log line\n")
env.mustWriteLinesToFile(testlogName, newerTestlines)

// new two events arrived
env.waitUntilEventCount(3)

cancelInput()
Expand Down Expand Up @@ -116,6 +116,45 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) {
env.waitUntilInputStops()
}

// test_close_removed from test_harvester.py
func TestFilestreamCloseRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.removed": "true",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\n")
env.mustWriteLinesToFile(testlogName, testlines)

// first event has made it successfully
env.waitUntilEventCount(1)

env.requireOffsetInRegistry(testlogName, len(testlines))

fi, err := os.Stat(env.abspath(testlogName))
if err != nil {
t.Fatalf("cannot stat file: %+v", err)
}

env.mustRemoveFile(testlogName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()

id := getIDFromPath(env.abspath(testlogName), fi)
env.requireOffsetInRegistryByID(id, len(testlines))
}

// test_close_eof from test_harvester.py
func TestFilestreamCloseEOF(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand All @@ -127,13 +166,13 @@ func TestFilestreamCloseEOF(t *testing.T) {
"close.reader.on_eof": "true",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\n")
expectedOffset := len(testlines)
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// first event has made it successfully
env.waitUntilEventCount(1)
env.requireOffsetInRegistry(testlogName, expectedOffset)
Expand Down
35 changes: 35 additions & 0 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/text/transform"

"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
Expand Down Expand Up @@ -390,3 +391,37 @@ func TestMaxBytesLimit(t *testing.T) {
}
}
}

// test_exceed_buffer from test_harvester.py
func TestBufferSize(t *testing.T) {
lines := []string{
"first line is too long\n",
"second line is too long\n",
"third line too long\n",
"OK\n",
}

codecFactory, _ := encoding.FindEncoding("")
codec, _ := codecFactory(bytes.NewBuffer(nil))
bufferSize := 10

in := ioutil.NopCloser(strings.NewReader(strings.Join(lines, "")))
reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}

for i := 0; i < len(lines); i++ {
b, n, err := reader.Next()
if err != nil {
if err == io.EOF {
break
} else {
t.Fatal("unexpected error:", err)
}
}

require.Equal(t, n, len(lines[i]))
require.Equal(t, string(b[:n]), lines[i])
}
}

0 comments on commit 19918a5

Please sign in to comment.