Skip to content

Commit

Permalink
working on usage service
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Dec 19, 2024
1 parent 47ac9b1 commit 37662f5
Show file tree
Hide file tree
Showing 8 changed files with 496 additions and 107 deletions.
32 changes: 32 additions & 0 deletions pkg/kafka/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,38 @@ func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
return *d.stream, nil
}

// DecodeUsage converts a Kafka record's byte data into a logproto.StreamUsage and labels.Labels.
// Similar to Decode, it parses and caches labels for efficiency.
func (d *Decoder) DecodeUsage(data []byte) (logproto.StreamUsage, labels.Labels, error) {
var usage logproto.StreamUsage
if err := usage.Unmarshal(data); err != nil {
return logproto.StreamUsage{}, nil, fmt.Errorf("failed to unmarshal stream usage: %w", err)
}

var ls labels.Labels
if cachedLabels, ok := d.cache.Get(usage.Labels); ok {
ls = cachedLabels
} else {
var err error
ls, err = syntax.ParseLabels(usage.Labels)
if err != nil {
return logproto.StreamUsage{}, nil, fmt.Errorf("failed to parse labels: %w", err)
}
d.cache.Add(usage.Labels, ls)
}

return usage, ls, nil
}

// DecodeUsageWithoutLabels converts a Kafka record's byte data into a logproto.StreamUsage without parsing labels.
func (d *Decoder) DecodeUsageWithoutLabels(data []byte) (logproto.StreamUsage, error) {
var usage logproto.StreamUsage
if err := usage.Unmarshal(data); err != nil {
return logproto.StreamUsage{}, fmt.Errorf("failed to unmarshal stream usage: %w", err)
}
return usage, nil
}

// sovPush calculates the size of varint-encoded uint64.
// It is used to determine the number of bytes needed to encode a uint64 value
// in Protocol Buffers' variable-length integer format.
Expand Down
36 changes: 34 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/fatih/color"
"github.com/felixge/fgprof"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcutil"
Expand Down Expand Up @@ -63,6 +64,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/tracing"
"github.com/grafana/loki/v3/pkg/usage"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/fakeauth"
Expand Down Expand Up @@ -398,6 +400,7 @@ type Loki struct {
Metrics *server.Metrics

UsageTracker push.UsageTracker
usageService *usage.Service
}

// New makes a new Loki.
Expand Down Expand Up @@ -701,6 +704,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule)
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
mm.RegisterModule(BlockScheduler, t.initBlockScheduler)
mm.RegisterModule(Usage, t.initUsage)

mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
Expand Down Expand Up @@ -740,12 +744,13 @@ func (t *Loki) setupModuleManager() error {
MemberlistKV: {Server},
BlockBuilder: {PartitionRing, Store, Server},
BlockScheduler: {Server},
Usage: {Server},

Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor, PatternIngester},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway, Usage},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, Usage},
}

if t.Cfg.Querier.PerRequestLimitsEnabled {
Expand Down Expand Up @@ -861,3 +866,30 @@ func (t *Loki) recursiveIsModuleActive(target, m string) bool {
}
return false
}

func (t *Loki) initUsage() (services.Service, error) {
if !t.Cfg.Distributor.KafkaEnabled {
return nil, nil
}

logger := log.With(util_log.Logger, "component", "usage")
service, err := usage.NewService(
t.Cfg.KafkaConfig,
"usage-consumer",
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}

t.usageService = service

// Register HTTP endpoint
t.Server.HTTP.Path("/usage").Methods("GET").Handler(service)
if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/usage").Methods("GET").Handler(service)
}

return service, nil
}
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ const (
PartitionRing string = "partition-ring"
BlockBuilder string = "block-builder"
BlockScheduler string = "block-scheduler"
Usage string = "usage"
)

const (
Expand Down Expand Up @@ -1855,7 +1856,6 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
logger,
prometheus.DefaultRegisterer,
)

if err != nil {
return nil, err
}
Expand Down
137 changes: 137 additions & 0 deletions pkg/usage/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package usage

import (
"html/template"
"net/http"
"sort"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log/level"
)

type streamRate struct {
TenantID string
Hash uint64
BytesPS string
}

type partitionView struct {
Partition int32
Offset int64
LastUpdate string
TopStreams []streamRate
}

var statsTemplate = template.Must(template.New("stats").Parse(`
<!DOCTYPE html>
<html>
<head>
<title>Usage Statistics</title>
<style>
body { font-family: sans-serif; margin: 20px; }
.partition { margin-bottom: 30px; }
.partition h2 { color: #333; }
.partition .meta { color: #666; margin-bottom: 10px; }
table { border-collapse: collapse; width: 100%; max-width: 800px; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
th { background-color: #f5f5f5; }
tr:hover { background-color: #f9f9f9; }
</style>
</head>
<body>
<h1>Usage Statistics</h1>
{{range .Partitions}}
<div class="partition">
<h2>Partition {{.Partition}}</h2>
<div class="meta">
Offset: {{.Offset}} | Last Update: {{.LastUpdate}}
</div>
<table>
<thead>
<tr>
<th>Tenant ID</th>
<th>Stream Hash</th>
<th>Rate</th>
</tr>
</thead>
<tbody>
{{range .TopStreams}}
<tr>
<td>{{.TenantID}}</td>
<td>{{.Hash}}</td>
<td>{{.BytesPS}}/s</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{end}}
</body>
</html>
`))

func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.statsMtx.RLock()
defer s.statsMtx.RUnlock()

// Calculate rates for the last minute
window := time.Minute
since := time.Now().Add(-window)

var partitions []partitionView
for partition, pStats := range s.stats.stats {
view := partitionView{
Partition: partition,
Offset: pStats.offset,
LastUpdate: humanize.Time(pStats.timestamp),
}

// Collect all stream rates for this partition
var rates []streamRate
for tenantID, tStats := range pStats.tenants {
for hash, sStats := range tStats.streams {
bytes := sStats.totalBytesSince(since)
if bytes > 0 {
bytesPerSec := float64(bytes) / window.Seconds()
rates = append(rates, streamRate{
TenantID: tenantID,
Hash: hash,
BytesPS: humanize.Bytes(uint64(bytesPerSec)),
})
}
}
}

// Sort by bytes per second in descending order (need to parse the humanized strings)
sort.Slice(rates, func(i, j int) bool {
bytesI, _ := humanize.ParseBytes(rates[i].BytesPS)
bytesJ, _ := humanize.ParseBytes(rates[j].BytesPS)
return bytesI > bytesJ
})

// Take top 10 streams
if len(rates) > 10 {
rates = rates[:10]
}
view.TopStreams = rates
partitions = append(partitions, view)
}

// Sort partitions by partition number
sort.Slice(partitions, func(i, j int) bool {
return partitions[i].Partition < partitions[j].Partition
})

// Render template
err := statsTemplate.Execute(w, struct {
Partitions []partitionView
}{
Partitions: partitions,
})
if err != nil {
level.Error(s.logger).Log("msg", "error executing template", "err", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
}
Loading

0 comments on commit 37662f5

Please sign in to comment.