Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Dec 18, 2024
1 parent caea502 commit 7f6ee82
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
30 changes: 27 additions & 3 deletions pkg/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package usage

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
Expand All @@ -11,24 +12,47 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

const (
topic = "ingest.usage"
windowSize = 1 * time.Minute
)

type service struct {
client *kgo.Client
}

func newService(kafkaCfg kafka.Config, consumerGroup string, logger log.Logger, registrar prometheus.Registerer) (*service, error) {
windowsSize := 1 * time.Minute
kprom := client.NewReaderClientMetrics("usage-consumer", registrar)
client, err := client.NewReaderClient(kafkaCfg, kprom, logger,
kgo.ConsumerGroup(consumerGroup),
kgo.ConsumeTopics("ingest.usage"),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().Add(-windowsSize).UnixMilli())),
// kgo.Balancers(balancers ...kgo.GroupBalancer)
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().Add(-windowSize).UnixMilli())),
kgo.DisableAutoCommit(),
kgo.OnPartitionsAssigned(func(ctx context.Context, c *kgo.Client, m map[string][]int32) {
}),
)
if err != nil {
return nil, err
}

return &service{client: client}, nil
}

func (s *service) fetch(ctx context.Context) error {
fetches := s.client.PollRecords(ctx, -1)
for _, fetch := range fetches {
for _, topicFetch := range fetch.Topics {
for _, partitionFetch := range topicFetch.Partitions {
for _, record := range partitionFetch.Records {
fmt.Println(record.Key, record.Value, partitionFetch.Partition, partitionFetch)
}
}
}
}
return nil
}

func (s *service) Consume(ctx context.Context, records []*kgo.Record) error {
return nil
}
4 changes: 3 additions & 1 deletion tools/dev/kafka/loki-local-config.debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ querier:

kafka_config:
topic: "loki.push"
address: "localhost:9092"

distributor:
kafka_writes_enabled: true
ingester_writes_enabled: false

frontend:
encoding: protobuf

0 comments on commit 7f6ee82

Please sign in to comment.