From 7b6a1ab33934976a8b4ef4bc4809ef8158f4ae08 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 19 Dec 2024 16:47:51 +0100 Subject: [PATCH] Add tenant rates --- pkg/loki/loki.go | 36 +++------------------- pkg/loki/modules.go | 30 +++++++++++++++++- pkg/usage/http.go | 75 ++++++++++++++++++++++++++++++++++++++------- 3 files changed, 97 insertions(+), 44 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 784a70620841a..13d21bae6418b 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -15,7 +15,6 @@ import ( "github.com/fatih/color" "github.com/felixge/fgprof" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/grpcutil" @@ -704,7 +703,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule) mm.RegisterModule(BlockBuilder, t.initBlockBuilder) mm.RegisterModule(BlockScheduler, t.initBlockScheduler) - mm.RegisterModule(Usage, t.initUsage) + mm.RegisterModule(IngestUsage, t.initUsage) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -744,13 +743,13 @@ func (t *Loki) setupModuleManager() error { MemberlistKV: {Server}, BlockBuilder: {PartitionRing, Store, Server}, BlockScheduler: {Server}, - Usage: {Server}, + IngestUsage: {Server}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor, PatternIngester}, - Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway, Usage}, + Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway, IngestUsage}, - All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, Usage}, + All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, IngestUsage}, } if t.Cfg.Querier.PerRequestLimitsEnabled { @@ -866,30 +865,3 @@ func (t *Loki) recursiveIsModuleActive(target, m string) bool { } return false } - -func (t *Loki) initUsage() (services.Service, error) { - if !t.Cfg.Distributor.KafkaEnabled { - return nil, nil - } - - logger := log.With(util_log.Logger, "component", "usage") - service, err := usage.NewService( - t.Cfg.KafkaConfig, - "usage-consumer", - logger, - prometheus.DefaultRegisterer, - ) - if err != nil { - return nil, err - } - - t.usageService = service - - // Register HTTP endpoint - t.Server.HTTP.Path("/usage").Methods("GET").Handler(service) - if t.Cfg.InternalServer.Enable { - t.InternalServer.HTTP.Path("/usage").Methods("GET").Handler(service) - } - - return service, nil -} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 59617ec654602..6a34340e4db8b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -81,6 +81,7 @@ import ( boltdbcompactor "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/boltdb/compactor" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/storage/types" + "github.com/grafana/loki/v3/pkg/usage" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/httpreq" "github.com/grafana/loki/v3/pkg/util/limiter" @@ -142,7 +143,7 @@ const ( PartitionRing string = "partition-ring" BlockBuilder string = "block-builder" BlockScheduler string = "block-scheduler" - Usage string = "usage" + IngestUsage string = "ingest-usage" ) const ( @@ -1897,6 +1898,33 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { return s, nil } +func (t *Loki) initUsage() (services.Service, error) { + if !t.Cfg.Distributor.KafkaEnabled { + return nil, nil + } + + logger := log.With(util_log.Logger, "component", "usage") + service, err := usage.NewService( + t.Cfg.KafkaConfig, + "usage-consumer", + logger, + prometheus.DefaultRegisterer, + ) + if err != nil { + return nil, err + } + + t.usageService = service + + // Register HTTP endpoint + t.Server.HTTP.Path("/usage").Methods("GET").Handler(service) + if t.Cfg.InternalServer.Enable { + t.InternalServer.HTTP.Path("/usage").Methods("GET").Handler(service) + } + + return service, nil +} + func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) { if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil diff --git a/pkg/usage/http.go b/pkg/usage/http.go index cf5c62b1768be..3a1d3eedc59a4 100644 --- a/pkg/usage/http.go +++ b/pkg/usage/http.go @@ -16,11 +16,17 @@ type streamRate struct { BytesPS string } +type tenantRate struct { + TenantID string + BytesPS string +} + type partitionView struct { - Partition int32 - Offset int64 - LastUpdate string - TopStreams []streamRate + Partition int32 + Offset int64 + LastUpdate string + LastUpdateTime string + TopStreams []streamRate } var statsTemplate = template.Must(template.New("stats").Parse(` @@ -37,15 +43,37 @@ var statsTemplate = template.Must(template.New("stats").Parse(` th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; } th { background-color: #f5f5f5; } tr:hover { background-color: #f9f9f9; } + .tenant-totals { margin-bottom: 30px; }

Usage Statistics

+ +
+

Tenant Total Rates

+ + + + + + + + + {{range .TenantTotals}} + + + + + {{end}} + +
Tenant IDTotal Rate
{{.TenantID}}{{.BytesPS}}/s
+
+ {{range .Partitions}}

Partition {{.Partition}}

- Offset: {{.Offset}} | Last Update: {{.LastUpdate}} + Offset: {{.Offset}} | Last Update: {{.LastUpdate}} ({{.LastUpdateTime}})
@@ -79,21 +107,27 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { window := time.Minute since := time.Now().Add(-window) + // Track tenant total rates + tenantTotals := make(map[string]float64) + var partitions []partitionView for partition, pStats := range s.stats.stats { view := partitionView{ - Partition: partition, - Offset: pStats.offset, - LastUpdate: humanize.Time(pStats.timestamp), + Partition: partition, + Offset: pStats.offset, + LastUpdate: humanize.Time(pStats.timestamp), + LastUpdateTime: pStats.timestamp.Format("2006-01-02 15:04:05 MST"), } // Collect all stream rates for this partition var rates []streamRate for tenantID, tStats := range pStats.tenants { + tenantTotal := float64(0) for hash, sStats := range tStats.streams { bytes := sStats.totalBytesSince(since) if bytes > 0 { bytesPerSec := float64(bytes) / window.Seconds() + tenantTotal += bytesPerSec rates = append(rates, streamRate{ TenantID: tenantID, Hash: hash, @@ -101,9 +135,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) } } + tenantTotals[tenantID] += tenantTotal } - // Sort by bytes per second in descending order (need to parse the humanized strings) + // Sort by bytes per second in descending order sort.Slice(rates, func(i, j int) bool { bytesI, _ := humanize.ParseBytes(rates[i].BytesPS) bytesJ, _ := humanize.ParseBytes(rates[j].BytesPS) @@ -118,6 +153,22 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { partitions = append(partitions, view) } + // Convert tenant totals to sorted slice + var tenantRates []tenantRate + for tenantID, bytesPerSec := range tenantTotals { + tenantRates = append(tenantRates, tenantRate{ + TenantID: tenantID, + BytesPS: humanize.Bytes(uint64(bytesPerSec)), + }) + } + + // Sort tenant rates by bytes per second in descending order + sort.Slice(tenantRates, func(i, j int) bool { + bytesI, _ := humanize.ParseBytes(tenantRates[i].BytesPS) + bytesJ, _ := humanize.ParseBytes(tenantRates[j].BytesPS) + return bytesI > bytesJ + }) + // Sort partitions by partition number sort.Slice(partitions, func(i, j int) bool { return partitions[i].Partition < partitions[j].Partition @@ -125,9 +176,11 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Render template err := statsTemplate.Execute(w, struct { - Partitions []partitionView + Partitions []partitionView + TenantTotals []tenantRate }{ - Partitions: partitions, + Partitions: partitions, + TenantTotals: tenantRates, }) if err != nil { level.Error(s.logger).Log("msg", "error executing template", "err", err)