From 6d5cb3be794fc441dfc6565aef1dac41f9ec8863 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 20 Nov 2024 11:55:50 -0500 Subject: [PATCH 1/2] Metrics generator read from kafka first pass --- cmd/tempo/app/modules.go | 13 +- modules/generator/config.go | 17 ++ modules/generator/generator.go | 60 ++++- modules/generator/generator_kafka.go | 231 ++++++++++++++++++ modules/generator/generator_test.go | 2 +- modules/generator/instance.go | 14 ++ .../processor/localblocks/processor.go | 2 +- 7 files changed, 330 insertions(+), 9 deletions(-) create mode 100644 modules/generator/generator_kafka.go diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index a68f1337690..16cb85d62c0 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -308,7 +308,16 @@ func (t *App) initGenerator() (services.Service, error) { } t.cfg.Generator.Ring.ListenPort = t.cfg.Server.GRPCListenPort - genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, t.store, log.Logger) + + t.cfg.Generator.Ingest = t.cfg.Ingest + t.cfg.Generator.Ingest.Kafka.ConsumerGroup = generator.ConsumerGroup + + if t.cfg.Target == SingleBinary && len(t.cfg.Generator.AssignedPartitions) == 0 { + // In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups. + t.cfg.Generator.AssignedPartitions = append(t.cfg.Generator.AssignedPartitions, 0) + } + + genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, t.partitionRing, t.store, log.Logger) if errors.Is(err, generator.ErrUnconfigured) && t.cfg.Target != MetricsGenerator { // just warn if we're not running the metrics-generator level.Warn(log.Logger).Log("msg", "metrics-generator is not configured.", "err", err) return services.NewIdleService(nil, nil), nil @@ -660,7 +669,7 @@ func (t *App) setupModuleManager() error { QueryFrontend: {Common, Store, OverridesAPI}, Distributor: {Common, IngesterRing, MetricsGeneratorRing, PartitionRing}, Ingester: {Common, Store, MemberlistKV, PartitionRing}, - MetricsGenerator: {Common, OptionalStore, MemberlistKV, BlockBuilder}, + MetricsGenerator: {Common, OptionalStore, MemberlistKV, BlockBuilder, PartitionRing}, Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing}, Compactor: {Common, Store, MemberlistKV}, BlockBuilder: {Common, Store, MemberlistKV, PartitionRing}, diff --git a/modules/generator/config.go b/modules/generator/config.go index 03721391b63..1b634983795 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/tempo/modules/generator/processor/spanmetrics" "github.com/grafana/tempo/modules/generator/registry" "github.com/grafana/tempo/modules/generator/storage" + "github.com/grafana/tempo/pkg/ingest" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/wal" ) @@ -20,6 +21,8 @@ const ( // ringNameForServer is the name of the ring used by the metrics-generator server. ringNameForServer = "metrics-generator" + + ConsumerGroup = "metrics-generator" ) // Config for a generator. @@ -34,6 +37,10 @@ type Config struct { MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"` QueryTimeout time.Duration `yaml:"query_timeout"` OverrideRingKey string `yaml:"override_ring_key"` + + // This config is dynamically injected because defined outside the ingester config. 
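+ // Ingest carries the shared Kafka settings, and AssignedPartitions (below) selects which
+ // partitions this generator reads; in SingleBinary mode partition 0 is assigned automatically
+ // when the list is left empty.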
+ Ingest ingest.Config `yaml:"-"` + AssignedPartitions []int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."` } // RegisterFlagsAndApplyDefaults registers the flags. @@ -51,6 +58,16 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = generatorRingKey } +func (cfg *Config) Validate() error { + if cfg.Ingest.Enabled { + if err := cfg.Ingest.Kafka.Validate(); err != nil { + return err + } + } + + return nil +} + type ProcessorConfig struct { ServiceGraphs servicegraphs.Config `yaml:"service_graphs"` SpanMetrics spanmetrics.Config `yaml:"span_metrics"` diff --git a/modules/generator/generator.go b/modules/generator/generator.go index 2b0078c03f8..5ae4c9c3ac5 100644 --- a/modules/generator/generator.go +++ b/modules/generator/generator.go @@ -11,17 +11,20 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.uber.org/atomic" "github.com/grafana/tempo/modules/generator/storage" objStorage "github.com/grafana/tempo/modules/storage" + "github.com/grafana/tempo/pkg/ingest" "github.com/grafana/tempo/pkg/tempopb" tempodb_wal "github.com/grafana/tempo/tempodb/wal" ) @@ -65,14 +68,23 @@ type Generator struct { reg prometheus.Registerer logger log.Logger + + kafkaWG sync.WaitGroup + kafkaStop chan struct{} + kafkaClient *kgo.Client + partitionRing ring.PartitionRingReader } // New makes a new Generator. -func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Registerer, store objStorage.Store, logger log.Logger) (*Generator, error) { +func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Registerer, partitionRing ring.PartitionRingReader, store objStorage.Store, logger log.Logger) (*Generator, error) { if cfg.Storage.Path == "" { return nil, ErrUnconfigured } + if err := cfg.Validate(); err != nil { + return nil, err + } + err := os.MkdirAll(cfg.Storage.Path, 0o700) if err != nil { return nil, fmt.Errorf("failed to mkdir on %s: %w", cfg.Storage.Path, err) @@ -84,10 +96,10 @@ func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Regist instances: map[string]*instance{}, - store: store, - - reg: reg, - logger: logger, + store: store, + partitionRing: partitionRing, + reg: reg, + logger: logger, } // Lifecycler and ring @@ -147,10 +159,43 @@ func (g *Generator) starting(ctx context.Context) (err error) { return fmt.Errorf("unable to start metrics-generator dependencies: %w", err) } + if g.cfg.Ingest.Enabled { + g.kafkaClient, err = ingest.NewReaderClient( + g.cfg.Ingest.Kafka, + ingest.NewReaderClientMetrics("generator", nil), + g.logger, + ) + if err != nil { + return fmt.Errorf("failed to create kafka reader client: %w", err) + } + + boff := backoff.New(ctx, backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: time.Minute, // If there is a network hiccup, we prefer to wait longer retrying, than fail the service. 
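+ // 10 retries with an exponential backoff capped at 1m gives Kafka roughly a couple of
+ // minutes to become reachable before starting the service fails.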
+ MaxRetries: 10, + }) + + for boff.Ongoing() { + err := g.kafkaClient.Ping(ctx) + if err == nil { + break + } + level.Warn(g.logger).Log("msg", "ping kafka; will retry", "err", err) + boff.Wait() + } + if err := boff.ErrCause(); err != nil { + return fmt.Errorf("failed to ping kafka: %w", err) + } + } + return nil } func (g *Generator) running(ctx context.Context) error { + if g.cfg.Ingest.Enabled { + g.startKafka() + } + for { select { case <-ctx.Done(): @@ -175,6 +220,11 @@ func (g *Generator) stopping(_ error) error { // Mark as read-only after we have removed ourselves from the ring g.stopIncomingRequests() + // Stop reading from queue and wait for oustanding data to be processed and committed + if g.cfg.Ingest.Enabled { + g.stopKafka() + } + var wg sync.WaitGroup wg.Add(len(g.instances)) diff --git a/modules/generator/generator_kafka.go b/modules/generator/generator_kafka.go new file mode 100644 index 00000000000..a6f0774ddad --- /dev/null +++ b/modules/generator/generator_kafka.go @@ -0,0 +1,231 @@ +package generator + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/tempo/pkg/ingest" + "github.com/grafana/tempo/pkg/tempopb" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" +) + +func (g *Generator) startKafka() { + g.kafkaStop = make(chan struct{}) + g.kafkaWG.Add(1) + go g.listenKafka() +} + +func (g *Generator) stopKafka() { + close(g.kafkaStop) + g.kafkaWG.Wait() +} + +func (g *Generator) listenKafka() { + defer g.kafkaWG.Done() + + level.Info(g.logger).Log("msg", "generator now listening to kafka") + ctx := context.Background() + for { + select { + case <-time.After(2 * time.Second): + if g.readOnly.Load() { + // Starting up or shutting down + continue + } + err := g.readKafka(ctx) + if err != nil { + level.Error(g.logger).Log("msg", "readKafka failed", "err", err) + continue + } + case <-g.kafkaStop: + return + case <-ctx.Done(): + return + } + } +} + +func (g *Generator) readKafka(ctx context.Context) error { + fallback := time.Now().Add(-time.Minute) + + groupLag, err := getGroupLag( + ctx, + kadm.NewClient(g.kafkaClient), + g.cfg.Ingest.Kafka.Topic, + g.cfg.Ingest.Kafka.ConsumerGroup, + fallback, + ) + if err != nil { + return fmt.Errorf("failed to get group lag: %w", err) + } + + assignedPartitions := g.getAssignedActivePartitions() + + for _, partition := range assignedPartitions { + if ctx.Err() != nil { + return ctx.Err() + } + + partitionLag, ok := groupLag.Lookup(g.cfg.Ingest.Kafka.Topic, partition) + if !ok { + return fmt.Errorf("lag for partition %d not found", partition) + } + + if partitionLag.Lag <= 0 { + // Nothing to consume + continue + } + + err := g.consumePartition(ctx, partition, partitionLag) + if err != nil { + return err + } + } + return nil +} + +func (g *Generator) consumePartition(ctx context.Context, partition int32, lag kadm.GroupMemberLag) error { + d := ingest.NewDecoder() + + // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). + // This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. + // In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes + // from one partition at a time. I.e. when this partition is consumed, we start consuming the next one. 
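+ // Each call does a single PollFetches for this partition: the returned records are decoded,
+ // pushed to the per-tenant instances, and the resulting offsets are committed before the
+ // partition is removed again.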
+ g.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + g.cfg.Ingest.Kafka.Topic: { + partition: kgo.NewOffset().At(lag.Commit.At), + }, + }) + defer g.kafkaClient.RemoveConsumePartitions(map[string][]int32{g.cfg.Ingest.Kafka.Topic: {partition}}) + + fetches := g.kafkaClient.PollFetches(ctx) + fetches.EachError(func(_ string, _ int32, err error) { + if !errors.Is(err, context.Canceled) { + level.Error(g.logger).Log("msg", "failed to fetch records", "err", err) + } + }) + + for _, f := range fetches { + for _, t := range f.Topics { + for _, p := range t.Partitions { + if p.Partition != partition { + continue + } + + for _, r := range p.Records { + tenant := string(r.Key) + + i, err := g.getOrCreateInstance(tenant) + if err != nil { + return err + } + + d.Reset() + req, err := d.Decode(r.Value) + if err != nil { + return err + } + + for _, tr := range req.Traces { + trace := &tempopb.Trace{} + err = trace.Unmarshal(tr.Slice) + if err != nil { + return err + } + + i.pushSpansFromQueue(ctx, &tempopb.PushSpansRequest{ + Batches: trace.ResourceSpans, + }) + } + } + } + } + } + + offsets := kadm.OffsetsFromFetches(fetches) + adm := kadm.NewClient(g.kafkaClient) + err := adm.CommitAllOffsets(ctx, g.cfg.Ingest.Kafka.ConsumerGroup, offsets) + if err != nil { + return fmt.Errorf("generator failed to commit offsets: %w", err) + } + + return nil +} + +func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallback time.Time) (kadm.GroupLag, error) { + offsets, err := admClient.FetchOffsets(ctx, group) + if err != nil { + if !errors.Is(err, kerr.GroupIDNotFound) { + return nil, fmt.Errorf("fetch offsets: %w", err) + } + } + if err := offsets.Error(); err != nil { + return nil, fmt.Errorf("fetch offsets got error in response: %w", err) + } + + startOffsets, err := admClient.ListStartOffsets(ctx, topic) + if err != nil { + return nil, err + } + endOffsets, err := admClient.ListEndOffsets(ctx, topic) + if err != nil { + return nil, err + } + + resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) { + return admClient.ListOffsetsAfterMilli(ctx, fallback.UnixMilli(), topic) + }) + // If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at. + for topic, pt := range startOffsets.Offsets() { + for partition, startOffset := range pt { + if _, ok := offsets.Lookup(topic, partition); ok { + continue + } + fallbackOffsets, err := resolveFallbackOffsets() + if err != nil { + return nil, fmt.Errorf("resolve fallback offsets: %w", err) + } + o, ok := fallbackOffsets.Lookup(topic, partition) + if !ok { + return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic) + } + if o.Offset < startOffset.At { + // Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition). + // This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream. + continue + } + offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{ + Topic: o.Topic, + Partition: o.Partition, + At: o.Offset, + LeaderEpoch: o.LeaderEpoch, + }}) + } + } + + descrGroup := kadm.DescribedGroup{ + // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder, + // because we don't use group consumption. 
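+ // Like the block-builder, the generator assigns partitions directly via AddConsumePartitions
+ // and commits offsets explicitly rather than relying on group membership, so the group never
+ // has active members.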
+ State: "Empty", + } + return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil +} + +func (g *Generator) getAssignedActivePartitions() []int32 { + activePartitionsCount := g.partitionRing.PartitionRing().ActivePartitionsCount() + assignedActivePartitions := make([]int32, 0, activePartitionsCount) + for _, partition := range g.cfg.AssignedPartitions { + if partition > int32(activePartitionsCount) { + break + } + assignedActivePartitions = append(assignedActivePartitions, partition) + } + return assignedActivePartitions +} diff --git a/modules/generator/generator_test.go b/modules/generator/generator_test.go index a3ad47c1b6c..cfc8da45dfb 100644 --- a/modules/generator/generator_test.go +++ b/modules/generator/generator_test.go @@ -63,7 +63,7 @@ overrides: generatorConfig.Storage.Path = t.TempDir() generatorConfig.Ring.KVStore.Store = "inmemory" generatorConfig.Processor.SpanMetrics.RegisterFlagsAndApplyDefaults("", nil) - g, err := New(generatorConfig, o, prometheus.NewRegistry(), nil, newTestLogger(t)) + g, err := New(generatorConfig, o, prometheus.NewRegistry(), nil, nil, newTestLogger(t)) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), g)) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index a3c8a0db6a3..0081908cfa6 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -358,6 +358,20 @@ func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) } } +func (i *instance) pushSpansFromQueue(ctx context.Context, req *tempopb.PushSpansRequest) { + i.preprocessSpans(req) + i.processorsMtx.RLock() + defer i.processorsMtx.RUnlock() + + for _, processor := range i.processors { + // Same as normal push except we skip the local blocks processor + if processor.Name() == localblocks.Name { + continue + } + processor.PushSpans(ctx, req) + } +} + func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { // TODO - uniqify all strings? // Doesn't help allocs, but should greatly reduce inuse space diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index ec3458eda43..cbb79c4afda 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -130,7 +130,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid } func (*Processor) Name() string { - return "LocalBlocksProcessor" + return Name } func (p *Processor) PushSpans(_ context.Context, req *tempopb.PushSpansRequest) { From 76ea213e54f8fcf8f4479fd4b255c9a893718f23 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 21 Nov 2024 11:15:26 -0500 Subject: [PATCH 2/2] review feedback --- modules/generator/config.go | 2 +- modules/generator/generator.go | 4 ++ modules/generator/generator_kafka.go | 61 ++++++++++++---------------- 3 files changed, 31 insertions(+), 36 deletions(-) diff --git a/modules/generator/config.go b/modules/generator/config.go index 1b634983795..06f78b2c3eb 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -38,7 +38,7 @@ type Config struct { QueryTimeout time.Duration `yaml:"query_timeout"` OverrideRingKey string `yaml:"override_ring_key"` - // This config is dynamically injected because defined outside the ingester config. + // This config is dynamically injected because defined outside the generator config. 
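+ // It is populated in initGenerator (cmd/tempo/app/modules.go), which copies the top-level
+ // ingest config and overrides the consumer group.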
Ingest ingest.Config `yaml:"-"` AssignedPartitions []int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."` } diff --git a/modules/generator/generator.go b/modules/generator/generator.go index 5ae4c9c3ac5..afd000623e9 100644 --- a/modules/generator/generator.go +++ b/modules/generator/generator.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -72,6 +73,7 @@ type Generator struct { kafkaWG sync.WaitGroup kafkaStop chan struct{} kafkaClient *kgo.Client + kafkaAdm *kadm.Client partitionRing ring.PartitionRingReader } @@ -186,6 +188,8 @@ func (g *Generator) starting(ctx context.Context) (err error) { if err := boff.ErrCause(); err != nil { return fmt.Errorf("failed to ping kafka: %w", err) } + + g.kafkaAdm = kadm.NewClient(g.kafkaClient) } return nil diff --git a/modules/generator/generator_kafka.go b/modules/generator/generator_kafka.go index a6f0774ddad..d8503db9180 100644 --- a/modules/generator/generator_kafka.go +++ b/modules/generator/generator_kafka.go @@ -112,46 +112,37 @@ func (g *Generator) consumePartition(ctx context.Context, partition int32, lag k } }) - for _, f := range fetches { - for _, t := range f.Topics { - for _, p := range t.Partitions { - if p.Partition != partition { - continue - } - - for _, r := range p.Records { - tenant := string(r.Key) - - i, err := g.getOrCreateInstance(tenant) - if err != nil { - return err - } - - d.Reset() - req, err := d.Decode(r.Value) - if err != nil { - return err - } - - for _, tr := range req.Traces { - trace := &tempopb.Trace{} - err = trace.Unmarshal(tr.Slice) - if err != nil { - return err - } - - i.pushSpansFromQueue(ctx, &tempopb.PushSpansRequest{ - Batches: trace.ResourceSpans, - }) - } - } + for iter := fetches.RecordIter(); !iter.Done(); { + r := iter.Next() + + tenant := string(r.Key) + + i, err := g.getOrCreateInstance(tenant) + if err != nil { + return err + } + + d.Reset() + req, err := d.Decode(r.Value) + if err != nil { + return err + } + + for _, tr := range req.Traces { + trace := &tempopb.Trace{} + err = trace.Unmarshal(tr.Slice) + if err != nil { + return err } + + i.pushSpansFromQueue(ctx, &tempopb.PushSpansRequest{ + Batches: trace.ResourceSpans, + }) } } offsets := kadm.OffsetsFromFetches(fetches) - adm := kadm.NewClient(g.kafkaClient) - err := adm.CommitAllOffsets(ctx, g.cfg.Ingest.Kafka.ConsumerGroup, offsets) + err := g.kafkaAdm.CommitAllOffsets(ctx, g.cfg.Ingest.Kafka.ConsumerGroup, offsets) if err != nil { return fmt.Errorf("generator failed to commit offsets: %w", err) }
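+ // Offsets are committed only after every record in the fetch has been decoded and pushed,
+ // so an uncommitted batch is re-read on the next cycle (at-least-once processing).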