Skip to content

Commit

Permalink
Regexp simplification (#1787)
Browse files Browse the repository at this point in the history
* Refactor line filter to support custom parsing for perl regexp.

Signed-off-by: Cyril Tovena <[email protected]>

* Support alternate, concat regex simplification

Signed-off-by: Cyril Tovena <[email protected]>

* Add benchmark between normal and simplified regex.

Signed-off-by: Cyril Tovena <[email protected]>

* Working through all possible concat operations.

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes concat operations.

Signed-off-by: Cyril Tovena <[email protected]>

* Adds more types of capture groups

Signed-off-by: Cyril Tovena <[email protected]>

* Improve test robustness

Signed-off-by: Cyril Tovena <[email protected]>

* Support for nested concat.
git push

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes bug with anychar.

Signed-off-by: Cyril Tovena <[email protected]>

* Add more unsupported cases.

Signed-off-by: Cyril Tovena <[email protected]>

* Refactor the concat alternates function.

Signed-off-by: Cyril Tovena <[email protected]>

* Improve documentation and refactor code.

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes all code using line filter.

Signed-off-by: Cyril Tovena <[email protected]>

* Improve tests to check nil filter.

Signed-off-by: Cyril Tovena <[email protected]>

* Ensure benchmark is not optimized away by the compiler.

Signed-off-by: Cyril Tovena <[email protected]>

* Review feedback.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Mar 10, 2020
1 parent 5b1316e commit d34b0a9
Show file tree
Hide file tree
Showing 14 changed files with 489 additions and 81 deletions.
2 changes: 1 addition & 1 deletion pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *dumbChunk) Utilization() float64 {

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ logql.LineFilter) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.LineFilter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
Blocks() int
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/lazy_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type LazyChunk struct {
}

// Iterator returns an entry iterator.
func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.LineFilter) (iter.EntryIterator, error) {
// If the chunk is already loaded, then use that.
if c.Chunk.Data != nil {
lokiChunk := c.Chunk.Data.(*Facade).LokiChunk()
Expand Down
14 changes: 7 additions & 7 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}

// Iterator implements Chunk.
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, filter logql.LineFilter) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

Expand All @@ -493,14 +493,14 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
return iter.NewReversedIter(iterForward, 0, false)
}

func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filter) iter.EntryIterator {
func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.LineFilter) iter.EntryIterator {
if len(b.b) == 0 {
return emptyIterator
}
return newBufferedIterator(ctx, pool, b.b, filter)
}

func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.Filter) iter.EntryIterator {
func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.LineFilter) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return emptyIterator
}
Expand All @@ -515,7 +515,7 @@ func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logq
entries := make([]entry, 0, len(hb.entries))
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
if filter == nil || filter([]byte(e.s)) {
if filter == nil || filter.Filter([]byte(e.s)) {
entries = append(entries, e)
}
}
Expand Down Expand Up @@ -577,10 +577,10 @@ type bufferedIterator struct {

closed bool

filter logql.Filter
filter logql.LineFilter
}

