Skip to content

Commit

Permalink
LogQL: Introduce distinct (#8662)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Introduces `distinct` to LogQL.
Usage:
`{job="varlogs"} | distinct filename`
It is similar to `distinct` in SQL and especially useful for sampling.
Similar syntax exists in other log query languages.

**Example**
For the following log lines:
```json
{"event": "access", "id": "1", "time": "2023-02-28 15:12:11"}
{"event": "access", "id": "1", "time": "2023-02-28 15:13:11"}
{"event": "access", "id": "2", "time": "2023-02-28 15:14:11"}
{"event": "access", "id": "2", "time": "2023-02-28 15:15:11"}
```

The query below:
```logql
{app="order"} | json | distinct id
```

Will return:
```json
{"event": "access", "id": "1", "time": "2023-02-28 15:13:11"}
{"event": "access", "id": "2", "time": "2023-02-28 15:15:11"}
```

**Example with multiple labels**
```logql
{app="order"} | json | distinct id,time
```

**Which issue(s) this PR fixes**:
Fixes #8649 

---------

Co-authored-by: J Stickler <[email protected]>
Co-authored-by: Dylan Guedes <[email protected]>
Co-authored-by: Karsten Jeschkies <[email protected]>
  • Loading branch information
4 people authored Apr 28, 2023
1 parent 411f384 commit 38b298c
Show file tree
Hide file tree
Showing 10 changed files with 832 additions and 413 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
* [7754](https://github.com/grafana/loki/pull/7754) **ashwanthgoli** index-shipper: add support for multiple stores.
* [6675](https://github.com/grafana/loki/pull/6675) **btaani**: Add logfmt expression parser for selective extraction of labels from logfmt formatted logs
* [8474](https://github.com/grafana/loki/pull/8474) **farodin91**: Add support for short-lived S3 session tokens
* [8662](https://github.com/grafana/loki/pull/8662) **liguozhong**: LogQL: Introduce `distinct`
* [8774](https://github.com/grafana/loki/pull/8774) **slim-bean**: Add new logql template functions `bytes`, `duration`, `unixEpochMillis`, `unixEpochNanos`, `toDateInZone`, `b64Enc`, and `b64Dec`

##### Fixes
Expand Down
16 changes: 16 additions & 0 deletions docs/sources/logql/log_queries/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,22 @@ To evaluate the logical `and` first, use parenthesis, as in this example:
Label filter expressions have support matching IP addresses. See [Matching IP addresses]({{<relref "../ip">}}) for details.

### Distinct filter expression

The distinct filter expression filters log lines by the values of their original and extracted labels, dropping lines that repeat a label value that has already been seen. The first line encountered for each distinct value is returned; later lines with the same value are dropped.

For example, for the following log lines:
```log
{"event": "access", "id": "1", "time": "2023-02-28 15:12:11"}
{"event": "access", "id": "1", "time": "2023-02-28 15:13:11"}
{"event": "access", "id": "2", "time": "2023-02-28 15:14:11"}
{"event": "access", "id": "2", "time": "2023-02-28 15:15:11"}
```
The expression `{app="order"} | json | distinct id` will return the distinct occurrences of `id`:
```log
{"event": "access", "id": "1", "time": "2023-02-28 15:13:11"}
{"event": "access", "id": "2", "time": "2023-02-28 15:15:11"}
```
### Parser expression

Parser expression can parse and extract labels from the log content. Those extracted labels can then be used for filtering using [label filter expressions](#label-filter-expression) or for [metric aggregations]({{<relref "../metric_queries">}}).
Expand Down
37 changes: 37 additions & 0 deletions pkg/logql/log/distinct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package log

// distinctFilter is a pipeline stage that keeps a log line only when none of
// the configured labels carries a value this stage has already recorded.
type distinctFilter struct {
	// datas maps each configured label name to the set of values recorded so far.
	datas map[string]map[string]struct{}
	// labels holds the configured label names, de-duplicated, in first-seen order.
	labels []string
}

// NewDistinctFilter builds a Stage that de-duplicates log lines on the values
// of the given labels.
//
// Repeated names in labels are collapsed to a single entry. Without that, the
// second occurrence of a name would collide with the value the first
// occurrence recorded during the same Process call, and every line would be
// dropped. The labels slice is not retained, so later changes made by the
// caller do not affect the stage. The returned error is always nil.
func NewDistinctFilter(labels []string) (Stage, error) {
	datas := make(map[string]map[string]struct{}, len(labels))
	unique := make([]string, 0, len(labels))
	for _, label := range labels {
		if _, dup := datas[label]; dup {
			continue
		}
		datas[label] = make(map[string]struct{})
		unique = append(unique, label)
	}
	return &distinctFilter{
		labels: unique,
		datas:  datas,
	}, nil
}

// Process returns line unchanged together with a flag telling whether it
// should be kept. Labels are checked in their configured order:
//
//   - if a label is absent from lbs, the line is kept immediately and the
//     remaining labels are not examined;
//   - if a label's value was recorded before, the line is dropped immediately;
//   - otherwise the value is recorded and the next label is checked.
//
// Values recorded for earlier labels stay recorded even when a later label
// causes the line to be dropped. When no labels are configured the line is
// dropped.
func (r *distinctFilter) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) {
	keep := false
	for _, label := range r.labels {
		val, ok := lbs.Get(label)
		if !ok {
			return line, true
		}
		seen := r.datas[label]
		if _, dup := seen[val]; dup {
			return line, false
		}
		seen[val] = struct{}{}
		keep = true
	}
	return line, keep
}

// RequiredLabelNames returns an empty, non-nil slice.
func (r *distinctFilter) RequiredLabelNames() []string {
	return []string{}
}
62 changes: 62 additions & 0 deletions pkg/logql/log/distinct_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package log

import (
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logqlmodel"
)

// Test_DistinctFilter feeds a fixed sequence of JSON log lines through the
// JSON parser and a distinct filter configured with the labels "id", "time"
// and "none", then checks which lines the filter keeps.
func Test_DistinctFilter(t *testing.T) {
	tc := struct {
		name          string
		label         []string
		lbs           labels.Labels
		input         []string
		expectedCount int
		expectedLines []string
	}{
		name:  "distinct test",
		label: []string{"id", "time", "none"},
		lbs: labels.Labels{
			{Name: logqlmodel.ErrorLabel, Value: errJSON},
			{Name: "status", Value: "200"},
			{Name: "method", Value: "POST"},
		},
		input: []string{
			`{"event": "access", "id": "1", "time": "1"}`,
			`{"event": "access", "id": "1", "time": "2"}`,
			`{"event": "access", "id": "2", "time": "3"}`,
			`{"event": "access", "id": "2", "time": "4"}`,
			`{"event": "access", "id": "1", "time": "5"}`,
			`{"event": "delete", "id": "1", "time": "1"}`,
		},
		expectedCount: 2,
		expectedLines: []string{
			`{"event": "access", "id": "1", "time": "1"}`,
			`{"event": "access", "id": "2", "time": "3"}`,
		},
	}

	stage, err := NewDistinctFilter(tc.label)
	require.NoError(t, err)

	// The same labels builder is reused for every input line.
	builder := NewBaseLabelsBuilder().ForLabels(tc.lbs, tc.lbs.Hash())

	kept := []string{}
	for i := range tc.input {
		entry := tc.input[i]
		NewJSONParser().Process(1, []byte(entry), builder)
		if _, ok := stage.Process(1, []byte(entry), builder); !ok {
			continue
		}
		kept = append(kept, entry)
	}

	require.Equal(t, tc.expectedCount, len(kept))
	require.Equal(t, tc.expectedLines, kept)
}
35 changes: 34 additions & 1 deletion pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (e *PipelineExpr) Pipeline() (log.Pipeline, error) {
func (e *PipelineExpr) HasFilter() bool {
for _, p := range e.MultiStages {
switch p.(type) {
case *LineFilterExpr, *LabelFilterExpr:
case *LineFilterExpr, *LabelFilterExpr, *DistinctFilterExpr:
return true
default:
continue
Expand Down Expand Up @@ -631,6 +631,37 @@ func (j *JSONExpressionParser) String() string {
return sb.String()
}

// DistinctFilterExpr is the AST node for a `distinct` pipeline stage.
type DistinctFilterExpr struct {
	// labels lists the label names given to the distinct stage.
	labels []string
	implicit
}

// newDistinctFilterExpr returns a DistinctFilterExpr for the given label
// names. The slice is stored as-is, without copying.
func newDistinctFilterExpr(labels []string) *DistinctFilterExpr {
	expr := new(DistinctFilterExpr)
	expr.labels = labels
	return expr
}

// Shardable always reports false for a distinct stage.
// NOTE(review): presumably because de-duplication needs to observe every line
// in one place — confirm against the sharding logic.
func (e *DistinctFilterExpr) Shardable() bool {
	return false
}

// Walk calls f exactly once, passing this expression itself.
func (e *DistinctFilterExpr) Walk(f WalkFn) {
	f(e)
}

// Stage builds the pipeline stage for this expression by handing its label
// names to log.NewDistinctFilter, and returns that call's result unchanged.
func (e *DistinctFilterExpr) Stage() (log.Stage, error) {
	stage, err := log.NewDistinctFilter(e.labels)
	return stage, err
}

// String renders the expression as the pipe operator, the distinct keyword
// and the label names joined by commas, each part separated by one space.
func (e *DistinctFilterExpr) String() string {
	return fmt.Sprintf("%s %s %s", OpPipe, OpFilterDistinct, strings.Join(e.labels, ","))
}

type internedStringSet map[string]struct {
s string
ok bool
Expand Down Expand Up @@ -904,6 +935,8 @@ const (
// function filters
OpFilterIP = "ip"

OpFilterDistinct = "distinct"

// drop labels
OpDrop = "drop"
)
Expand Down
9 changes: 9 additions & 0 deletions pkg/logql/syntax/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func Test_logSelectorExpr_String(t *testing.T) {
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | logfmt | b=ip("127.0.0.1") | level="error" | c=ip("::1")`, true}, // chain inside label filters.
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | regexp "(?P<foo>foo|bar)"`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | regexp "(?P<foo>foo|bar)" | ( ( foo<5.01 , bar>20ms ) or foo="bar" ) | line_format "blip{{.boop}}bap" | label_format foo=bar,bar="blip{{.blop}}"`, true},
{`{foo="bar"} | distinct id`, true},
{`{foo="bar"} | distinct id,time`, true},
}

for _, tt := range tests {
Expand Down Expand Up @@ -372,6 +374,13 @@ func Test_FilterMatcher(t *testing.T) {
},
[]linecheck{{"duration=5m total_bytes=5kB", true}, {"duration=1s total_bytes=256B", false}, {"duration=0s", false}},
},
{
`{app="foo"} | logfmt | distinct id`,
[]*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "app", "foo"),
},
[]linecheck{{"id=foo", true}, {"id=foo", false}, {"id=bar", true}},
},
} {
tt := tt
t.Run(tt.q, func(t *testing.T) {
Expand Down
23 changes: 18 additions & 5 deletions pkg/logql/syntax/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
LabelParser *LabelParserExpr
LineFilters *LineFilterExpr
LineFilter *LineFilterExpr
DistinctLabel []string
DistinctFilter *DistinctFilterExpr
PipelineExpr MultiStageExpr
PipelineStage StageExpr
BytesFilter log.LabelFilterer
Expand All @@ -64,7 +66,7 @@ import (
OffsetExpr *OffsetExpr
DropLabel log.DropLabel
DropLabels []log.DropLabel
DropLabelsExpr *DropLabelsExpr
DropLabelsExpr *DropLabelsExpr
}

%start root
Expand Down Expand Up @@ -102,6 +104,8 @@ import (
%type <LabelFilter> labelFilter
%type <LineFilters> lineFilters
%type <LineFilter> lineFilter
%type <DistinctFilter> distinctFilter
%type <DistinctLabel> distinctLabel
%type <LineFormatExpr> lineFormatExpr
%type <DecolorizeExpr> decolorizeExpr
%type <DropLabelsExpr> dropLabelsExpr
Expand All @@ -124,7 +128,7 @@ import (
%token <duration> DURATION RANGE
%token <val> MATCHERS LABELS EQ RE NRE OPEN_BRACE CLOSE_BRACE OPEN_BRACKET CLOSE_BRACKET COMMA DOT PIPE_MATCH PIPE_EXACT
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE RATE_COUNTER SUM SORT SORT_DESC AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
BYTES_OVER_TIME BYTES_RATE BOOL JSON DISTINCT REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
FIRST_OVER_TIME LAST_OVER_TIME ABSENT_OVER_TIME VECTOR LABEL_REPLACE UNPACK OFFSET PATTERN IP ON IGNORING GROUP_LEFT GROUP_RIGHT
DECOLORIZE DROP
Expand Down Expand Up @@ -266,7 +270,8 @@ pipelineStage:
| PIPE decolorizeExpr { $$ = $2 }
| PIPE labelFormatExpr { $$ = $2 }
| PIPE dropLabelsExpr { $$ = $2 }
;
| PIPE distinctFilter { $$ = $2 }
;

filterOp:
IP { $$ = OpFilterIP }
Expand All @@ -292,7 +297,7 @@ labelParser:

jsonExpressionParser:
JSON labelExtractionExpressionList { $$ = newJSONExpressionParser($2) }

logfmtExpressionParser:
LOGFMT labelExtractionExpressionList { $$ = newLogfmtExpressionParser($2)}

Expand All @@ -314,6 +319,14 @@ labelsFormat:
labelFormatExpr:
LABEL_FMT labelsFormat { $$ = newLabelFmtExpr($2) };

// distinctLabel is a comma-separated list of one or more identifiers,
// accumulated left to right into a []string.
distinctLabel:
IDENTIFIER { $$ = []string{ $1 } }
| distinctLabel COMMA IDENTIFIER { $$ = append($1, $3) }
;

// distinctFilter matches the DISTINCT token followed by a distinctLabel list
// and builds the AST node with newDistinctFilterExpr.
distinctFilter:
DISTINCT distinctLabel { $$ = newDistinctFilterExpr($2) };

labelFilter:
matcher { $$ = log.NewStringLabelFilter($1) }
| ipLabelFilter { $$ = $1 }
Expand Down Expand Up @@ -374,7 +387,7 @@ numberFilter:
| IDENTIFIER CMP_EQ NUMBER { $$ = log.NewNumericLabelFilter(log.LabelFilterEqual, $1, mustNewFloat($3))}
;

dropLabel:
dropLabel:
IDENTIFIER { $$ = log.NewDropLabel(nil, $1) }
| matcher { $$ = log.NewDropLabel($1, "") }

Expand Down
Loading

0 comments on commit 38b298c

Please sign in to comment.