Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Offset to libbeat/reader.Message #39873

Merged
merged 13 commits into from
Jun 21, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix cache processor expiries infinite growth when large a large TTL is used and recurring keys are cached. {pull}38561[38561]
- Fix parsing of RFC 3164 process IDs in syslog processor. {issue}38947[38947] {pull}38982[38982]
- Rename the field "apache2.module.error" to "apache.module.error" in Apache error visualization. {issue}39480[39480] {pull}39481[39481]
- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this in the user facing changelog, this is more of a developer only detail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Craig, the other item you wrote is enough.

Suggested change
- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653]


*Auditbeat*

Expand Down Expand Up @@ -142,6 +143,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131]
- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420]
- Fix request trace filename handling in http_endpoint input. {pull}39410[39410]
- Fix filestream not correctly tracking the offset of a file when using the `include_message` parsser. {pull}39873[39873] {issue}39653[39653]

*Heartbeat*

Expand Down
23 changes: 13 additions & 10 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
Expand Down Expand Up @@ -95,7 +94,7 @@
e.t.Helper()
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp)

Check failure on line 97 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `manager.Init` is not checked (errcheck)
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -107,7 +106,7 @@
func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) {
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp)

Check failure on line 109 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `manager.Init` is not checked (errcheck)
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -128,9 +127,9 @@
e.wg.Add(1)
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer grp.Stop()

Check failure on line 130 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `grp.Stop` is not checked (errcheck)
inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"}
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"}
inp.Run(inputCtx, e.pipeline)

Check failure on line 132 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `inp.Run` is not checked (errcheck)
}(&e.wg, &e.grp)
}

Expand Down Expand Up @@ -359,13 +358,13 @@
err := inputStore.Get(key, &entry)
if err != nil {
keys := []string{}
inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) {

Check failure on line 361 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `inputStore.Each` is not checked (errcheck)
keys = append(keys, key)
return false, nil
})
e.t.Logf("keys in store: %v", keys)

return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err)

Check failure on line 367 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}

return entry, nil
Expand All @@ -385,16 +384,20 @@

// waitUntilEventCount waits until total count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
for {
sum := len(e.pipeline.GetAllEvents())
msg := &strings.Builder{}
require.Eventuallyf(e.t, func() bool {
msg.Reset()

events := e.pipeline.GetAllEvents()
sum := len(events)
if sum == count {
return
return true
}
if count < sum {
e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum)
}
time.Sleep(10 * time.Millisecond)
}
fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d",
fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d\n",

count, sum)

return false
}, 2*time.Minute, 10*time.Millisecond, "%s", msg)
}

// waitUntilEventCountCtx calls waitUntilEventCount, but fails if ctx is cancelled.
Expand Down Expand Up @@ -489,7 +492,7 @@
func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) {
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {

Check failure on line 495 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

S1011: should replace loop with `events = append(events, c.GetEvents()...)` (gosimple)
events = append(events, evt)
}
}
Expand All @@ -514,7 +517,7 @@
}
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {

Check failure on line 520 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

S1011: should replace loop with `events = append(events, c.GetEvents()...)` (gosimple)
events = append(events, evt)
}
}
Expand Down Expand Up @@ -578,7 +581,7 @@
}
c.ackHandler.ACKEvents(len(events))

for _, event := range events {

Check failure on line 584 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

S1011: should replace loop with `c.published = append(c.published, events...)` (gosimple)
c.published = append(c.published, event)
}
}
Expand Down Expand Up @@ -652,7 +655,7 @@
}
}

func (pc *mockPipelineConnector) cancelClient(i int) {

Check failure on line 658 in filebeat/input/filestream/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

func `(*mockPipelineConnector).cancelClient` is unused (unused)
pc.mtx.Lock()
defer pc.mtx.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (inp *filestream) readFromSource(
return nil
}

s.Offset += int64(message.Bytes)
s.Offset += int64(message.Bytes) + int64(message.Offset)

metrics.MessagesRead.Inc()
if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
Expand Down
33 changes: 33 additions & 0 deletions filebeat/input/filestream/parsers_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,39 @@ func TestParsersAgentLogs(t *testing.T) {
env.waitUntilInputStops()
}

func TestParsersIncludeMessage(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
readLine := "include this"
inp := env.mustCreateInput(map[string]interface{}{
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "100ms",
"parsers": []map[string]interface{}{
{
"include_message": map[string]interface{}{
"patterns": "^" + readLine + "$",
},
},
},
})

logs := []byte("do no include this line\r\n" + readLine + "\r\n")
env.mustWriteToFile(testlogName, logs)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)
env.requireOffsetInRegistry(testlogName, "fake-ID", len(logs))

env.requireEventContents(0, "message", readLine)

cancelInput()
env.waitUntilInputStops()
}

// test_docker_logs_filtering from test_json.py
func TestParsersDockerLogsFiltering(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down
13 changes: 11 additions & 2 deletions libbeat/reader/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@ func NewParser(r reader.Reader, c *Config) *FilterParser {
}
}

func (p *FilterParser) Next() (reader.Message, error) {
func (p *FilterParser) Next() (message reader.Message, err error) {
// discardedOffset accounts for the bytes of discarded messages. The inputs
// need to correctly track the file offset, therefore if only the matching
// message size is returned, the offset cannot be correctly updated.
var discardedOffset int
defer func() {
message.Offset = discardedOffset
}()

for p.ctx.Err() == nil {
message, err := p.r.Next()
message, err = p.r.Next()
if err != nil {
return message, err
}
if p.matchAny(string(message.Content)) {
return message, err
}
discardedOffset += message.Bytes
p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content))
}
return reader.Message{}, io.EOF
Expand Down
1 change: 1 addition & 0 deletions libbeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Message struct {
Ts time.Time // timestamp the content was read
Content []byte // actual content read
Bytes int // total number of bytes read to generate the message
Offset int // total number of bytes read and discarded prior to generate the message
Fields mapstr.M // optional fields that can be added by reader
Meta mapstr.M // deprecated
Private interface{}
Expand Down
Loading