Skip to content

Commit

Permalink
Fixed out of order entries allowed in a chunk on edge case
Browse files Browse the repository at this point in the history
  • Loading branch information
pracucci committed Aug 7, 2019
1 parent 07f9584 commit a61e24c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pkg/chunkenc/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,15 @@ func (c *MemChunk) SpaceFor(*logproto.Entry) bool {

// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
if err := c.head.append(entry.Timestamp.UnixNano(), entry.Line); err != nil {
entryTimestamp := entry.Timestamp.UnixNano()

// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}

if err := c.head.append(entryTimestamp, entry.Line); err != nil {
return err
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/chunkenc/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -216,6 +218,47 @@ func TestGZIPChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
}

func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Parallel()

type tester func(t *testing.T, chk *MemChunk)

tests := map[string]tester{
"append out of order in the same block": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))

assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
},
"append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.NoError(t, chk.cut())

assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
},
"append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.cut())

assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.NoError(t, chk.cut())

assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
},
}

for testName, tester := range tests {
tester := tester

t.Run(testName, func(t *testing.T) {
t.Parallel()

tester(t, NewMemChunk(EncGZIP))
})
}
}

var result []Chunk

func BenchmarkWriteGZIP(b *testing.B) {
Expand Down

0 comments on commit a61e24c

Please sign in to comment.