Fixes head chunk iterator direction. (#3383)
* Fixes head chunk iterator direction.

For backward queries, since the introduction of the LogQL parsers, we have been using a heapIterator in the head chunk to properly re-order entries.
But we also reverse all iterators in the memchunk code, including the head chunk iterator, which reverses the already-reversed entries.

This PR skips reversal of the headchunk.

Fixes #3345
Fixes #3208

The bug had the following side effects:
- when using replication, data would no longer dedupe properly, since it was not correctly ordered across batches.
- when using a limit, entries could be missed.
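
For illustration, a minimal, self-contained sketch of the double reversal (hypothetical timestamps, not code from this PR):

package main

import "fmt"

// reverse returns a reversed copy of s.
func reverse(s []int64) []int64 {
    out := make([]int64, len(s))
    for i, v := range s {
        out[len(s)-1-i] = v
    }
    return out
}

func main() {
    // The head chunk iterator already emits timestamps in BACKWARD order.
    head := []int64{3, 2, 1}
    // The old memchunk code then reversed every iterator, head chunk included,
    // so a BACKWARD query saw the head entries in forward order again:
    fmt.Println(reverse(head)) // [1 2 3]
}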

Signed-off-by: Cyril Tovena <[email protected]>

* Fix reversal and time filtering of the head chunk.

Signed-off-by: Cyril Tovena <[email protected]>
cyriltovena authored Feb 26, 2021
1 parent 7613197 commit 55b91fc
Showing 8 changed files with 164 additions and 30 deletions.
49 changes: 37 additions & 12 deletions pkg/chunkenc/memchunk.go
@@ -671,27 +671,33 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
 // Iterator implements Chunk.
 func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
     mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
-    its := make([]iter.EntryIterator, 0, len(c.blocks)+1)
+    blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)
+    var headIterator iter.EntryIterator
 
     for _, b := range c.blocks {
         if maxt < b.mint || b.maxt < mint {
             continue
         }
-        its = append(its, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
+        blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
     }
 
     if !c.head.isEmpty() {
-        its = append(its, c.head.iterator(ctx, direction, mint, maxt, pipeline))
+        headIterator = c.head.iterator(ctx, direction, mint, maxt, pipeline)
     }
 
     if direction == logproto.FORWARD {
+        // add the headblock iterator at the end.
+        if headIterator != nil {
+            blockItrs = append(blockItrs, headIterator)
+        }
         return iter.NewTimeRangedIterator(
-            iter.NewNonOverlappingIterator(its, ""),
+            iter.NewNonOverlappingIterator(blockItrs, ""),
             time.Unix(0, mint),
             time.Unix(0, maxt),
         ), nil
     }
-    for i, it := range its {
+    // reverse each block's entries
+    for i, it := range blockItrs {
         r, err := iter.NewEntryReversedIter(
             iter.NewTimeRangedIterator(it,
                 time.Unix(0, mint),
@@ -700,14 +706,18 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
         if err != nil {
             return nil, err
         }
-        its[i] = r
+        blockItrs[i] = r
     }
 
-    for i, j := 0, len(its)-1; i < j; i, j = i+1, j-1 {
-        its[i], its[j] = its[j], its[i]
+    // except the head block, which is already reversed via the heapIterator.
+    if headIterator != nil {
+        blockItrs = append(blockItrs, headIterator)
     }
+    // then reverse all iterators.
+    for i, j := 0, len(blockItrs)-1; i < j; i, j = i+1, j-1 {
+        blockItrs[i], blockItrs[j] = blockItrs[j], blockItrs[i]
+    }
 
-    return iter.NewNonOverlappingIterator(its, ""), nil
+    return iter.NewNonOverlappingIterator(blockItrs, ""), nil
 }
 
 // Iterator implements Chunk.
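
To see why appending the head iterator only after the per-block reversals restores correct BACKWARD order, here is a hedged, runnable sketch using plain slices in place of iterators (hypothetical timestamps, not data from the repo):

package main

import "fmt"

// reverse returns a reversed copy of s.
func reverse(s []int) []int {
    out := make([]int, len(s))
    for i, v := range s {
        out[len(s)-1-i] = v
    }
    return out
}

func main() {
    // sealed blocks, oldest → newest
    blocks := [][]int{{1, 2}, {3, 4}}
    // the head block iterator already emits BACKWARD order
    head := []int{6, 5}

    // 1. reverse entries inside each sealed block
    for i, b := range blocks {
        blocks[i] = reverse(b)
    }
    // 2. append the (already reversed) head, then reverse the slice of iterators
    all := append(blocks, head)
    for i, j := 0, len(all)-1; i < j; i, j = i+1, j-1 {
        all[i], all[j] = all[j], all[i]
    }
    fmt.Println(all) // [[6 5] [4 3] [2 1]] → concatenated: 6 5 4 3 2 1
}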
@@ -798,11 +808,16 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
     // cutting of blocks.
     chunkStats.HeadChunkLines += int64(len(hb.entries))
     streams := map[uint64]*logproto.Stream{}
-    for _, e := range hb.entries {
+
+    process := func(e entry) {
+        // apply time filtering
+        if e.t < mint || e.t >= maxt {
+            return
+        }
         chunkStats.HeadChunkBytes += int64(len(e.s))
         newLine, parsedLbs, ok := pipeline.ProcessString(e.s)
         if !ok {
-            continue
+            return
         }
         var stream *logproto.Stream
         lhash := parsedLbs.Hash()
@@ -816,7 +831,16 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
             Timestamp: time.Unix(0, e.t),
             Line:      newLine,
         })
     }
 
+    if direction == logproto.FORWARD {
+        for _, e := range hb.entries {
+            process(e)
+        }
+    } else {
+        for i := len(hb.entries) - 1; i >= 0; i-- {
+            process(hb.entries[i])
+        }
+    }
 
     if len(streams) == 0 {
@@ -867,6 +891,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
     }
     seriesRes := make([]logproto.Series, 0, len(series))
     for _, s := range series {
+        // todo(ctovena) not sure we need this sort.
         sort.Sort(s)
         seriesRes = append(seriesRes, *s)
     }
32 changes: 32 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -1023,3 +1023,35 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
         })
     }
 }
+
+func Test_HeadIteratorReverse(t *testing.T) {
+    c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)
+    genEntry := func(i int64) *logproto.Entry {
+        return &logproto.Entry{
+            Timestamp: time.Unix(0, i),
+            Line:      fmt.Sprintf(`msg="%d"`, i),
+        }
+    }
+    var i int64
+    for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 {
+        require.NoError(t, c.Append(e))
+    }
+
+    assertOrder := func(t *testing.T, total int64) {
+        expr, err := logql.ParseLogSelector(`{app="foo"} | logfmt`)
+        require.NoError(t, err)
+        p, err := expr.Pipeline()
+        require.NoError(t, err)
+        it, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(0, i), logproto.BACKWARD, p.ForStream(labels.Labels{{Name: "app", Value: "foo"}}))
+        require.NoError(t, err)
+        for it.Next() {
+            total--
+            require.Equal(t, total, it.Entry().Timestamp.UnixNano())
+        }
+    }
+
+    assertOrder(t, i)
+    // let's try again without the headblock.
+    require.NoError(t, c.cut())
+    assertOrder(t, i)
+}
17 changes: 8 additions & 9 deletions pkg/ingester/chunk_test.go
@@ -8,7 +8,6 @@ import (
     "time"
 
     "github.com/prometheus/prometheus/pkg/labels"
-    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     "github.com/grafana/loki/pkg/chunkenc"
@@ -21,24 +20,24 @@ func testIteratorForward(t *testing.T, iter iter.EntryIterator, from, through int64) {
     i := from
     for iter.Next() {
         entry := iter.Entry()
-        assert.Equal(t, time.Unix(i, 0), entry.Timestamp)
-        assert.Equal(t, fmt.Sprintf("line %d", i), entry.Line)
+        require.Equal(t, time.Unix(i, 0).Unix(), entry.Timestamp.Unix())
+        require.Equal(t, fmt.Sprintf("line %d", i), entry.Line)
         i++
     }
-    assert.Equal(t, through, i)
-    assert.NoError(t, iter.Error())
+    require.Equal(t, through, i)
+    require.NoError(t, iter.Error())
 }
 
 func testIteratorBackward(t *testing.T, iter iter.EntryIterator, from, through int64) {
     i := through - 1
     for iter.Next() {
         entry := iter.Entry()
-        assert.Equal(t, time.Unix(i, 0), entry.Timestamp)
-        assert.Equal(t, fmt.Sprintf("line %d", i), entry.Line)
+        require.Equal(t, time.Unix(i, 0).Unix(), entry.Timestamp.Unix())
+        require.Equal(t, fmt.Sprintf("line %d", i), entry.Line)
         i--
     }
-    assert.Equal(t, from-1, i)
-    assert.NoError(t, iter.Error())
+    require.Equal(t, from-1, i)
+    require.NoError(t, iter.Error())
 }
 
 func TestIterator(t *testing.T) {
12 changes: 7 additions & 5 deletions pkg/ingester/instance.go
@@ -184,7 +184,6 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
                 return err
             }
         }
-
     }
 
     return appendErr
@@ -391,7 +390,6 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
         series = make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
         for _, v := range dedupedSeries {
             series = append(series, v)
-
         }
     }
 
@@ -529,7 +527,13 @@ func isDone(ctx context.Context) bool {
     }
 }
 
-func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
+// QuerierQueryServer is the GRPC server stream we use to send batches of entries.
+type QuerierQueryServer interface {
+    Context() context.Context
+    Send(res *logproto.QueryResponse) error
+}
+
+func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit uint32) error {
     ingStats := stats.GetIngesterData(ctx)
     if limit == 0 {
         // send all batches.
@@ -617,10 +621,8 @@ func (o *OnceSwitch) Trigger() {
 // TriggerAnd will ensure the switch is on and run the provided function if
 // the switch was not already toggled on.
 func (o *OnceSwitch) TriggerAnd(fn func()) {
-
     triggeredPrior := o.triggered.Swap(true)
     if !triggeredPrior && fn != nil {
         fn()
    }
-
 }
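
Narrowing sendBatches to this two-method interface, rather than the concrete logproto.Querier_QueryServer gRPC stream, is what makes it testable. A minimal sketch of a function adapter satisfying the interface, mirroring the fakeQueryServer added in pkg/ingester/instance_test.go below (sendFunc is a hypothetical name, not part of this PR):

package ingester

import (
    "context"

    "github.com/grafana/loki/pkg/logproto"
)

// sendFunc adapts a plain function into a QuerierQueryServer.
type sendFunc func(*logproto.QueryResponse) error

func (f sendFunc) Send(res *logproto.QueryResponse) error { return f(res) }
func (f sendFunc) Context() context.Context               { return context.TODO() }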
81 changes: 79 additions & 2 deletions pkg/ingester/instance_test.go
@@ -14,7 +14,9 @@ import (
     "github.com/prometheus/prometheus/pkg/labels"
     "github.com/stretchr/testify/require"
 
+    "github.com/grafana/loki/pkg/iter"
     "github.com/grafana/loki/pkg/logproto"
+    "github.com/grafana/loki/pkg/logql"
     "github.com/grafana/loki/pkg/util/validation"
 )
 
@@ -245,7 +247,6 @@ func Test_SeriesQuery(t *testing.T) {
             require.Equal(t, tc.expectedResponse, resp.Series)
         })
     }
-
 }
 
 func entries(n int, t time.Time) []logproto.Entry {
@@ -333,7 +334,6 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
             inst.addTailersToNewStream(newStream(nil, 0, lbs, NilMetrics))
         }
     })
-
 }
 
 func Benchmark_OnceSwitch(b *testing.B) {
@@ -359,3 +359,80 @@ func Benchmark_OnceSwitch(b *testing.B) {
         wg.Wait()
     }
 }
+
+func Test_Iterator(t *testing.T) {
+    ingesterConfig := defaultIngesterTestConfig(t)
+    defaultLimits := defaultLimitsTestConfig()
+    overrides, err := validation.NewOverrides(defaultLimits, nil)
+    require.NoError(t, err)
+    instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), noopWAL{}, NilMetrics, nil)
+    ctx := context.TODO()
+    direction := logproto.BACKWARD
+    limit := uint32(2)
+
+    // insert data.
+    for i := 0; i < 10; i++ {
+        stream := "dispatcher"
+        if i%2 == 0 {
+            stream = "worker"
+        }
+        require.NoError(t,
+            instance.Push(ctx, &logproto.PushRequest{
+                Streams: []logproto.Stream{
+                    {
+                        Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream),
+                        Entries: []logproto.Entry{
+                            {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)},
+                        },
+                    },
+                },
+            }),
+        )
+    }
+
+    // prepare iterators.
+    itrs, err := instance.Query(ctx,
+        logql.SelectLogParams{
+            QueryRequest: &logproto.QueryRequest{
+                Selector:  `{job="3"} | logfmt`,
+                Limit:     limit,
+                Start:     time.Unix(0, 0),
+                End:       time.Unix(0, 100000000),
+                Direction: direction,
+            },
+        },
+    )
+    require.NoError(t, err)
+    heapItr := iter.NewHeapIterator(ctx, itrs, direction)
+
+    // assert the order is preserved.
+    var res *logproto.QueryResponse
+    require.NoError(t,
+        sendBatches(ctx, heapItr,
+            fakeQueryServer(
+                func(qr *logproto.QueryResponse) error {
+                    res = qr
+                    return nil
+                },
+            ),
+            limit),
+    )
+    require.Equal(t, 2, len(res.Streams))
+    // each entry is translated into a unique stream
+    require.Equal(t, 1, len(res.Streams[0].Entries))
+    require.Equal(t, 1, len(res.Streams[1].Entries))
+    // we sort before asserting 9 and 8 because readBatch uses a map to build the
+    // response, and maps have no order guarantee.
+    sort.Slice(res.Streams, func(i, j int) bool {
+        return res.Streams[i].Entries[0].Timestamp.UnixNano() > res.Streams[j].Entries[0].Timestamp.UnixNano()
+    })
+    require.Equal(t, int64(9), res.Streams[0].Entries[0].Timestamp.UnixNano())
+    require.Equal(t, int64(8), res.Streams[1].Entries[0].Timestamp.UnixNano())
+}
+
+type fakeQueryServer func(*logproto.QueryResponse) error
+
+func (f fakeQueryServer) Send(res *logproto.QueryResponse) error {
+    return f(res)
+}
+func (f fakeQueryServer) Context() context.Context { return context.TODO() }
1 change: 0 additions & 1 deletion pkg/ingester/stream.go
@@ -316,7 +316,6 @@ func (s *stream) Bounds() (from, to time.Time) {
         _, to = s.chunks[len(s.chunks)-1].chunk.Bounds()
     }
     return from, to
-
 }
 
 // Returns an iterator.
1 change: 0 additions & 1 deletion pkg/ingester/stream_test.go
@@ -140,7 +140,6 @@ func TestStreamIterator(t *testing.T) {
             }
         })
     }
-
 }
 
 func Benchmark_PushStream(b *testing.B) {
1 change: 1 addition & 0 deletions pkg/iter/entry_iterator.go
@@ -443,6 +443,7 @@ type timeRangedIterator struct {
 }
 
 // NewTimeRangedIterator returns an iterator which filters entries by time range.
+// Note: Only works with iterators that go forwards.
 func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator {
     return &timeRangedIterator{
         EntryIterator: it,
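
A hedged, runnable sketch of why this forward-only caveat matters, assuming the filter's semantics are to skip entries before mint and stop at the first entry at or after maxt (which is also why the head block now does its own time filtering when iterating backward):

package main

import "fmt"

// timeRanged mimics the assumed filter: skip ts < mint, stop once ts >= maxt.
func timeRanged(ts []int64, mint, maxt int64) []int64 {
    var out []int64
    for _, t := range ts {
        if t < mint {
            continue
        }
        if t >= maxt {
            break // early exit is only safe when the input is ascending
        }
        out = append(out, t)
    }
    return out
}

func main() {
    fmt.Println(timeRanged([]int64{1, 2, 3, 4}, 2, 4)) // [2 3] — forward input, correct
    fmt.Println(timeRanged([]int64{4, 3, 2, 1}, 2, 4)) // [] — backward input stops immediately
}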
