From a35c4b05565efe401da3e57279fed361597dd958 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 10 Feb 2022 16:56:31 +0100 Subject: [PATCH 1/2] Add subqueries count Signed-off-by: Cyril Tovena --- pkg/logqlmodel/stats/context.go | 1 + pkg/logqlmodel/stats/context_test.go | 5 + pkg/logqlmodel/stats/stats.pb.go | 136 ++++++++++++++++++--------- pkg/logqlmodel/stats/stats.proto | 2 + 4 files changed, 97 insertions(+), 47 deletions(-) diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index c666eadb1d751..d82ca90ff048b 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -169,6 +169,7 @@ func (i *Ingester) Merge(m Ingester) { } func (r *Result) Merge(m Result) { + r.Summary.Subqueries++ r.Querier.Merge(m.Querier) r.Ingester.Merge(m.Ingester) r.ComputeSummary(ConvertSecondsToNanoseconds(r.Summary.ExecTime+m.Summary.ExecTime), diff --git a/pkg/logqlmodel/stats/context_test.go b/pkg/logqlmodel/stats/context_test.go index 5f4e3fc6cbaff..1f4af90e92517 100644 --- a/pkg/logqlmodel/stats/context_test.go +++ b/pkg/logqlmodel/stats/context_test.go @@ -67,6 +67,7 @@ func TestResult(t *testing.T) { LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), TotalLinesProcessed: int64(100), + Subqueries: 1, }, } require.Equal(t, expected, res) @@ -113,6 +114,7 @@ func TestSnapshot_JoinResults(t *testing.T) { LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), TotalLinesProcessed: int64(100), + Subqueries: 2, }, } @@ -189,6 +191,7 @@ func TestResult_Merge(t *testing.T) { } res.Merge(toMerge) + toMerge.Summary.Subqueries = 2 require.Equal(t, toMerge, res) // merge again @@ -232,6 +235,7 @@ func TestResult_Merge(t *testing.T) { LinesProcessedPerSecond: int64(50), TotalBytesProcessed: 2 * int64(84), TotalLinesProcessed: 2 * int64(100), + Subqueries: 3, }, }, res) } @@ -243,6 +247,7 @@ func TestReset(t *testing.T) { require.NotEmpty(t, res) statsCtx.Reset() res = statsCtx.Result(0, 0) + res.Summary.Subqueries = 0 require.Empty(t, res) } diff --git a/pkg/logqlmodel/stats/stats.pb.go b/pkg/logqlmodel/stats/stats.pb.go index d99bda0dcb20e..7a8f0fdf44b04 100644 --- a/pkg/logqlmodel/stats/stats.pb.go +++ b/pkg/logqlmodel/stats/stats.pb.go @@ -104,6 +104,8 @@ type Summary struct { // In addition to internal calculations this is also returned by the HTTP API. // Grafana expects time values to be returned in seconds as float. QueueTime float64 `protobuf:"fixed64,6,opt,name=queueTime,proto3" json:"queueTime"` + // Total of subqueries created to fulfill this query. + Subqueries int64 `protobuf:"varint,7,opt,name=subqueries,proto3" json:"subqueries"` } func (m *Summary) Reset() { *m = Summary{} } @@ -180,6 +182,13 @@ func (m *Summary) GetQueueTime() float64 { return 0 } +func (m *Summary) GetSubqueries() int64 { + if m != nil { + return m.Subqueries + } + return 0 +} + type Querier struct { Store Store `protobuf:"bytes,1,opt,name=store,proto3" json:"store"` } @@ -473,52 +482,53 @@ func init() { func init() { proto.RegisterFile("pkg/logqlmodel/stats/stats.proto", fileDescriptor_6cdfe5d2aea33ebb) } var fileDescriptor_6cdfe5d2aea33ebb = []byte{ - // 714 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x3d, 0x6f, 0xd3, 0x5c, - 0x14, 0x8e, 0x93, 0xd7, 0x49, 0x7a, 0xdf, 0x7e, 0x71, 0xab, 0xd2, 0x00, 0x92, 0x5d, 0x65, 0xaa, - 0x04, 0x34, 0xe2, 0x63, 0x01, 0xd1, 0xc5, 0xad, 0x90, 0x2a, 0x81, 0x28, 0xa7, 0xb0, 0xb0, 0x39, - 0xce, 0x6d, 0x62, 0xd5, 0xf1, 0x4d, 0xfd, 0x21, 0xe8, 0xc6, 0xc6, 0x08, 0x3f, 0x82, 0x81, 0x85, - 0x9f, 0xc0, 0xde, 0xb1, 0x63, 0x27, 0x8b, 0xba, 0x0b, 0xf2, 0xd4, 0x9f, 0x80, 0x7c, 0xae, 0x63, - 0xd7, 0x37, 0x8e, 0xc4, 0x92, 0x9c, 0xf3, 0x3c, 0xe7, 0x39, 0xe7, 0xfa, 0x3c, 0xd7, 0x32, 0xd9, - 0x9c, 0x1c, 0x0f, 0x7b, 0x0e, 0x1f, 0x9e, 0x38, 0x63, 0x3e, 0x60, 0x4e, 0xcf, 0x0f, 0xcc, 0xc0, - 0x17, 0xbf, 0xdb, 0x13, 0x8f, 0x07, 0x9c, 0xaa, 0x98, 0xdc, 0x7d, 0x38, 0xb4, 0x83, 0x51, 0xd8, - 0xdf, 0xb6, 0xf8, 0xb8, 0x37, 0xe4, 0x43, 0xde, 0x43, 0xb6, 0x1f, 0x1e, 0x61, 0x86, 0x09, 0x46, - 0x42, 0xd5, 0xfd, 0xa5, 0x90, 0x26, 0x30, 0x3f, 0x74, 0x02, 0xfa, 0x8c, 0xb4, 0xfc, 0x70, 0x3c, - 0x36, 0xbd, 0xd3, 0x8e, 0xb2, 0xa9, 0x6c, 0xfd, 0xff, 0x78, 0x79, 0x5b, 0xf4, 0x3f, 0x14, 0xa8, - 0xb1, 0x72, 0x16, 0xe9, 0xb5, 0x24, 0xd2, 0xa7, 0x65, 0x30, 0x0d, 0x52, 0xe9, 0x49, 0xc8, 0x3c, - 0x9b, 0x79, 0x9d, 0x7a, 0x49, 0xfa, 0x56, 0xa0, 0x85, 0x34, 0x2b, 0x83, 0x69, 0x40, 0x77, 0x48, - 0xdb, 0x76, 0x87, 0xcc, 0x0f, 0x98, 0xd7, 0x69, 0xa0, 0x76, 0x25, 0xd3, 0xee, 0x67, 0xb0, 0xb1, - 0x9a, 0x89, 0xf3, 0x42, 0xc8, 0xa3, 0xee, 0xf7, 0x06, 0x69, 0x65, 0xe7, 0xa3, 0xef, 0xc9, 0x46, - 0xff, 0x34, 0x60, 0xfe, 0x81, 0xc7, 0x2d, 0xe6, 0xfb, 0x6c, 0x70, 0xc0, 0xbc, 0x43, 0x66, 0x71, - 0x77, 0x80, 0x0f, 0xd4, 0x30, 0xee, 0x25, 0x91, 0x3e, 0xaf, 0x04, 0xe6, 0x11, 0x69, 0x5b, 0xc7, - 0x76, 0x2b, 0xdb, 0xd6, 0x8b, 0xb6, 0x73, 0x4a, 0x60, 0x1e, 0x41, 0xf7, 0xc9, 0x5a, 0xc0, 0x03, - 0xd3, 0x31, 0x4a, 0x63, 0x71, 0x07, 0x0d, 0x63, 0x23, 0x89, 0xf4, 0x2a, 0x1a, 0xaa, 0xc0, 0xbc, - 0xd5, 0xab, 0xd2, 0xa8, 0xce, 0x7f, 0x52, 0xab, 0x32, 0x0d, 0x55, 0x20, 0xdd, 0x22, 0x6d, 0xf6, - 0x89, 0x59, 0xef, 0xec, 0x31, 0xeb, 0xa8, 0x9b, 0xca, 0x96, 0x62, 0x2c, 0xa6, 0x9b, 0x9f, 0x62, - 0x90, 0x47, 0xf4, 0x3e, 0x59, 0x38, 0x09, 0x59, 0xc8, 0xb0, 0xb4, 0x89, 0xa5, 0x4b, 0x49, 0xa4, - 0x17, 0x20, 0x14, 0x61, 0xf7, 0x05, 0x69, 0x65, 0x57, 0x81, 0x3e, 0x22, 0xaa, 0x1f, 0x70, 0x8f, - 0x65, 0x97, 0x6c, 0x71, 0x7a, 0xc9, 0x52, 0xcc, 0x58, 0xca, 0xac, 0x16, 0x25, 0x20, 0xfe, 0xba, - 0x3f, 0xeb, 0xa4, 0x3d, 0xbd, 0x0d, 0xf4, 0x29, 0x59, 0xc4, 0x83, 0x03, 0x33, 0xad, 0x11, 0x13, - 0xd6, 0xaa, 0xc6, 0x6a, 0x12, 0xe9, 0x25, 0x1c, 0x4a, 0x19, 0x7d, 0x49, 0x28, 0xe6, 0xbb, 0xa3, - 0xd0, 0x3d, 0xf6, 0x5f, 0x9b, 0x01, 0x6a, 0x85, 0x7f, 0xb7, 0x93, 0x48, 0xaf, 0x60, 0xa1, 0x02, - 0xcb, 0xa7, 0x1b, 0x98, 0xfb, 0x99, 0x5d, 0xc5, 0xf4, 0x0c, 0x87, 0x52, 0x46, 0x9f, 0x93, 0xe5, - 0x62, 0xd9, 0x87, 0xcc, 0x0d, 0x32, 0x6f, 0x68, 0x12, 0xe9, 0x12, 0x03, 0x52, 0x5e, 0xec, 0x4b, - 0xfd, 0xe7, 0x7d, 0x7d, 0xad, 0x13, 0x15, 0xf9, 0x7c, 0xb0, 0x78, 0x08, 0x60, 0x47, 0xd9, 0x9b, - 0x50, 0x0c, 0xce, 0x19, 0x90, 0x72, 0xfa, 0x86, 0xac, 0xdf, 0x40, 0xf6, 0xf8, 0x47, 0xd7, 0xe1, - 0xe6, 0x20, 0xdf, 0xda, 0x9d, 0x24, 0xd2, 0xab, 0x0b, 0xa0, 0x1a, 0x4e, 0x3d, 0xb0, 0x4a, 0x18, - 0x5e, 0x9d, 0x46, 0xe1, 0xc1, 0x2c, 0x0b, 0x15, 0x58, 0xba, 0x11, 0x44, 0x71, 0x89, 0xc5, 0x46, - 0x70, 0x5e, 0xb1, 0x11, 0x2c, 0x01, 0xf1, 0xd7, 0xfd, 0xd2, 0x20, 0x2a, 0xf2, 0xe9, 0x46, 0x46, - 0xcc, 0x1c, 0x88, 0xe2, 0xf4, 0x35, 0xba, 0x69, 0x45, 0x99, 0x01, 0x29, 0x2f, 0x69, 0xd1, 0x20, - 0xf4, 0x44, 0xd6, 0x22, 0x03, 0x52, 0x4e, 0x77, 0xc9, 0xad, 0x01, 0xb3, 0xf8, 0x78, 0xe2, 0xe1, - 0x8b, 0x26, 0x46, 0x37, 0x51, 0xbe, 0x9e, 0x44, 0xfa, 0x2c, 0x09, 0xb3, 0x90, 0xdc, 0x44, 0x9c, - 0xa1, 0x55, 0xdd, 0x44, 0x1c, 0x63, 0x16, 0xa2, 0x3b, 0x64, 0x45, 0x3e, 0x47, 0x1b, 0x5b, 0xac, - 0x25, 0x91, 0x2e, 0x53, 0x20, 0x03, 0xa9, 0x1c, 0xed, 0xdd, 0x0b, 0x27, 0x8e, 0x6d, 0x99, 0xa9, - 0x7c, 0xa1, 0x90, 0x4b, 0x14, 0xc8, 0x80, 0xd1, 0x3f, 0xbf, 0xd4, 0x6a, 0x17, 0x97, 0x5a, 0xed, - 0xfa, 0x52, 0x53, 0x3e, 0xc7, 0x9a, 0xf2, 0x23, 0xd6, 0x94, 0xb3, 0x58, 0x53, 0xce, 0x63, 0x4d, - 0xf9, 0x1d, 0x6b, 0xca, 0x9f, 0x58, 0xab, 0x5d, 0xc7, 0x9a, 0xf2, 0xed, 0x4a, 0xab, 0x9d, 0x5f, - 0x69, 0xb5, 0x8b, 0x2b, 0xad, 0xf6, 0xe1, 0xc1, 0xcd, 0xaf, 0x9a, 0x67, 0x1e, 0x99, 0xae, 0xd9, - 0x73, 0xf8, 0xb1, 0xdd, 0xab, 0xfa, 0x2c, 0xf6, 0x9b, 0xf8, 0x6d, 0x7b, 0xf2, 0x37, 0x00, 0x00, - 0xff, 0xff, 0xf4, 0x8b, 0xe3, 0xd1, 0x35, 0x07, 0x00, 0x00, + // 734 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x4d, 0x6f, 0xd3, 0x4c, + 0x10, 0xb6, 0x93, 0xd7, 0x49, 0xba, 0x6f, 0xbf, 0xd8, 0xaa, 0x34, 0x80, 0x64, 0x57, 0x39, 0x55, + 0x02, 0x12, 0xf1, 0x71, 0x01, 0xd1, 0x8b, 0x5b, 0x21, 0x55, 0x02, 0x51, 0xa6, 0x70, 0xe1, 0xe6, + 0x38, 0xdb, 0xc4, 0xaa, 0xe3, 0x4d, 0xfd, 0x21, 0xe8, 0x8d, 0x1b, 0x47, 0xf8, 0x19, 0x5c, 0xf8, + 0x09, 0xdc, 0x7b, 0xec, 0x81, 0x43, 0x4f, 0x16, 0x75, 0x2f, 0xc8, 0xa7, 0xfe, 0x04, 0xe4, 0x59, + 0xc7, 0x8e, 0x1d, 0x47, 0xe2, 0x92, 0xcc, 0x3c, 0xcf, 0x3c, 0x33, 0xeb, 0x99, 0x59, 0x2d, 0xd9, + 0x9e, 0x9c, 0x0c, 0x7b, 0x36, 0x1f, 0x9e, 0xda, 0x63, 0x3e, 0x60, 0x76, 0xcf, 0xf3, 0x0d, 0xdf, + 0x13, 0xbf, 0xdd, 0x89, 0xcb, 0x7d, 0x4e, 0x15, 0x74, 0xee, 0x3e, 0x1c, 0x5a, 0xfe, 0x28, 0xe8, + 0x77, 0x4d, 0x3e, 0xee, 0x0d, 0xf9, 0x90, 0xf7, 0x90, 0xed, 0x07, 0xc7, 0xe8, 0xa1, 0x83, 0x96, + 0x50, 0x75, 0x7e, 0xca, 0xa4, 0x01, 0xcc, 0x0b, 0x6c, 0x9f, 0x3e, 0x23, 0x4d, 0x2f, 0x18, 0x8f, + 0x0d, 0xf7, 0xac, 0x2d, 0x6f, 0xcb, 0x3b, 0xff, 0x3f, 0x5e, 0xed, 0x8a, 0xfc, 0x47, 0x02, 0xd5, + 0xd7, 0xce, 0x43, 0x4d, 0x8a, 0x43, 0x6d, 0x1a, 0x06, 0x53, 0x23, 0x91, 0x9e, 0x06, 0xcc, 0xb5, + 0x98, 0xdb, 0xae, 0x15, 0xa4, 0x6f, 0x05, 0x9a, 0x4b, 0xd3, 0x30, 0x98, 0x1a, 0x74, 0x97, 0xb4, + 0x2c, 0x67, 0xc8, 0x3c, 0x9f, 0xb9, 0xed, 0x3a, 0x6a, 0xd7, 0x52, 0xed, 0x41, 0x0a, 0xeb, 0xeb, + 0xa9, 0x38, 0x0b, 0x84, 0xcc, 0xea, 0xfc, 0xaa, 0x93, 0x66, 0x7a, 0x3e, 0xfa, 0x9e, 0x6c, 0xf5, + 0xcf, 0x7c, 0xe6, 0x1d, 0xba, 0xdc, 0x64, 0x9e, 0xc7, 0x06, 0x87, 0xcc, 0x3d, 0x62, 0x26, 0x77, + 0x06, 0xf8, 0x41, 0x75, 0xfd, 0x5e, 0x1c, 0x6a, 0x8b, 0x42, 0x60, 0x11, 0x91, 0xa4, 0xb5, 0x2d, + 0xa7, 0x32, 0x6d, 0x2d, 0x4f, 0xbb, 0x20, 0x04, 0x16, 0x11, 0xf4, 0x80, 0x6c, 0xf8, 0xdc, 0x37, + 0x6c, 0xbd, 0x50, 0x16, 0x7b, 0x50, 0xd7, 0xb7, 0xe2, 0x50, 0xab, 0xa2, 0xa1, 0x0a, 0xcc, 0x52, + 0xbd, 0x2a, 0x94, 0x6a, 0xff, 0x57, 0x4a, 0x55, 0xa4, 0xa1, 0x0a, 0xa4, 0x3b, 0xa4, 0xc5, 0x3e, + 0x31, 0xf3, 0x9d, 0x35, 0x66, 0x6d, 0x65, 0x5b, 0xde, 0x91, 0xf5, 0xe5, 0xa4, 0xf3, 0x53, 0x0c, + 0x32, 0x8b, 0xde, 0x27, 0x4b, 0xa7, 0x01, 0x0b, 0x18, 0x86, 0x36, 0x30, 0x74, 0x25, 0x0e, 0xb5, + 0x1c, 0x84, 0xdc, 0xa4, 0x5d, 0x42, 0xbc, 0xa0, 0x2f, 0x66, 0xee, 0xb5, 0x9b, 0x78, 0xb0, 0xd5, + 0x38, 0xd4, 0x66, 0x50, 0x98, 0xb1, 0x3b, 0x2f, 0x48, 0x33, 0x5d, 0x1d, 0xfa, 0x88, 0x28, 0x9e, + 0xcf, 0x5d, 0x96, 0x2e, 0xe5, 0xf2, 0x74, 0x29, 0x13, 0x4c, 0x5f, 0x49, 0x57, 0x43, 0x84, 0x80, + 0xf8, 0xeb, 0xfc, 0xa8, 0x91, 0xd6, 0x74, 0x7b, 0xe8, 0x53, 0xb2, 0x8c, 0x1f, 0x0a, 0xcc, 0x30, + 0x47, 0x4c, 0xac, 0x82, 0xa2, 0xaf, 0xc7, 0xa1, 0x56, 0xc0, 0xa1, 0xe0, 0xd1, 0x97, 0x84, 0xa2, + 0xbf, 0x37, 0x0a, 0x9c, 0x13, 0xef, 0xb5, 0xe1, 0xa3, 0x56, 0xcc, 0xfb, 0x76, 0x1c, 0x6a, 0x15, + 0x2c, 0x54, 0x60, 0x59, 0x75, 0x1d, 0x7d, 0x2f, 0x1d, 0x6f, 0x5e, 0x3d, 0xc5, 0xa1, 0xe0, 0xd1, + 0xe7, 0x64, 0x35, 0x1f, 0xce, 0x11, 0x73, 0xfc, 0x74, 0x96, 0x34, 0x0e, 0xb5, 0x12, 0x03, 0x25, + 0x3f, 0xef, 0x97, 0xf2, 0xcf, 0xfd, 0xfa, 0x5a, 0x23, 0x0a, 0xf2, 0x59, 0x61, 0xf1, 0x11, 0xc0, + 0x8e, 0xd3, 0x9b, 0x93, 0x17, 0xce, 0x18, 0x28, 0xf9, 0xf4, 0x0d, 0xd9, 0x9c, 0x41, 0xf6, 0xf9, + 0x47, 0xc7, 0xe6, 0xc6, 0x20, 0xeb, 0xda, 0x9d, 0x38, 0xd4, 0xaa, 0x03, 0xa0, 0x1a, 0x4e, 0x66, + 0x60, 0x16, 0x30, 0x5c, 0xb5, 0x7a, 0x3e, 0x83, 0x79, 0x16, 0x2a, 0xb0, 0xa4, 0x23, 0x88, 0x62, + 0x13, 0xf3, 0x8e, 0x60, 0xbd, 0xbc, 0x23, 0x18, 0x02, 0xe2, 0xaf, 0xf3, 0xa5, 0x4e, 0x14, 0xe4, + 0x93, 0x8e, 0x8c, 0x98, 0x31, 0x10, 0xc1, 0xc9, 0xb5, 0x9b, 0x1d, 0x45, 0x91, 0x81, 0x92, 0x5f, + 0xd0, 0xe2, 0x80, 0x70, 0x26, 0x65, 0x2d, 0x32, 0x50, 0xf2, 0xe9, 0x1e, 0xb9, 0x35, 0x60, 0x26, + 0x1f, 0x4f, 0x5c, 0xbc, 0x98, 0xa2, 0x74, 0x03, 0xe5, 0x9b, 0x71, 0xa8, 0xcd, 0x93, 0x30, 0x0f, + 0x95, 0x93, 0x88, 0x33, 0x34, 0xab, 0x93, 0x88, 0x63, 0xcc, 0x43, 0x74, 0x97, 0xac, 0x95, 0xcf, + 0xd1, 0xc2, 0x14, 0x1b, 0x71, 0xa8, 0x95, 0x29, 0x28, 0x03, 0x89, 0x1c, 0xc7, 0xbb, 0x1f, 0x4c, + 0x6c, 0xcb, 0x34, 0x12, 0xf9, 0x52, 0x2e, 0x2f, 0x51, 0x50, 0x06, 0xf4, 0xfe, 0xc5, 0x95, 0x2a, + 0x5d, 0x5e, 0xa9, 0xd2, 0xcd, 0x95, 0x2a, 0x7f, 0x8e, 0x54, 0xf9, 0x7b, 0xa4, 0xca, 0xe7, 0x91, + 0x2a, 0x5f, 0x44, 0xaa, 0xfc, 0x3b, 0x52, 0xe5, 0x3f, 0x91, 0x2a, 0xdd, 0x44, 0xaa, 0xfc, 0xed, + 0x5a, 0x95, 0x2e, 0xae, 0x55, 0xe9, 0xf2, 0x5a, 0x95, 0x3e, 0x3c, 0x98, 0x7d, 0x05, 0x5d, 0xe3, + 0xd8, 0x70, 0x8c, 0x9e, 0xcd, 0x4f, 0xac, 0x5e, 0xd5, 0x33, 0xda, 0x6f, 0xe0, 0x5b, 0xf8, 0xe4, + 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4e, 0x99, 0x9a, 0xa3, 0x65, 0x07, 0x00, 0x00, } func (this *Result) Equal(that interface{}) bool { @@ -588,6 +598,9 @@ func (this *Summary) Equal(that interface{}) bool { if this.QueueTime != that1.QueueTime { return false } + if this.Subqueries != that1.Subqueries { + return false + } return true } func (this *Querier) Equal(that interface{}) bool { @@ -738,7 +751,7 @@ func (this *Summary) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 10) + s := make([]string, 0, 11) s = append(s, "&stats.Summary{") s = append(s, "BytesProcessedPerSecond: "+fmt.Sprintf("%#v", this.BytesProcessedPerSecond)+",\n") s = append(s, "LinesProcessedPerSecond: "+fmt.Sprintf("%#v", this.LinesProcessedPerSecond)+",\n") @@ -746,6 +759,7 @@ func (this *Summary) GoString() string { s = append(s, "TotalLinesProcessed: "+fmt.Sprintf("%#v", this.TotalLinesProcessed)+",\n") s = append(s, "ExecTime: "+fmt.Sprintf("%#v", this.ExecTime)+",\n") s = append(s, "QueueTime: "+fmt.Sprintf("%#v", this.QueueTime)+",\n") + s = append(s, "Subqueries: "+fmt.Sprintf("%#v", this.Subqueries)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -882,6 +896,11 @@ func (m *Summary) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Subqueries != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.Subqueries)) + i-- + dAtA[i] = 0x38 + } if m.QueueTime != 0 { i -= 8 encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.QueueTime)))) @@ -1154,6 +1173,9 @@ func (m *Summary) Size() (n int) { if m.QueueTime != 0 { n += 9 } + if m.Subqueries != 0 { + n += 1 + sovStats(uint64(m.Subqueries)) + } return n } @@ -1267,6 +1289,7 @@ func (this *Summary) String() string { `TotalLinesProcessed:` + fmt.Sprintf("%v", this.TotalLinesProcessed) + `,`, `ExecTime:` + fmt.Sprintf("%v", this.ExecTime) + `,`, `QueueTime:` + fmt.Sprintf("%v", this.QueueTime) + `,`, + `Subqueries:` + fmt.Sprintf("%v", this.Subqueries) + `,`, `}`, }, "") return s @@ -1610,6 +1633,25 @@ func (m *Summary) Unmarshal(dAtA []byte) error { v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) iNdEx += 8 m.QueueTime = float64(math.Float64frombits(v)) + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Subqueries", wireType) + } + m.Subqueries = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Subqueries |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/logqlmodel/stats/stats.proto b/pkg/logqlmodel/stats/stats.proto index b829662def234..fdf3f14d2d0fe 100644 --- a/pkg/logqlmodel/stats/stats.proto +++ b/pkg/logqlmodel/stats/stats.proto @@ -34,6 +34,8 @@ message Summary { // In addition to internal calculations this is also returned by the HTTP API. // Grafana expects time values to be returned in seconds as float. double queueTime = 6 [(gogoproto.jsontag) = "queueTime"]; + // Total of subqueries created to fulfill this query. + int64 subqueries = 7 [(gogoproto.jsontag) = "subqueries"]; } message Querier { From 9ed264b125508f2c86272c7ced16e14476298a33 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 15 Feb 2022 14:17:17 +0100 Subject: [PATCH 2/2] Fixes tests Signed-off-by: Cyril Tovena --- pkg/logql/metrics.go | 1 + pkg/logql/metrics_test.go | 2 +- pkg/logqlmodel/stats/context.go | 12 ++--- pkg/querier/queryrange/codec_test.go | 41 +++++++++-------- pkg/querier/queryrange/prometheus_test.go | 1 + .../queryrange/split_by_interval_test.go | 45 +++++++++++-------- pkg/util/marshal/legacy/marshal_test.go | 1 + pkg/util/marshal/marshal_test.go | 3 ++ 8 files changed, 62 insertions(+), 44 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 56b82b5091a89..f135252e0bb98 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -106,6 +106,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats logql_sta "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), "queue_time", logql_stats.ConvertSecondsToNanoseconds(stats.Summary.QueueTime), + "subqueries", stats.Summary.Subqueries, }...) logValues = append(logValues, tagsToKeyValues(queryTags)...) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index ecf7d4a7437d7..9be0522494579 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -83,7 +83,7 @@ func TestLogSlowQuery(t *testing.T) { }, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}) require.Equal(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns source=logvolhist feature=beta\n", + "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns subqueries=0 source=logvolhist feature=beta\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index d82ca90ff048b..6a842153622e4 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -132,12 +132,10 @@ func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration) r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines r.Summary.ExecTime = execTime.Seconds() if execTime != 0 { - r.Summary.BytesProcessedPerSecond = - int64(float64(r.Summary.TotalBytesProcessed) / - execTime.Seconds()) - r.Summary.LinesProcessedPerSecond = - int64(float64(r.Summary.TotalLinesProcessed) / - execTime.Seconds()) + r.Summary.BytesProcessedPerSecond = int64(float64(r.Summary.TotalBytesProcessed) / + execTime.Seconds()) + r.Summary.LinesProcessedPerSecond = int64(float64(r.Summary.TotalLinesProcessed) / + execTime.Seconds()) } if queueTime != 0 { r.Summary.QueueTime = queueTime.Seconds() @@ -168,6 +166,8 @@ func (i *Ingester) Merge(m Ingester) { i.TotalReached += m.TotalReached } +// Merge merges two results of statistics. +// This will increase the total number of Subqueries. func (r *Result) Merge(m Result) { r.Summary.Subqueries++ r.Querier.Merge(m.Querier) diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index aa5075be451bc..d55bcaaf3e11c 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -414,6 +414,7 @@ func Test_codec_MergeResponse(t *testing.T) { }, }, &LokiPromResponse{ + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 1}}, Response: &queryrangebase.PrometheusResponse{ Status: loghttp.QueryStatusSuccess, Data: queryrangebase.PrometheusData{ @@ -480,10 +481,11 @@ func Test_codec_MergeResponse(t *testing.T) { }, }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.BACKWARD, - Limit: 100, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.BACKWARD, + Limit: 100, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 2}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -567,10 +569,11 @@ func Test_codec_MergeResponse(t *testing.T) { }, }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.BACKWARD, - Limit: 6, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.BACKWARD, + Limit: 6, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 2}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -651,17 +654,17 @@ func Test_codec_MergeResponse(t *testing.T) { }, }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.FORWARD, - Limit: 100, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.FORWARD, + Limit: 100, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 2}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ { Labels: `{foo="bar", level="debug"}`, Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 5), Line: "5"}, {Timestamp: time.Unix(0, 6), Line: "6"}, {Timestamp: time.Unix(0, 15), Line: "15"}, @@ -739,17 +742,17 @@ func Test_codec_MergeResponse(t *testing.T) { }, }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.FORWARD, - Limit: 5, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.FORWARD, + Limit: 5, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 2}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ { Labels: `{foo="bar", level="debug"}`, Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 5), Line: "5"}, {Timestamp: time.Unix(0, 6), Line: "6"}, }, @@ -905,6 +908,7 @@ var ( "execTime": 22, "linesProcessedPerSecond": 23, "queueTime": 21, + "subqueries": 1, "totalBytesProcessed": 24, "totalLinesProcessed": 25 } @@ -1056,6 +1060,7 @@ var ( ExecTime: 22, LinesProcessedPerSecond: 23, TotalBytesProcessed: 24, + Subqueries: 1, TotalLinesProcessed: 25, }, Querier: stats.Querier{ diff --git a/pkg/querier/queryrange/prometheus_test.go b/pkg/querier/queryrange/prometheus_test.go index 261b0420796bd..809353d130dc9 100644 --- a/pkg/querier/queryrange/prometheus_test.go +++ b/pkg/querier/queryrange/prometheus_test.go @@ -52,6 +52,7 @@ var emptyStats = `"stats": { "execTime": 0, "linesProcessedPerSecond": 0, "queueTime": 0, + "subqueries": 0, "totalBytesProcessed":0, "totalLinesProcessed":0 } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 5264f04e630c1..a5da3f74b018a 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/loghttp" @@ -572,7 +573,6 @@ func Test_splitByInterval_Do(t *testing.T) { { Labels: `{foo="bar", level="debug"}`, Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano())}, }, }, @@ -606,10 +606,11 @@ func Test_splitByInterval_Do(t *testing.T) { Path: "/api/prom/query_range", }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.BACKWARD, - Limit: 1000, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.BACKWARD, + Limit: 1000, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 4}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -638,10 +639,11 @@ func Test_splitByInterval_Do(t *testing.T) { Path: "/api/prom/query_range", }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.FORWARD, - Limit: 1000, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.FORWARD, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 4}}, + Limit: 1000, + Version: 1, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -670,10 +672,11 @@ func Test_splitByInterval_Do(t *testing.T) { Path: "/api/prom/query_range", }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.FORWARD, - Limit: 2, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.FORWARD, + Limit: 2, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 2}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -700,10 +703,11 @@ func Test_splitByInterval_Do(t *testing.T) { Path: "/api/prom/query_range", }, &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: logproto.BACKWARD, - Limit: 2, - Version: 1, + Status: loghttp.QueryStatusSuccess, + Direction: logproto.BACKWARD, + Limit: 2, + Version: 1, + Statistics: stats.Result{Summary: stats.Summary{Subqueries: 2}}, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -818,7 +822,6 @@ func Test_ExitEarly(t *testing.T) { { Labels: `{foo="bar", level="debug"}`, Entries: []logproto.Entry{ - { Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()), @@ -853,6 +856,11 @@ func Test_ExitEarly(t *testing.T) { Direction: logproto.FORWARD, Limit: 2, Version: 1, + Statistics: stats.Result{ + Summary: stats.Summary{ + Subqueries: 2, + }, + }, Data: LokiData{ ResultType: loghttp.ResultTypeStream, Result: []logproto.Stream{ @@ -895,7 +903,6 @@ func Test_DoesntDeadlock(t *testing.T) { { Labels: `{foo="bar", level="debug"}`, Entries: []logproto.Entry{ - { Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()), diff --git a/pkg/util/marshal/legacy/marshal_test.go b/pkg/util/marshal/legacy/marshal_test.go index cb4eac7696d4c..19e9cda4c137a 100644 --- a/pkg/util/marshal/legacy/marshal_test.go +++ b/pkg/util/marshal/legacy/marshal_test.go @@ -83,6 +83,7 @@ var queryTests = []struct { "execTime": 0, "linesProcessedPerSecond": 0, "queueTime": 0, + "subqueries": 0, "totalBytesProcessed":0, "totalLinesProcessed":0 } diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index cc7dedf77b1f6..248f8e32428a8 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -89,6 +89,7 @@ var queryTests = []struct { "execTime": 0, "linesProcessedPerSecond": 0, "queueTime": 0, + "subqueries": 0, "totalBytesProcessed":0, "totalLinesProcessed":0 } @@ -197,6 +198,7 @@ var queryTests = []struct { "execTime": 0, "linesProcessedPerSecond": 0, "queueTime": 0, + "subqueries": 0, "totalBytesProcessed":0, "totalLinesProcessed":0 } @@ -322,6 +324,7 @@ var queryTests = []struct { "execTime": 0, "linesProcessedPerSecond": 0, "queueTime": 0, + "subqueries": 0, "totalBytesProcessed":0, "totalLinesProcessed":0 }