diff --git a/pkg/kafka/encoding.go b/pkg/kafka/encoding.go
index 4a4ae518873e2..f9c4ecc56a929 100644
--- a/pkg/kafka/encoding.go
+++ b/pkg/kafka/encoding.go
@@ -207,6 +207,38 @@ func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
return *d.stream, nil
}
+// DecodeUsage converts a Kafka record's byte data into a logproto.StreamUsage and labels.Labels.
+// Similar to Decode, it parses and caches labels for efficiency.
+func (d *Decoder) DecodeUsage(data []byte) (logproto.StreamUsage, labels.Labels, error) {
+ var usage logproto.StreamUsage
+ if err := usage.Unmarshal(data); err != nil {
+ return logproto.StreamUsage{}, nil, fmt.Errorf("failed to unmarshal stream usage: %w", err)
+ }
+
+ var ls labels.Labels
+ if cachedLabels, ok := d.cache.Get(usage.Labels); ok {
+ ls = cachedLabels
+ } else {
+ var err error
+ ls, err = syntax.ParseLabels(usage.Labels)
+ if err != nil {
+ return logproto.StreamUsage{}, nil, fmt.Errorf("failed to parse labels: %w", err)
+ }
+ d.cache.Add(usage.Labels, ls)
+ }
+
+ return usage, ls, nil
+}
+
+// DecodeUsageWithoutLabels converts a Kafka record's byte data into a logproto.StreamUsage without parsing labels.
+func (d *Decoder) DecodeUsageWithoutLabels(data []byte) (logproto.StreamUsage, error) {
+ var usage logproto.StreamUsage
+ if err := usage.Unmarshal(data); err != nil {
+ return logproto.StreamUsage{}, fmt.Errorf("failed to unmarshal stream usage: %w", err)
+ }
+ return usage, nil
+}
+
// sovPush calculates the size of varint-encoded uint64.
// It is used to determine the number of bytes needed to encode a uint64 value
// in Protocol Buffers' variable-length integer format.
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 14963c09940c4..784a70620841a 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -15,6 +15,7 @@ import (
"github.com/fatih/color"
"github.com/felixge/fgprof"
+ "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcutil"
@@ -63,6 +64,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/tracing"
+ "github.com/grafana/loki/v3/pkg/usage"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/fakeauth"
@@ -398,6 +400,7 @@ type Loki struct {
Metrics *server.Metrics
UsageTracker push.UsageTracker
+ usageService *usage.Service
}
// New makes a new Loki.
@@ -701,6 +704,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule)
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
mm.RegisterModule(BlockScheduler, t.initBlockScheduler)
+ mm.RegisterModule(Usage, t.initUsage)
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
@@ -740,12 +744,13 @@ func (t *Loki) setupModuleManager() error {
MemberlistKV: {Server},
BlockBuilder: {PartitionRing, Store, Server},
BlockScheduler: {Server},
+ Usage: {Server},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor, PatternIngester},
- Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},
+ Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway, Usage},
- All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor},
+ All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, Usage},
}
if t.Cfg.Querier.PerRequestLimitsEnabled {
@@ -861,3 +866,30 @@ func (t *Loki) recursiveIsModuleActive(target, m string) bool {
}
return false
}
+
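+// initUsage starts the usage consumer when the distributor's Kafka writes are
+// enabled, and registers its stats page at /usage.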
+func (t *Loki) initUsage() (services.Service, error) {
+ if !t.Cfg.Distributor.KafkaEnabled {
+ return nil, nil
+ }
+
+ logger := log.With(util_log.Logger, "component", "usage")
+ service, err := usage.NewService(
+ t.Cfg.KafkaConfig,
+ "usage-consumer",
+ logger,
+ prometheus.DefaultRegisterer,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ t.usageService = service
+
+ // Register HTTP endpoint
+ t.Server.HTTP.Path("/usage").Methods("GET").Handler(service)
+ if t.Cfg.InternalServer.Enable {
+ t.InternalServer.HTTP.Path("/usage").Methods("GET").Handler(service)
+ }
+
+ return service, nil
+}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 49a26498b8e31..a77abdf5c8677 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -142,6 +142,7 @@ const (
PartitionRing string = "partition-ring"
BlockBuilder string = "block-builder"
BlockScheduler string = "block-scheduler"
+ Usage string = "usage"
)
const (
@@ -1855,7 +1856,6 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
logger,
prometheus.DefaultRegisterer,
)
-
if err != nil {
return nil, err
}
diff --git a/pkg/usage/http.go b/pkg/usage/http.go
new file mode 100644
index 0000000000000..cf5c62b1768be
--- /dev/null
+++ b/pkg/usage/http.go
@@ -0,0 +1,137 @@
+package usage
+
+import (
+ "html/template"
+ "net/http"
+ "sort"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/go-kit/log/level"
+)
+
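+// streamRate is one row of the per-partition top-streams table.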
+type streamRate struct {
+ TenantID string
+ Hash uint64
+ BytesPS string
+ rawBytesPS float64 // unformatted bytes/s, kept so sorting needn't re-parse BytesPS
+}
+
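+// partitionView is the per-partition data passed to the stats template.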
+type partitionView struct {
+ Partition int32
+ Offset int64
+ LastUpdate string
+ TopStreams []streamRate
+}
+
+var statsTemplate = template.Must(template.New("stats").Parse(`
+<!DOCTYPE html>
+<html>
+<head>
+  <title>Usage Statistics</title>
+</head>
+<body>
+  <h1>Usage Statistics</h1>
+  {{range .Partitions}}
+  <section>
+    <h2>Partition {{.Partition}}</h2>
+    <p>
+      Offset: {{.Offset}} | Last Update: {{.LastUpdate}}
+    </p>
+    <table>
+      <thead>
+        <tr>
+          <th>Tenant ID</th>
+          <th>Stream Hash</th>
+          <th>Rate</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{range .TopStreams}}
+        <tr>
+          <td>{{.TenantID}}</td>
+          <td>{{.Hash}}</td>
+          <td>{{.BytesPS}}/s</td>
+        </tr>
+        {{end}}
+      </tbody>
+    </table>
+  </section>
+  {{end}}
+</body>
+</html>
+`))
+
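+// ServeHTTP renders the usage page: for every partition, the top ten streams
+// by bytes per second over the last minute.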
+func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ s.statsMtx.RLock()
+ defer s.statsMtx.RUnlock()
+
+ // Calculate rates for the last minute
+ window := time.Minute
+ since := time.Now().Add(-window)
+
+ var partitions []partitionView
+ for partition, pStats := range s.stats.stats {
+ view := partitionView{
+ Partition: partition,
+ Offset: pStats.offset,
+ LastUpdate: humanize.Time(pStats.timestamp),
+ }
+
+ // Collect all stream rates for this partition
+ var rates []streamRate
+ for tenantID, tStats := range pStats.tenants {
+ for hash, sStats := range tStats.streams {
+ bytes := sStats.totalBytesSince(since)
+ if bytes > 0 {
+ bytesPerSec := float64(bytes) / window.Seconds()
+ rates = append(rates, streamRate{
+ TenantID: tenantID,
+ Hash: hash,
+ BytesPS: humanize.Bytes(uint64(bytesPerSec)),
+ rawBytesPS: bytesPerSec,
+ })
+ }
+ }
+ }
+
+ // Sort by bytes per second in descending order using the raw value;
+ // re-parsing the humanized strings would be lossy and slow.
+ sort.Slice(rates, func(i, j int) bool {
+ return rates[i].rawBytesPS > rates[j].rawBytesPS
+ })
+
+ // Take top 10 streams
+ if len(rates) > 10 {
+ rates = rates[:10]
+ }
+ view.TopStreams = rates
+ partitions = append(partitions, view)
+ }
+
+ // Sort partitions by partition number
+ sort.Slice(partitions, func(i, j int) bool {
+ return partitions[i].Partition < partitions[j].Partition
+ })
+
+ // Render template
+ err := statsTemplate.Execute(w, struct {
+ Partitions []partitionView
+ }{
+ Partitions: partitions,
+ })
+ if err != nil {
+ level.Error(s.logger).Log("msg", "error executing template", "err", err)
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+}
diff --git a/pkg/usage/service.go b/pkg/usage/service.go
new file mode 100644
index 0000000000000..d6ef6f2219fde
--- /dev/null
+++ b/pkg/usage/service.go
@@ -0,0 +1,137 @@
+package usage
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/go-kit/log"
+ "github.com/go-kit/log/level"
+ "github.com/grafana/dskit/services"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/twmb/franz-go/pkg/kgo"
+
+ "github.com/grafana/loki/v3/pkg/kafka"
+ "github.com/grafana/loki/v3/pkg/kafka/client"
+)
+
+const (
+ topic = "ingest.usage"
+ windowSize = 1 * time.Minute
+)
+
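+// Service consumes stream usage records from the ingest.usage topic and keeps
+// a sliding one-minute window of per-stream byte counts in memory.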
+type Service struct {
+ client *kgo.Client
+ services.Service
+ decoder *kafka.Decoder
+ logger log.Logger
+
+ stats *usageStats
+ statsMtx sync.RWMutex
+}
+
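+// NewService builds a usage consumer that joins consumerGroup, starts reading
+// from windowSize ago, and never commits offsets: the window is rebuilt from
+// Kafka on restart.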
+func NewService(kafkaCfg kafka.Config, consumerGroup string, logger log.Logger, registrar prometheus.Registerer) (*Service, error) {
+ kprom := client.NewReaderClientMetrics("usage-consumer", registrar)
+ client, err := client.NewReaderClient(kafkaCfg, kprom, logger,
+ kgo.ConsumerGroup(consumerGroup),
+ kgo.ConsumeTopics(topic),
+ kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().Add(-windowSize).UnixMilli())),
+ kgo.DisableAutoCommit(),
+ )
+ if err != nil {
+ return nil, err
+ }
+ decoder, err := kafka.NewDecoder()
+ if err != nil {
+ return nil, err
+ }
+ s := &Service{
+ client: client,
+ logger: logger,
+ decoder: decoder,
+ stats: newUsageStats(),
+ }
+ s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
+ return s, nil
+}
+
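+// starting is a no-op; consumption begins in running.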
+func (s *Service) starting(_ context.Context) error {
+ return nil
+}
+
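+// processRecord decodes one usage record and folds its byte counts into the
+// stats under the record's partition and tenant (the record key).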
+func (s *Service) processRecord(partition int32, record *kgo.Record) error {
+ tenantID := string(record.Key)
+ stream, err := s.decoder.DecodeUsageWithoutLabels(record.Value)
+ if err != nil {
+ return fmt.Errorf("error decoding usage: %w", err)
+ }
+
+ s.statsMtx.Lock()
+ defer s.statsMtx.Unlock()
+
+ s.stats.addEntry(
+ partition,
+ tenantID,
+ stream.Hash,
+ stream.LineSize+stream.StructuredMetadataSize,
+ record.Timestamp,
+ record.Offset,
+ )
+
+ return nil
+}
+
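+// evictOldData drops all entries that have fallen out of the sliding window.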
+func (s *Service) evictOldData() {
+ s.statsMtx.Lock()
+ defer s.statsMtx.Unlock()
+
+ s.stats.evictBefore(time.Now().Add(-windowSize))
+}
+
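+// runEviction evicts expired entries every windowSize/2 until ctx is cancelled.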
+func (s *Service) runEviction(ctx context.Context) {
+ ticker := time.NewTicker(windowSize / 2)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ s.evictOldData()
+ }
+ }
+}
+
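+// running polls Kafka indefinitely, recording every fetched record and
+// returning on the first partition-level fetch error.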
+func (s *Service) running(ctx context.Context) error {
+ go s.runEviction(ctx)
+
+ for {
+ fetches := s.client.PollRecords(ctx, -1)
+ for _, fetch := range fetches {
+ for _, topicFetch := range fetch.Topics {
+ for _, partitionFetch := range topicFetch.Partitions {
+ if partitionFetch.Err != nil {
+ level.Error(s.logger).Log("msg", "error polling records", "err", partitionFetch.Err)
+ return partitionFetch.Err
+ }
+ for _, record := range partitionFetch.Records {
+ if err := s.processRecord(partitionFetch.Partition, record); err != nil {
+ level.Error(s.logger).Log("msg", "error processing record", "err", err)
+ continue
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
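+// stopping closes the Kafka client, ignoring the errors expected on shutdown.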
+func (s *Service) stopping(failureCase error) error {
+ if errors.Is(failureCase, context.Canceled) || errors.Is(failureCase, kgo.ErrClientClosed) {
+ return nil
+ }
+ s.client.Close()
+ return failureCase
+}
diff --git a/pkg/usage/stats.go b/pkg/usage/stats.go
new file mode 100644
index 0000000000000..9e40b6dbde7e8
--- /dev/null
+++ b/pkg/usage/stats.go
@@ -0,0 +1,117 @@
+package usage
+
+import "time"
+
+const (
+ initialEntriesCapacity = 1000
+)
+
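+// usageEntry records the bytes observed for a stream at a single record timestamp.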
+type usageEntry struct {
+ bytes uint64
+ timestamp time.Time
+}
+
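+// streamStats holds the usage entries for one stream, in arrival order.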
+type streamStats struct {
+ entries []usageEntry
+}
+
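+// tenantStats groups stream stats by stream hash for one tenant.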
+type tenantStats struct {
+ streams map[uint64]*streamStats
+}
+
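+// partitionStats tracks per-tenant stats plus the offset and timestamp of the
+// last record consumed from one partition.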
+type partitionStats struct {
+ tenants map[string]*tenantStats
+ offset int64
+ timestamp time.Time
+}
+
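+// usageStats is the partition -> tenant -> stream hierarchy. It does no
+// locking of its own; callers synchronize via Service.statsMtx.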
+type usageStats struct {
+ stats map[int32]*partitionStats
+}
+
+func newUsageStats() *usageStats {
+ return &usageStats{
+ stats: make(map[int32]*partitionStats),
+ }
+}
+
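+// totalBytesSince sums the bytes of all entries newer than since.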
+func (s *streamStats) totalBytesSince(since time.Time) uint64 {
+ var total uint64
+ for _, entry := range s.entries {
+ if entry.timestamp.After(since) {
+ total += entry.bytes
+ }
+ }
+ return total
+}
+
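+// addEntry appends a usage entry, creating the partition, tenant, and stream
+// levels on first use and updating the partition's offset and timestamp.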
+func (u *usageStats) addEntry(partition int32, tenantID string, streamHash uint64, bytes uint64, timestamp time.Time, offset int64) {
+ // Get or create partition stats
+ pStats, ok := u.stats[partition]
+ if !ok {
+ pStats = &partitionStats{
+ tenants: make(map[string]*tenantStats),
+ }
+ u.stats[partition] = pStats
+ }
+
+ pStats.offset = offset
+ pStats.timestamp = timestamp
+
+ // Get or create tenant stats
+ tStats, ok := pStats.tenants[tenantID]
+ if !ok {
+ tStats = &tenantStats{
+ streams: make(map[uint64]*streamStats),
+ }
+ pStats.tenants[tenantID] = tStats
+ }
+
+ // Get or create stream stats
+ sStats, ok := tStats.streams[streamHash]
+ if !ok {
+ sStats = &streamStats{
+ entries: make([]usageEntry, 0, initialEntriesCapacity),
+ }
+ tStats.streams[streamHash] = sStats
+ }
+
+ // Append the new entry; expired entries are pruned separately by evictBefore.
+ sStats.entries = append(sStats.entries, usageEntry{
+ bytes: bytes,
+ timestamp: timestamp,
+ })
+}
+
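+// evictBefore removes entries at or older than cutoff and prunes any streams,
+// tenants, and partitions left empty.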
+func (u *usageStats) evictBefore(cutoff time.Time) {
+ for partition, pStats := range u.stats {
+ for tenantID, tStats := range pStats.tenants {
+ for streamKey, sStats := range tStats.streams {
+ // Filter entries in place
+ n := 0
+ for _, entry := range sStats.entries {
+ if entry.timestamp.After(cutoff) {
+ sStats.entries[n] = entry
+ n++
+ }
+ }
+
+ if n == 0 {
+ // All entries are old, remove the stream
+ delete(tStats.streams, streamKey)
+ } else {
+ // Reslice to keep only newer entries
+ sStats.entries = sStats.entries[:n]
+ }
+ }
+ // Clean up empty tenant entries
+ if len(tStats.streams) == 0 {
+ delete(pStats.tenants, tenantID)
+ }
+ }
+ // Clean up empty partition entries
+ if len(pStats.tenants) == 0 {
+ delete(u.stats, partition)
+ }
+ }
+}
diff --git a/pkg/usage/usage.go b/pkg/usage/usage.go
deleted file mode 100644
index b234c8ee71a9e..0000000000000
--- a/pkg/usage/usage.go
+++ /dev/null
@@ -1,90 +0,0 @@
-package usage
-
-import (
- "context"
- "errors"
- "fmt"
- "time"
-
- "github.com/go-kit/log"
- "github.com/go-kit/log/level"
- "github.com/grafana/dskit/services"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/twmb/franz-go/pkg/kgo"
-
- "github.com/grafana/loki/v3/pkg/kafka"
- "github.com/grafana/loki/v3/pkg/kafka/client"
-)
-
-const (
- topic = "ingest.usage"
- windowSize = 1 * time.Minute
-)
-
-type service struct {
- client *kgo.Client
- services.Service
-
- logger log.Logger
-}
-
-type partitionTenants struct {
- tenants map[string]struct{}
-}
-
-type partitionStreams struct {
- streams map[string]struct{}
-}
-
-func newService(kafkaCfg kafka.Config, consumerGroup string, logger log.Logger, registrar prometheus.Registerer) (*service, error) {
- kprom := client.NewReaderClientMetrics("usage-consumer", registrar)
- client, err := client.NewReaderClient(kafkaCfg, kprom, logger,
- kgo.ConsumerGroup(consumerGroup),
- // kgo.Balancers(balancers ...kgo.GroupBalancer)
- kgo.ConsumeTopics(topic),
- kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().Add(-windowSize).UnixMilli())),
- kgo.DisableAutoCommit(),
- // kgo.OnPartitionsAssigned(func(ctx context.Context, c *kgo.Client, m map[string][]int32) {
- // }),
- )
- if err != nil {
- return nil, err
- }
- s := &service{
- client: client,
- logger: logger,
- }
- s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
- return s, nil
-}
-
-func (s *service) starting(_ context.Context) error {
- return nil
-}
-
-func (s *service) running(ctx context.Context) error {
- for {
- fetches := s.client.PollRecords(ctx, -1)
- for _, fetch := range fetches {
- for _, topicFetch := range fetch.Topics {
- for _, partitionFetch := range topicFetch.Partitions {
- if partitionFetch.Err != nil {
- level.Error(s.logger).Log("msg", "error polling records", "err", partitionFetch.Err)
- return partitionFetch.Err
- }
- for _, record := range partitionFetch.Records {
- fmt.Println(record.Key, record.Value, partitionFetch.Partition, partitionFetch)
- }
- }
- }
- }
- }
-}
-
-func (s *service) stopping(failureCase error) error {
- if errors.Is(failureCase, context.Canceled) || errors.Is(failureCase, kgo.ErrClientClosed) {
- return nil
- }
- s.client.Close()
- return failureCase
-}
diff --git a/tools/dev/kafka/loki-local-config.debug.yaml b/tools/dev/kafka/loki-local-config.debug.yaml
index 29b32415cdf19..59394cd7b628e 100644
--- a/tools/dev/kafka/loki-local-config.debug.yaml
+++ b/tools/dev/kafka/loki-local-config.debug.yaml
@@ -3,7 +3,7 @@ auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
- log_level: debug
+ log_level: info
grpc_server_max_concurrent_streams: 1000
common:
@@ -15,16 +15,34 @@ common:
rules_directory: /tmp/loki/rules
replication_factor: 1
ring:
+ instance_id: local
kvstore:
store: inmemory
+kafka_config:
+ topic: "loki"
+
+querier:
+ query_partition_ingesters: true
+
+ingester:
+ kafka_ingestion:
+ enabled: true
+
+distributor:
+ kafka_writes_enabled: true
+ ingester_writes_enabled: false
+
query_range:
results_cache:
cache:
embedded_cache:
- enabled: true
+ enabled: false
max_size_mb: 100
+limits_config:
+ metric_aggregation_enabled: true
+
schema_config:
configs:
- from: 2020-10-24
@@ -35,19 +53,25 @@ schema_config:
prefix: index_
period: 24h
-ingester:
- kafka_ingestion:
- enabled: true
-
-querier:
- query_partition_ingesters: true
-
-kafka_config:
- topic: "loki.push"
+pattern_ingester:
+ enabled: true
+ metric_aggregation:
+ loki_address: localhost:3100
-distributor:
- kafka_writes_enabled: true
- ingester_writes_enabled: false
+ruler:
+ alertmanager_url: http://localhost:9093
frontend:
encoding: protobuf
+# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
+# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
+#
+# Statistics help us better understand how Loki is used, and they show us performance
+# levels for most users. This helps us prioritize features and documentation.
+# For more information on what's sent, look at
+# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
+# Refer to the buildReport method to see what goes into a report.
+#
+# If you would like to disable reporting, uncomment the following lines:
+#analytics:
+# reporting_enabled: false