Skip to content

Commit

Permalink
Improved Ingester out-of-order error for faster troubleshooting (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
wardbekker authored and rfratto committed Sep 17, 2019
1 parent 35d5a15 commit 3b96510
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
33 changes: 28 additions & 5 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ingester

import (
"bytes"
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -64,6 +66,11 @@ type chunkDesc struct {
lastUpdated time.Time
}

type entryWithError struct {
entry *logproto.Entry
e error
}

func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int) *stream {
return &stream{
fp: fp,
Expand Down Expand Up @@ -97,10 +104,10 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
}

storedEntries := []logproto.Entry{}
failedEntriesWithError := []entryWithError{}

// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
var appendErr error
for i := range entries {
chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) {
Expand All @@ -115,7 +122,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunk = &s.chunks[len(s.chunks)-1]
}
if err := chunk.chunk.Append(&entries[i]); err != nil {
appendErr = err
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
} else {
// send only stored entries to tailers
storedEntries = append(storedEntries, entries[i])
Expand Down Expand Up @@ -150,11 +157,27 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
}()
}

if appendErr == chunkenc.ErrOutOfOrder {
return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelAdaptersToLabels(s.labels).String())
if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
// return bad http status request response with all failed entries
buf := bytes.Buffer{}
streamName := client.FromLabelAdaptersToLabels(s.labels).String()

for _, entryWithError := range failedEntriesWithError {
_, _ = fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
}

_, _ = fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))

return httpgrpc.Errorf(http.StatusBadRequest, buf.String())
}
return lastEntryWithErr.e
}

return appendErr
return nil
}

// Returns an iterator.
Expand Down
21 changes: 19 additions & 2 deletions pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,24 @@ func TestTransferOut(t *testing.T) {
assert.Len(t, ing.instances["test"].streams, 2)
}

// Create a new ingester and trasfer data to it
// verify we get out of order exception on adding an entry with older timestamps
_, err2 := ing.Push(ctx, &logproto.PushRequest{
Streams: []*logproto.Stream{
{
Entries: []logproto.Entry{
{Line: "out of order line", Timestamp: time.Unix(0, 0)},
{Line: "line 4", Timestamp: time.Unix(2, 0)},
},
Labels: `{foo="bar",bar="baz1"}`,
},
},
})

require.Error(t, err2)
require.Contains(t, err2.Error(), "out of order")
require.Contains(t, err2.Error(), "total ignored: 1 out of 2")

// Create a new ingester and transfer data to it
ing2 := f.getIngester(time.Second*60, t)
ing.Shutdown()

Expand Down Expand Up @@ -87,7 +104,7 @@ func TestTransferOut(t *testing.T) {

assert.Equal(
t,
[]string{"line 0", "line 1", "line 2", "line 3"},
[]string{"line 0", "line 1", "line 2", "line 3", "line 4"},
lines,
)
}
Expand Down

0 comments on commit 3b96510

Please sign in to comment.