func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator {
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.LineFilter) *bufferedIterator {
chunkStats := stats.GetChunkData(ctx)
chunkStats.CompressedBytes += int64(len(b))
return &bufferedIterator{
Expand Down Expand Up @@ -610,7 +610,7 @@ func (si *bufferedIterator) Next() bool {
// we decode always the line length and ts as varint
si.stats.DecompressedBytes += int64(len(line)) + 2*binary.MaxVarintLen64
si.stats.DecompressedLines++
if si.filter != nil && !si.filter(line) {
if si.filter != nil && !si.filter.Filter(line) {
continue
}
si.cur.Line = string(line)
Expand Down
5 changes: 3 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc/testdata"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

var testEncoding = []Encoding{
Expand Down Expand Up @@ -504,9 +505,9 @@ func TestGenerateDataSize(t *testing.T) {
bytesRead := uint64(0)
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool {
iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, logql.LineFilterFunc(func(line []byte) bool {
return true // return all
})
}))
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t
}

// Returns an iterator.
func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.LineFilter) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
itr, err := c.chunk.Iterator(ctx, from, through, direction, filter)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type tailer struct {
id uint32
orgID string
matchers []*labels.Matcher
filter logql.Filter
filter logql.LineFilter
expr logql.Expr

sendChan chan *logproto.Stream
Expand Down Expand Up @@ -139,7 +139,7 @@ func (t *tailer) filterEntriesInStream(stream *logproto.Stream) {

var filteredEntries []logproto.Entry
for _, e := range stream.Entries {
if t.filter([]byte(e.Line)) {
if t.filter.Filter([]byte(e.Line)) {
filteredEntries = append(filteredEntries, e)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

func TestTransferOut(t *testing.T) {
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestTransferOut(t *testing.T) {
time.Unix(0, 0),
time.Unix(10, 0),
logproto.FORWARD,
func([]byte) bool { return true },
logql.LineFilterFunc(func([]byte) bool { return true }),
)
if !assert.NoError(t, err) {
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
// matcherStage applies Label matchers to determine if the include stages should be run
type matcherStage struct {
matchers []*labels.Matcher
filter logql.Filter
filter logql.LineFilter
pipeline Stage
action string
}
Expand All @@ -122,7 +122,7 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter
return
}
}
if m.filter == nil || m.filter([]byte(*entry)) {
if m.filter == nil || m.filter.Filter([]byte(*entry)) {
switch m.action {
case MatchActionDrop:
// Adds the drop label to not be sent by the api.EntryHandler
Expand Down
57 changes: 10 additions & 47 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package logql

import (
"bytes"
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -48,12 +46,9 @@ type Querier interface {
Select(context.Context, SelectParams) (iter.EntryIterator, error)
}

// Filter is a function to filter logs.
type Filter func(line []byte) bool

// LogSelectorExpr is a LogQL expression filtering and returning logs.
type LogSelectorExpr interface {
Filter() (Filter, error)
Filter() (LineFilter, error)
Matchers() []*labels.Matcher
Expr
}
Expand Down Expand Up @@ -83,7 +78,7 @@ func (e *matchersExpr) String() string {
return sb.String()
}

func (e *matchersExpr) Filter() (Filter, error) {
func (e *matchersExpr) Filter() (LineFilter, error) {
return nil, nil
}

Expand Down Expand Up @@ -126,49 +121,17 @@ func (e *filterExpr) String() string {
return sb.String()
}

func (e *filterExpr) Filter() (Filter, error) {
var f func([]byte) bool
switch e.ty {
case labels.MatchRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = re.Match

case labels.MatchNotRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = func(line []byte) bool {
return !re.Match(line)
}

case labels.MatchEqual:
mb := []byte(e.match)
f = func(line []byte) bool {
return bytes.Contains(line, mb)
}

case labels.MatchNotEqual:
mb := []byte(e.match)
f = func(line []byte) bool {
return !bytes.Contains(line, mb)
}

default:
return nil, fmt.Errorf("unknown matcher: %v", e.match)
func (e *filterExpr) Filter() (LineFilter, error) {
f, err := newFilter(e.match, e.ty)
if err != nil {
return nil, err
}
next, ok := e.left.(*filterExpr)
if ok {
nextFilter, err := next.Filter()
if nextExpr, ok := e.left.(*filterExpr); ok {
nextFilter, err := nextExpr.Filter()
if err != nil {
return nil, err
}
return func(line []byte) bool {
return nextFilter(line) && f(line)
}, nil
return newAndFilter(nextFilter, f), nil
}
return f, nil
}
Expand Down Expand Up @@ -457,7 +420,7 @@ func (e *literalExpr) String() string {
// to facilitate sum types. We'll be type switching when evaluating them anyways
// and they will only be present in binary operation legs.
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) Filter() (Filter, error) { return nil, nil }
func (e *literalExpr) Filter() (LineFilter, error) { return nil, nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }

// helper used to impl Stringer for vector and range aggregations
Expand Down
37 changes: 26 additions & 11 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,31 @@ import (

func Test_logSelectorExpr_String(t *testing.T) {
t.Parallel()
tests := []string{
`{foo!~"bar"}`,
`{foo="bar", bar!="baz"}`,
`{foo="bar", bar!="baz"} != "bip" !~ ".+bop"`,
`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`,
tests := []struct {
selector string
expectFilter bool
}{
{`{foo!~"bar"}`, false},
{`{foo="bar", bar!="baz"}`, false},
{`{foo="bar", bar!="baz"} != "bip" !~ ".+bop"`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, true},
}

for _, tt := range tests {
tt := tt
t.Run(tt, func(t *testing.T) {
t.Run(tt.selector, func(t *testing.T) {
t.Parallel()
expr, err := ParseLogSelector(tt)
expr, err := ParseLogSelector(tt.selector)
if err != nil {
t.Fatalf("failed to parse log selector: %s", err)
}
if expr.String() != strings.Replace(tt, " ", "", -1) {
t.Fatalf("error expected: %s got: %s", tt, expr.String())
f, err := expr.Filter()
if err != nil {
t.Fatalf("failed to get filter: %s", err)
}
require.Equal(t, tt.expectFilter, f != nil)
if expr.String() != strings.Replace(tt.selector, " ", "", -1) {
t.Fatalf("error expected: %s got: %s", tt.selector, expr.String())
}
})
}
Expand Down Expand Up @@ -122,6 +130,13 @@ func Test_FilterMatcher(t *testing.T) {
},
[]linecheck{{"foo", false}, {"bar", false}, {"foobar", true}},
},
{
`{app="foo"} |~ "foo"`,
[]*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "app", "foo"),
},
[]linecheck{{"foo", true}, {"bar", false}, {"foobar", true}},
},
} {
tt := tt
t.Run(tt.q, func(t *testing.T) {
Expand All @@ -135,7 +150,7 @@ func Test_FilterMatcher(t *testing.T) {
assert.Nil(t, f)
} else {
for _, lc := range tt.lines {
assert.Equal(t, lc.e, f([]byte(lc.l)))
assert.Equal(t, lc.e, f.Filter([]byte(lc.l)))
}
}
})
Expand All @@ -158,7 +173,7 @@ func BenchmarkContainsFilter(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
if !f(line) {
if !f.Filter(line) {
b.Fatal("doesn't match")
}
}
Expand Down
Loading

0 comments on commit d34b0a9

Please sign in to comment.