diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 89685c73522ba..6298d26ddbb0c 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -490,7 +490,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi return iterForward, nil } - return iter.NewEntryIteratorBackward(iterForward) + return iter.NewReversedIter(iterForward, 0, false) } func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filter) iter.EntryIterator { diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 9e48edacbb4ab..4c15802a5e178 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -484,55 +484,76 @@ func (i *timeRangedIterator) Next() bool { return ok } -type entryIteratorBackward struct { - forwardIter EntryIterator - cur logproto.Entry - entries []logproto.Entry - loaded bool +type entryWithLabels struct { + entry logproto.Entry + labels string +} + +type reverseIterator struct { + iter EntryIterator + cur entryWithLabels + entriesWithLabels []entryWithLabels + + loaded bool + limit uint32 } -// NewEntryIteratorBackward returns an iterator which loads all the entries +// NewReversedIter returns an iterator which loads all or up to N entries // of an existing iterator, and then iterates over them backward. -func NewEntryIteratorBackward(it EntryIterator) (EntryIterator, error) { - return &entryIteratorBackward{entries: make([]logproto.Entry, 0, 1024), forwardIter: it}, it.Error() +// Preload entries when they are being queried with a timeout. +func NewReversedIter(it EntryIterator, limit uint32, preload bool) (EntryIterator, error) { + iter, err := &reverseIterator{ + iter: it, + entriesWithLabels: make([]entryWithLabels, 0, 1024), + limit: limit, + }, it.Error() + + if err != nil { + return nil, err + } + + if preload { + iter.load() + } + + return iter, nil } -func (i *entryIteratorBackward) load() { +func (i *reverseIterator) load() { if !i.loaded { i.loaded = true - for i.forwardIter.Next() { - entry := i.forwardIter.Entry() - i.entries = append(i.entries, entry) + for count := uint32(0); (i.limit == 0 || count < i.limit) && i.iter.Next(); count++ { + i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.iter.Entry(), i.iter.Labels()}) } - i.forwardIter.Close() + i.iter.Close() } } -func (i *entryIteratorBackward) Next() bool { +func (i *reverseIterator) Next() bool { i.load() - if len(i.entries) == 0 { - i.entries = nil + if len(i.entriesWithLabels) == 0 { + i.entriesWithLabels = nil return false } - i.cur, i.entries = i.entries[len(i.entries)-1], i.entries[:len(i.entries)-1] + i.cur, i.entriesWithLabels = i.entriesWithLabels[len(i.entriesWithLabels)-1], i.entriesWithLabels[:len(i.entriesWithLabels)-1] return true } -func (i *entryIteratorBackward) Entry() logproto.Entry { - return i.cur +func (i *reverseIterator) Entry() logproto.Entry { + return i.cur.entry } -func (i *entryIteratorBackward) Close() error { - if !i.loaded { - return i.forwardIter.Close() - } - return nil +func (i *reverseIterator) Labels() string { + return i.cur.labels } -func (i *entryIteratorBackward) Error() error { return nil } +func (i *reverseIterator) Error() error { return nil } -func (i *entryIteratorBackward) Labels() string { - return "" +func (i *reverseIterator) Close() error { + if !i.loaded { + return i.iter.Close() + } + return nil } // ReadBatch reads a set of entries off an iterator. @@ -560,72 +581,6 @@ func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, e return &result, respSize, i.Error() } -type entryWithLabels struct { - entry logproto.Entry - labels string -} - -type entryIteratorForward struct { - backwardIter EntryIterator - cur entryWithLabels - entriesWithLabels []entryWithLabels - loaded bool - limit uint32 -} - -// NewEntryIteratorBackward returns an iterator which loads all or upton N entries -// of an existing iterator, and then iterates over them backward. -// preload entries when they are being queried with a timeout -func NewEntryIteratorForward(it EntryIterator, limit uint32, preload bool) (EntryIterator, error) { - itr, err := &entryIteratorForward{entriesWithLabels: make([]entryWithLabels, 0, 1024), backwardIter: it, limit: limit}, it.Error() - if err != nil { - return nil, err - } - - if preload { - itr.load() - } - - return itr, nil -} - -func (i *entryIteratorForward) load() { - if !i.loaded { - i.loaded = true - for count := uint32(0); (i.limit == 0 || count < i.limit) && i.backwardIter.Next(); count++ { - i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.backwardIter.Entry(), i.backwardIter.Labels()}) - } - i.backwardIter.Close() - } -} - -func (i *entryIteratorForward) Next() bool { - i.load() - if len(i.entriesWithLabels) == 0 { - i.entriesWithLabels = nil - return false - } - i.cur, i.entriesWithLabels = i.entriesWithLabels[len(i.entriesWithLabels)-1], i.entriesWithLabels[:len(i.entriesWithLabels)-1] - return true -} - -func (i *entryIteratorForward) Entry() logproto.Entry { - return i.cur.entry -} - -func (i *entryIteratorForward) Close() error { - if !i.loaded { - return i.backwardIter.Close() - } - return nil -} - -func (i *entryIteratorForward) Error() error { return nil } - -func (i *entryIteratorForward) Labels() string { - return i.cur.labels -} - type peekingEntryIterator struct { iter EntryIterator diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 8344a90679e15..54efb3a35fc92 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -275,24 +275,43 @@ func TestInsert(t *testing.T) { })) } -func TestEntryIteratorForward(t *testing.T) { +func TestReverseEntryIterator(t *testing.T) { itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels) itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}") heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD) - forwardIterator, err := NewEntryIteratorForward(heapIterator, testSize, false) + reversedIter, err := NewReversedIter(heapIterator, testSize, false) require.NoError(t, err) for i := int64((testSize / 2) + 1); i <= testSize; i++ { - assert.Equal(t, true, forwardIterator.Next()) - assert.Equal(t, identity(i), forwardIterator.Entry(), fmt.Sprintln("iteration", i)) - assert.Equal(t, true, forwardIterator.Next()) - assert.Equal(t, identity(i), forwardIterator.Entry(), fmt.Sprintln("iteration", i)) + assert.Equal(t, true, reversedIter.Next()) + assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i)) + assert.Equal(t, reversedIter.Labels(), itr1.Labels()) + assert.Equal(t, true, reversedIter.Next()) + assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i)) + assert.Equal(t, reversedIter.Labels(), itr2.Labels()) } - assert.Equal(t, false, forwardIterator.Next()) - assert.Equal(t, nil, forwardIterator.Error()) - assert.NoError(t, forwardIterator.Close()) + assert.Equal(t, false, reversedIter.Next()) + assert.Equal(t, nil, reversedIter.Error()) + assert.NoError(t, reversedIter.Close()) +} + +func TestReverseEntryIteratorUnlimited(t *testing.T) { + itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels) + itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}") + + heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD) + reversedIter, err := NewReversedIter(heapIterator, 0, false) + require.NoError(t, err) + + var ct int + expected := 2 * testSize + + for reversedIter.Next() { + ct++ + } + require.Equal(t, expected, ct) } func Test_PeekingIterator(t *testing.T) { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 9c976007b246f..410e2b0ccc527 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -303,7 +303,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, return nil, err } - reversedIterator, err := iter.NewEntryIteratorForward(histIterators, req.Limit, true) + reversedIterator, err := iter.NewReversedIter(histIterators, req.Limit, true) if err != nil { return nil, err }