Add experimental support to write incoming data to a Kafka-compatible backend #6888

Merged · 6 commits · Dec 12, 2023
5 changes: 5 additions & 0 deletions go.mod
@@ -70,6 +70,10 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/prometheus/procfs v0.12.0
github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb
github.com/twmb/franz-go v1.15.3
github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1
github.com/twmb/franz-go/pkg/kmsg v1.7.0
github.com/twmb/franz-go/plugin/kprom v1.1.0
github.com/xlab/treeprint v1.2.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0018
go.opentelemetry.io/otel v1.19.0
@@ -98,6 +102,7 @@ require (
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.1-vault-5 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
)

10 changes: 10 additions & 0 deletions go.sum
@@ -812,6 +812,8 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -942,6 +944,14 @@ github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb h1:3s/a99MWpt5Z
github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb/go.mod h1:RMvJQnpB4QQiYGg1gF8mnPJg6IkIPY28Buh8f6b+F0c=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twmb/franz-go v1.15.3 h1:96nCgxz4DvGPSCumz6giquYy8GGDNsYCwWcloBdjJ4w=
github.com/twmb/franz-go v1.15.3/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1 h1:xbSGm02av1df+hkaY+2jGfkuj/XwGaDnUpLo0VvOrY0=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20231206062516-c09dc92d2db1/go.mod h1:n45fs28DdNx7PRAiYwBTwOORJGUMGqHzmFlr0pcW+BY=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go/plugin/kprom v1.1.0 h1:grGeIJbm4llUBF8jkDjTb/b8rKllWSXjMwIqeCCcNYQ=
github.com/twmb/franz-go/plugin/kprom v1.1.0/go.mod h1:cTDrPMSkyrO99LyGx3AtiwF9W6+THHjZrkDE2+TEBIU=
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
34 changes: 32 additions & 2 deletions pkg/distributor/distributor.go
@@ -47,6 +47,7 @@ import (
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
util_math "github.com/grafana/mimir/pkg/util/math"
@@ -157,6 +158,9 @@ type Distributor struct {
// It can be nil, in which case a simple `go f()` will be used.
// See Config.ReusableIngesterPushWorkers on how to configure this.
ingesterDoBatchPushWorkers func(func())

// ingestStorageWriter is the writer used when ingest storage is enabled.
ingestStorageWriter *ingest.Writer
}

// Config contains the configuration required to
@@ -187,6 +191,9 @@ type Config struct {
MinimizeIngesterRequests bool `yaml:"-"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"-"`

// IngestStorageConfig is dynamically injected because defined outside of distributor config.
IngestStorageConfig ingest.Config `yaml:"-"`

// Limits for distributor
DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`
@@ -462,6 +469,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
))
}

if cfg.IngestStorageConfig.Enabled {
d.ingestStorageWriter = ingest.NewWriter(d.cfg.IngestStorageConfig.KafkaConfig, log, reg)
subservices = append(subservices, d.ingestStorageWriter)
}

d.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, err
@@ -1336,7 +1348,13 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
}
}

err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
var err error
if d.cfg.IngestStorageConfig.Enabled {
err = d.sendToStorage(localCtx, userID, ingester, timeseries, metadata, req.Source)
} else {
err = d.sendToIngester(localCtx, ingester, timeseries, metadata, req.Source)
}

if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, deadlineExceededWrapMessage)
}
@@ -1388,7 +1406,8 @@ func copyString(s string) string {
return string([]byte(s))
}

func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
// sendToIngester sends received data to a specific ingester. This function is used when ingest storage is disabled.
func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
h, err := d.ingesterPool.GetClientForInstance(ingester)
if err != nil {
return err
@@ -1406,6 +1425,17 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
return wrapIngesterPushError(err, ingester.Id)
}

// sendToStorage sends received data to the object storage, computing the partition based on the input ingester.
// This function is used when ingest storage is enabled.
func (d *Distributor) sendToStorage(ctx context.Context, userID string, ingester ring.InstanceDesc, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
partitionID, err := ingest.IngesterPartition(ingester.Id)
if err != nil {
return err
}

return d.ingestStorageWriter.WriteSync(ctx, partitionID, userID, timeseries, metadata, source)
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (T, error)) ([]T, error) {
wrappedF := func(ctx context.Context, ing *ring.InstanceDesc) (T, error) {
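Taken together, these hunks fork the per-ingester write path: when ingest storage is enabled, the distributor maps the target ingester to a Kafka partition and writes synchronously; otherwise it pushes over gRPC as before. A condensed sketch assembled from the hunks above (the wrapper name sendToOne is hypothetical, and error wrapping and cleanup are elided):

func (d *Distributor) sendToOne(ctx context.Context, userID string, ingester ring.InstanceDesc, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
	if d.cfg.IngestStorageConfig.Enabled {
		// Derive the Kafka partition ID from the ingester name, then write synchronously.
		partitionID, err := ingest.IngesterPartition(ingester.Id)
		if err != nil {
			return err
		}
		return d.ingestStorageWriter.WriteSync(ctx, partitionID, userID, timeseries, metadata, source)
	}

	// Ingest storage disabled: push directly to the ingester over gRPC, as before.
	return d.sendToIngester(ctx, ingester, timeseries, metadata, source)
}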
4 changes: 2 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -5561,10 +5561,10 @@ func TestSendMessageMetadata(t *testing.T) {
Source: mimirpb.API,
}

err = d.send(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req.Timeseries, nil, req.Source)
err = d.sendToIngester(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req.Timeseries, nil, req.Source)
require.NoError(t, err)

// Verify that d.send added message size to metadata.
// Verify that d.sendToIngester added message size to metadata.
require.Equal(t, []string{strconv.Itoa(req.Size())}, mock.md[grpcutil.MetadataMessageSize])
}

6 changes: 6 additions & 0 deletions pkg/mimir/mimir.go
@@ -66,6 +66,7 @@ import (
rulestorelocal "github.com/grafana/mimir/pkg/ruler/rulestore/local"
"github.com/grafana/mimir/pkg/scheduler"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway"
"github.com/grafana/mimir/pkg/usagestats"
@@ -119,6 +120,7 @@ type Config struct {
LimitsConfig validation.Limits `yaml:"limits"`
Worker querier_worker.Config `yaml:"frontend_worker"`
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
IngestStorage ingest.Config `yaml:"ingest_storage" doc:"hidden"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
@@ -175,6 +177,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.LimitsConfig.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.Frontend.RegisterFlags(f, logger)
c.IngestStorage.RegisterFlags(f)
c.BlocksStorage.RegisterFlags(f)
c.Compactor.RegisterFlags(f, logger)
c.StoreGateway.RegisterFlags(f, logger)
@@ -235,6 +238,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Ruler.Validate(c.LimitsConfig); err != nil {
return errors.Wrap(err, "invalid ruler config")
}
if err := c.IngestStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid ingest storage config")
}
if err := c.BlocksStorage.Validate(c.Ingester.ActiveSeriesMetrics, log); err != nil {
return errors.Wrap(err, "invalid TSDB config")
}
1 change: 1 addition & 0 deletions pkg/mimir/modules.go
@@ -447,6 +447,7 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.StreamingChunksPerIngesterSeriesBufferSize = t.Cfg.Querier.StreamingChunksPerIngesterSeriesBufferSize
t.Cfg.Distributor.MinimizeIngesterRequests = t.Cfg.Querier.MinimizeIngesterRequests
t.Cfg.Distributor.MinimiseIngesterRequestsHedgingDelay = t.Cfg.Querier.MinimiseIngesterRequestsHedgingDelay
t.Cfg.Distributor.IngestStorageConfig = t.Cfg.IngestStorage

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.ActiveGroupsCleanup, t.IngesterRing, canJoinDistributorsRing, t.Registerer, util_log.Logger)
if err != nil {
71 changes: 71 additions & 0 deletions pkg/storage/ingest/config.go
@@ -0,0 +1,71 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
	"errors"
	"flag"
	"time"
)

var (
	ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured")
	ErrMissingKafkaTopic   = errors.New("the Kafka topic has not been configured")
)

type Config struct {
	Enabled     bool        `yaml:"enabled"`
	KafkaConfig KafkaConfig `yaml:"kafka"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "ingest-storage.enabled", false, "True to enable the ingestion via object storage.")

	cfg.KafkaConfig.RegisterFlagsWithPrefix("ingest-storage.kafka", f)
}

// Validate the config.
func (cfg *Config) Validate() error {
	// Skip validation if disabled.
	if !cfg.Enabled {
		return nil
	}

	if err := cfg.KafkaConfig.Validate(); err != nil {
		return err
	}

	return nil
}

// KafkaConfig holds the generic config for the Kafka backend.
type KafkaConfig struct {
	Address      string        `yaml:"address"`
	Topic        string        `yaml:"topic"`
	ClientID     string        `yaml:"client_id"`
	DialTimeout  time.Duration `yaml:"dial_timeout"`
	WriteTimeout time.Duration `yaml:"write_timeout"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&cfg.Address, prefix+".address", "", "The Kafka backend address.")
	f.StringVar(&cfg.Topic, prefix+".topic", "", "The Kafka topic name.")
	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
	f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
	f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")
}

func (cfg *KafkaConfig) Validate() error {
	if cfg.Address == "" {
		return ErrMissingKafkaAddress
	}
	if cfg.Topic == "" {
		return ErrMissingKafkaTopic
	}

	return nil
}
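As a reference for how these pieces fit together, here is a minimal sketch that builds and validates the config programmatically; the address, topic, and client ID values are illustrative only and correspond to the -ingest-storage.kafka.* flags registered above:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	cfg := ingest.Config{
		Enabled: true, // -ingest-storage.enabled
		KafkaConfig: ingest.KafkaConfig{
			Address:      "localhost:9092",    // -ingest-storage.kafka.address (illustrative)
			Topic:        "mimir-ingest",      // -ingest-storage.kafka.topic (illustrative)
			ClientID:     "mimir-distributor", // -ingest-storage.kafka.client-id (illustrative)
			DialTimeout:  2 * time.Second,
			WriteTimeout: 10 * time.Second,
		},
	}

	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid ingest storage config:", err)
		return
	}
	fmt.Println("ingest storage config is valid")
}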
52 changes: 52 additions & 0 deletions pkg/storage/ingest/config_test.go
@@ -0,0 +1,52 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
	"testing"

	"github.com/grafana/dskit/flagext"
	"github.com/stretchr/testify/assert"
)

func TestConfig_Validate(t *testing.T) {
	tests := map[string]struct {
		setup       func(*Config)
		expectedErr error
	}{
		"should pass with the default config": {
			setup: func(_ *Config) {},
		},
		"should fail if ingest storage is enabled and Kafka address is not configured": {
			setup: func(cfg *Config) {
				cfg.Enabled = true
				cfg.KafkaConfig.Topic = "test"
			},
			expectedErr: ErrMissingKafkaAddress,
		},
		"should fail if ingest storage is enabled and Kafka topic is not configured": {
			setup: func(cfg *Config) {
				cfg.Enabled = true
				cfg.KafkaConfig.Address = "localhost"
			},
			expectedErr: ErrMissingKafkaTopic,
		},
		"should pass if ingest storage is enabled and required config is set": {
			setup: func(cfg *Config) {
				cfg.Enabled = true
				cfg.KafkaConfig.Address = "localhost"
				cfg.KafkaConfig.Topic = "test"
			},
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			cfg := Config{}
			flagext.DefaultValues(&cfg)
			testData.setup(&cfg)

			assert.Equal(t, testData.expectedErr, cfg.Validate())
		})
	}
}
40 changes: 40 additions & 0 deletions pkg/storage/ingest/logger.go
@@ -0,0 +1,40 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/twmb/franz-go/pkg/kgo"
)

type kafkaLogger struct {
	logger log.Logger
}

func newKafkaLogger(logger log.Logger) *kafkaLogger {
	return &kafkaLogger{
		logger: log.With(logger, "component", "kafka_client"),
	}
}

func (l *kafkaLogger) Level() kgo.LogLevel {
	// The Kafka client calls Level() to check whether debug level is enabled or not.
	// To keep it simple, we always return Info, so the Kafka client will never try
	// to log expensive debug messages.
	return kgo.LogLevelInfo
}

func (l *kafkaLogger) Log(lev kgo.LogLevel, msg string, keyvals ...any) {
	keyvals = append([]any{"msg", msg}, keyvals...)
	switch lev {
	case kgo.LogLevelDebug:
		level.Debug(l.logger).Log(keyvals...)
	case kgo.LogLevelInfo:
		level.Info(l.logger).Log(keyvals...)
	case kgo.LogLevelWarn:
		level.Warn(l.logger).Log(keyvals...)
	case kgo.LogLevelError:
		level.Error(l.logger).Log(keyvals...)
	}
}
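This adapter satisfies franz-go's kgo.Logger interface (Level and Log). The writer that actually constructs the Kafka client lives in another file of this PR, so the exact wiring is not visible in this section; as an assumption, it would look roughly like this hypothetical in-package helper:

// Hypothetical sketch (not part of this diff): handing the adapter to a franz-go client.
func newKafkaClient(cfg KafkaConfig, logger log.Logger) (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers(cfg.Address),
		kgo.ClientID(cfg.ClientID),
		kgo.DefaultProduceTopic(cfg.Topic),
		kgo.DialTimeout(cfg.DialTimeout),
		kgo.WithLogger(newKafkaLogger(logger)), // route franz-go client logs through go-kit
	)
}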
47 changes: 47 additions & 0 deletions pkg/storage/ingest/util.go
@@ -0,0 +1,47 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingest

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/grafana/regexp"
)

// Regular expression used to parse the ingester numeric ID.
var ingesterIDRegexp = regexp.MustCompile("-(zone-.-)?([0-9]+)$")

// IngesterPartition returns the partition ID to use to write to a specific ingester partition.
// The input ingester ID is expected to end either with "zone-X-Y" or only "-Y" where "X" is a letter in the range [a,d]
// and "Y" is a positive integer number. This means that this function supports up to 4 zones starting
// with letter "a" or no zones at all.
func IngesterPartition(ingesterID string) (int32, error) {
	match := ingesterIDRegexp.FindStringSubmatch(ingesterID)
	if len(match) == 0 {
		return 0, fmt.Errorf("name doesn't match regular expression %s %q", ingesterID, ingesterIDRegexp.String())
	}

	// Convert the zone ID to a number starting from 0.
	var zoneID int32
	if wholeZoneStr := match[1]; len(wholeZoneStr) > 1 {
		if !strings.HasPrefix(wholeZoneStr, "zone-") {
			return 0, fmt.Errorf("invalid zone ID %s in %s", wholeZoneStr, ingesterID)
		}

		zoneID = rune(wholeZoneStr[len(wholeZoneStr)-2]) - 'a'
		if zoneID < 0 || zoneID > 4 {
			return 0, fmt.Errorf("zone ID is not between a and d %s", ingesterID)
		}
	}

	// Parse the ingester sequence number.
	ingesterSeq, err := strconv.Atoi(match[2])
	if err != nil {
		return 0, fmt.Errorf("no ingester sequence in name %s", ingesterID)
	}

	partitionID := int32(ingesterSeq<<2) | (zoneID & 0b11)
	return partitionID, nil
}
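Each partition ID packs the ingester sequence number into the upper bits and the zone index into the two lowest bits. For a few illustrative (hypothetical) ingester names, the mapping works out as follows:

// Worked examples for IngesterPartition (hypothetical names):
//   "ingester-zone-a-0" -> zone a (0), seq 0 -> (0<<2)|0 = partition 0
//   "ingester-zone-b-4" -> zone b (1), seq 4 -> (4<<2)|1 = partition 17
//   "ingester-zone-c-4" -> zone c (2), seq 4 -> (4<<2)|2 = partition 18
//   "ingester-7"        -> no zone (0), seq 7 -> (7<<2)|0 = partition 28
for _, name := range []string{"ingester-zone-a-0", "ingester-zone-b-4", "ingester-zone-c-4", "ingester-7"} {
	partition, err := ingest.IngesterPartition(name)
	fmt.Println(name, partition, err)
}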