Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Always recombine if possible, even if incomplete #30797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .chloggen/pkg-stanza-recombine-clean-state.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Recombine operator should always recombine partial logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30797]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Previously, certain circumstances could result in partial logs being emitted without any
recombination. This could occur when using `is_first_entry`, if the first partial log from
a source was emitted before a matching "start of log" indicator was found. This could also
occur when the collector was shutting down.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
32 changes: 21 additions & 11 deletions pkg/stanza/operator/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (r *Transformer) flushLoop() {
if timeSinceFirstEntry < r.forceFlushTimeout {
continue
}
if err := r.flushSource(source, true); err != nil {
if err := r.flushSource(context.Background(), source, true); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
Expand All @@ -198,7 +198,7 @@ func (r *Transformer) Stop() error {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r.flushUncombined(ctx)
r.flushAllSources(ctx)

close(r.chClose)

Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
// This is the first entry in the next batch
case matches && r.matchIndicatesFirst():
// Flush the existing batch
err := r.flushSource(s, true)
err := r.flushSource(ctx, s, true)
if err != nil {
return err
}
Expand All @@ -251,11 +251,8 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
return nil
// This is the last entry in a complete batch
case matches && r.matchIndicatesLast():
fallthrough
// When matching on first entry, never batch partial first. Just emit immediately
case !matches && r.matchIndicatesFirst() && r.batchMap[s] == nil:
r.addToBatch(ctx, e, s)
return r.flushSource(s, true)
return r.flushSource(ctx, s, true)
}

// This is neither the first entry of a new log,
Expand All @@ -273,7 +270,7 @@ func (r *Transformer) matchIndicatesLast() bool {
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source string) {
func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
batch, ok := r.batchMap[source]
if !ok {
batch = r.addNewBatch(source, e)
Expand Down Expand Up @@ -305,7 +302,7 @@ func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source strin
batch.recombined.WriteString(s)

if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize {
if err := r.flushSource(source, false); err != nil {
if err := r.flushSource(ctx, source, false); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
Expand All @@ -325,9 +322,22 @@ func (r *Transformer) flushUncombined(ctx context.Context) {
r.ticker.Reset(r.forceFlushTimeout)
}

// flushAllSources flushes every source currently held in the batch map,
// removing each batch once it has been flushed. Errors from individual
// sources are accumulated and reported once through the operator logger
// rather than aborting the remaining flushes.
func (r *Transformer) flushAllSources(ctx context.Context) {
	var flushErrs []error
	for src := range r.batchMap {
		flushErr := r.flushSource(ctx, src, true)
		if flushErr != nil {
			flushErrs = append(flushErrs, flushErr)
		}
	}
	if len(flushErrs) != 0 {
		r.Errorf("there was error flushing combined logs %s", flushErrs)
	}
}

// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *Transformer) flushSource(source string, deleteSource bool) error {
func (r *Transformer) flushSource(ctx context.Context, source string, deleteSource bool) error {
batch := r.batchMap[source]
// Skip flushing a combined log if the batch is empty
if batch == nil {
Expand Down Expand Up @@ -355,7 +365,7 @@ func (r *Transformer) flushSource(source string, deleteSource bool) error {
return err
}

r.Write(context.Background(), base)
r.Write(ctx, base)
if deleteSource {
r.removeBatch(source)
} else {
Expand Down
45 changes: 25 additions & 20 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func TestTransformer(t *testing.T) {
cfg.IsFirstEntry = "$body == 'test1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
Expand All @@ -166,35 +167,34 @@ func TestTransformer(t *testing.T) {
entryWithBody(t2, "test4"),
},
[]*entry.Entry{
entryWithBody(t1, "test2"),
entryWithBody(t2, "test3"),
entryWithBody(t2, "test4"),
entryWithBody(t1, "test2\ntest3\ntest4"),
},
},
{
"EntriesMatchingForFirstEntryOneFileOnly",
func() *Config {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "body == 'file1'"
cfg.IsFirstEntry = "body == 'start'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "oldest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "more1a", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "more1b", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "more2a", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "more2b", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nfile3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file1\nfile2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
Comment on lines -185 to -197
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these changes are just to clarify the test. (Use meaningful content instead of "file1", etc). The primary change is that the last two tokens are emitted as one after the timeout, rather than each being emitted immediately. A complementary change below adds ExpectEntries because the final token from file1 and the other tokens are not emitted in a deterministic order.

entryWithBodyAttr(t1, "start\nmore1a", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "start\nmore1b", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "more2a\nmore2b", map[string]string{"file.path": "file2"}),
},
},
{
Expand Down Expand Up @@ -507,9 +507,7 @@ func TestTransformer(t *testing.T) {
require.NoError(t, recombine.Process(context.Background(), e))
}

for _, expected := range tc.expectedOutput {
fake.ExpectEntry(t, expected)
}
fake.ExpectEntries(t, tc.expectedOutput)

select {
case e := <-fake.Received:
Expand Down Expand Up @@ -747,14 +745,21 @@ func TestSourceBatchDelete(t *testing.T) {
next := entry.New()
next.Timestamp = time.Now()
next.Body = "next"
start.AddAttribute("file.path", "file1")
next.AddAttribute("file.path", "file1")

expect := entry.New()
expect.ObservedTimestamp = start.ObservedTimestamp
expect.Timestamp = start.Timestamp
expect.AddAttribute("file.path", "file1")
expect.Body = "start\nnext"

ctx := context.Background()

require.NoError(t, recombine.Process(ctx, start))
require.NoError(t, recombine.Process(ctx, next))
require.Equal(t, 1, len(recombine.batchMap))
require.NoError(t, recombine.flushSource("file1", true))
require.NoError(t, recombine.flushSource(ctx, "file1", true))
require.Equal(t, 0, len(recombine.batchMap))
fake.ExpectEntry(t, expect)
require.NoError(t, recombine.Stop())
}
21 changes: 19 additions & 2 deletions pkg/stanza/testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (f *FakeOutput) ExpectBody(t testing.TB, body any) {
case e := <-f.Received:
require.Equal(t, body, e.Body)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry")
require.FailNowf(t, "Timed out waiting for entry", "%s", body)
}
}

Expand All @@ -96,10 +96,27 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) {
case e := <-f.Received:
require.Equal(t, expected, e)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry")
require.FailNowf(t, "Timed out waiting for entry", "%v", expected)
}
}

// ExpectEntries expects that the given entries will be received in any order.
// It drains exactly len(expected) entries from the Received channel, waiting
// up to one second for each, then asserts the collected set matches the
// expected set irrespective of ordering.
func (f *FakeOutput) ExpectEntries(t testing.TB, expected []*entry.Entry) {
	received := make([]*entry.Entry, 0, len(expected))
	for range expected {
		select {
		case got := <-f.Received:
			received = append(received, got)
		case <-time.After(time.Second):
			require.Fail(t, "Timed out waiting for entry")
		}
		// Stop draining as soon as the test has been marked failed.
		if t.Failed() {
			break
		}
	}
	require.ElementsMatch(t, expected, received)
}

// ExpectNoEntry expects that no entry will be received within the specified time
func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) {
select {
Expand Down
Loading