Skip to content

Commit

Permalink
Add tenant rates
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Dec 19, 2024
1 parent 13161da commit 7b6a1ab
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 44 deletions.
36 changes: 4 additions & 32 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/fatih/color"
"github.com/felixge/fgprof"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcutil"
Expand Down Expand Up @@ -704,7 +703,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule)
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
mm.RegisterModule(BlockScheduler, t.initBlockScheduler)
mm.RegisterModule(Usage, t.initUsage)
mm.RegisterModule(IngestUsage, t.initUsage)

mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
Expand Down Expand Up @@ -744,13 +743,13 @@ func (t *Loki) setupModuleManager() error {
MemberlistKV: {Server},
BlockBuilder: {PartitionRing, Store, Server},
BlockScheduler: {Server},
Usage: {Server},
IngestUsage: {Server},

Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor, PatternIngester},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway, Usage},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway, IngestUsage},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, Usage},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, IngestUsage},
}

if t.Cfg.Querier.PerRequestLimitsEnabled {
Expand Down Expand Up @@ -866,30 +865,3 @@ func (t *Loki) recursiveIsModuleActive(target, m string) bool {
}
return false
}

func (t *Loki) initUsage() (services.Service, error) {
if !t.Cfg.Distributor.KafkaEnabled {
return nil, nil
}

logger := log.With(util_log.Logger, "component", "usage")
service, err := usage.NewService(
t.Cfg.KafkaConfig,
"usage-consumer",
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}

t.usageService = service

// Register HTTP endpoint
t.Server.HTTP.Path("/usage").Methods("GET").Handler(service)
if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/usage").Methods("GET").Handler(service)
}

return service, nil
}
30 changes: 29 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
boltdbcompactor "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/boltdb/compactor"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/usage"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
Expand Down Expand Up @@ -142,7 +143,7 @@ const (
PartitionRing string = "partition-ring"
BlockBuilder string = "block-builder"
BlockScheduler string = "block-scheduler"
Usage string = "usage"
IngestUsage string = "ingest-usage"
)

const (
Expand Down Expand Up @@ -1897,6 +1898,33 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
return s, nil
}

func (t *Loki) initUsage() (services.Service, error) {
if !t.Cfg.Distributor.KafkaEnabled {
return nil, nil
}

logger := log.With(util_log.Logger, "component", "usage")
service, err := usage.NewService(
t.Cfg.KafkaConfig,
"usage-consumer",
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}

t.usageService = service

// Register HTTP endpoint
t.Server.HTTP.Path("/usage").Methods("GET").Handler(service)
if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/usage").Methods("GET").Handler(service)
}

return service, nil
}

func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
Expand Down
75 changes: 64 additions & 11 deletions pkg/usage/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ type streamRate struct {
BytesPS string
}

type tenantRate struct {
TenantID string
BytesPS string
}

type partitionView struct {
Partition int32
Offset int64
LastUpdate string
TopStreams []streamRate
Partition int32
Offset int64
LastUpdate string
LastUpdateTime string
TopStreams []streamRate
}

var statsTemplate = template.Must(template.New("stats").Parse(`
Expand All @@ -37,15 +43,37 @@ var statsTemplate = template.Must(template.New("stats").Parse(`
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
th { background-color: #f5f5f5; }
tr:hover { background-color: #f9f9f9; }
.tenant-totals { margin-bottom: 30px; }
</style>
</head>
<body>
<h1>Usage Statistics</h1>
<div class="tenant-totals">
<h2>Tenant Total Rates</h2>
<table>
<thead>
<tr>
<th>Tenant ID</th>
<th>Total Rate</th>
</tr>
</thead>
<tbody>
{{range .TenantTotals}}
<tr>
<td>{{.TenantID}}</td>
<td>{{.BytesPS}}/s</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{range .Partitions}}
<div class="partition">
<h2>Partition {{.Partition}}</h2>
<div class="meta">
Offset: {{.Offset}} | Last Update: {{.LastUpdate}}
Offset: {{.Offset}} | Last Update: {{.LastUpdate}} ({{.LastUpdateTime}})
</div>
<table>
<thead>
Expand Down Expand Up @@ -79,31 +107,38 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
window := time.Minute
since := time.Now().Add(-window)

// Track tenant total rates
tenantTotals := make(map[string]float64)

var partitions []partitionView
for partition, pStats := range s.stats.stats {
view := partitionView{
Partition: partition,
Offset: pStats.offset,
LastUpdate: humanize.Time(pStats.timestamp),
Partition: partition,
Offset: pStats.offset,
LastUpdate: humanize.Time(pStats.timestamp),
LastUpdateTime: pStats.timestamp.Format("2006-01-02 15:04:05 MST"),
}

// Collect all stream rates for this partition
var rates []streamRate
for tenantID, tStats := range pStats.tenants {
tenantTotal := float64(0)
for hash, sStats := range tStats.streams {
bytes := sStats.totalBytesSince(since)
if bytes > 0 {
bytesPerSec := float64(bytes) / window.Seconds()
tenantTotal += bytesPerSec
rates = append(rates, streamRate{
TenantID: tenantID,
Hash: hash,
BytesPS: humanize.Bytes(uint64(bytesPerSec)),
})
}
}
tenantTotals[tenantID] += tenantTotal
}

// Sort by bytes per second in descending order (need to parse the humanized strings)
// Sort by bytes per second in descending order
sort.Slice(rates, func(i, j int) bool {
bytesI, _ := humanize.ParseBytes(rates[i].BytesPS)
bytesJ, _ := humanize.ParseBytes(rates[j].BytesPS)
Expand All @@ -118,16 +153,34 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
partitions = append(partitions, view)
}

// Convert tenant totals to sorted slice
var tenantRates []tenantRate
for tenantID, bytesPerSec := range tenantTotals {
tenantRates = append(tenantRates, tenantRate{
TenantID: tenantID,
BytesPS: humanize.Bytes(uint64(bytesPerSec)),
})
}

// Sort tenant rates by bytes per second in descending order
sort.Slice(tenantRates, func(i, j int) bool {
bytesI, _ := humanize.ParseBytes(tenantRates[i].BytesPS)
bytesJ, _ := humanize.ParseBytes(tenantRates[j].BytesPS)
return bytesI > bytesJ
})

// Sort partitions by partition number
sort.Slice(partitions, func(i, j int) bool {
return partitions[i].Partition < partitions[j].Partition
})

// Render template
err := statsTemplate.Execute(w, struct {
Partitions []partitionView
Partitions []partitionView
TenantTotals []tenantRate
}{
Partitions: partitions,
Partitions: partitions,
TenantTotals: tenantRates,
})
if err != nil {
level.Error(s.logger).Log("msg", "error executing template", "err", err)
Expand Down

0 comments on commit 7b6a1ab

Please sign in to comment.