Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the total number of subqueries to query statistics #5397

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats logql_sta
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"queue_time", logql_stats.ConvertSecondsToNanoseconds(stats.Summary.QueueTime),
"subqueries", stats.Summary.Subqueries,
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestLogSlowQuery(t *testing.T) {
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns source=logvolhist feature=beta\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns subqueries=0 source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
13 changes: 7 additions & 6 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,10 @@ func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration)
r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines
r.Summary.ExecTime = execTime.Seconds()
if execTime != 0 {
r.Summary.BytesProcessedPerSecond =
int64(float64(r.Summary.TotalBytesProcessed) /
execTime.Seconds())
r.Summary.LinesProcessedPerSecond =
int64(float64(r.Summary.TotalLinesProcessed) /
execTime.Seconds())
r.Summary.BytesProcessedPerSecond = int64(float64(r.Summary.TotalBytesProcessed) /
execTime.Seconds())
r.Summary.LinesProcessedPerSecond = int64(float64(r.Summary.TotalLinesProcessed) /
execTime.Seconds())
}
if queueTime != 0 {
r.Summary.QueueTime = queueTime.Seconds()
Expand Down Expand Up @@ -168,7 +166,10 @@ func (i *Ingester) Merge(m Ingester) {
i.TotalReached += m.TotalReached
}

// Merge merges two results of statistics.
// This will increase the total number of Subqueries.
func (r *Result) Merge(m Result) {
r.Summary.Subqueries++
r.Querier.Merge(m.Querier)
r.Ingester.Merge(m.Ingester)
r.ComputeSummary(ConvertSecondsToNanoseconds(r.Summary.ExecTime+m.Summary.ExecTime),
Expand Down
5 changes: 5 additions & 0 deletions pkg/logqlmodel/stats/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestResult(t *testing.T) {
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
TotalLinesProcessed: int64(100),
Subqueries: 1,
},
}
require.Equal(t, expected, res)
Expand Down Expand Up @@ -113,6 +114,7 @@ func TestSnapshot_JoinResults(t *testing.T) {
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
TotalLinesProcessed: int64(100),
Subqueries: 2,
},
}

Expand Down Expand Up @@ -189,6 +191,7 @@ func TestResult_Merge(t *testing.T) {
}

res.Merge(toMerge)
toMerge.Summary.Subqueries = 2
require.Equal(t, toMerge, res)

// merge again
Expand Down Expand Up @@ -232,6 +235,7 @@ func TestResult_Merge(t *testing.T) {
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: 2 * int64(84),
TotalLinesProcessed: 2 * int64(100),
Subqueries: 3,
},
}, res)
}
Expand All @@ -243,6 +247,7 @@ func TestReset(t *testing.T) {
require.NotEmpty(t, res)
statsCtx.Reset()
res = statsCtx.Result(0, 0)
res.Summary.Subqueries = 0
require.Empty(t, res)
}

Expand Down
136 changes: 89 additions & 47 deletions pkg/logqlmodel/stats/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/logqlmodel/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ message Summary {
// In addition to internal calculations this is also returned by the HTTP API.
// Grafana expects time values to be returned in seconds as float.
double queueTime = 6 [(gogoproto.jsontag) = "queueTime"];
// Total of subqueries created to fulfill this query.
int64 subqueries = 7 [(gogoproto.jsontag) = "subqueries"];
}

message Querier {
Expand Down
Loading