Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Ring to IndexGateway #5358

Merged
merged 44 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
0ba13f0
Begin to add a Ring to IndexGateway
JordanRushing Feb 9, 2022
d928391
Implement missing methods for IndexGateway.
DylanGuedes Feb 10, 2022
325ada2
Fix failing linter
JordanRushing Feb 10, 2022
100e32c
Implement IndexGateway support for dynamic configs.
DylanGuedes Feb 23, 2022
c7797f4
Implement NewBasicService for the IndexGateway.
DylanGuedes Feb 23, 2022
b6d6629
Test IndexGateway dynamic configuration.
DylanGuedes Feb 26, 2022
99df166
Implement new IndexGatewayGRPCPool entity.
DylanGuedes Mar 2, 2022
6eb1595
Make IndexGateway attributes public.
DylanGuedes Mar 2, 2022
0417045
Implement IndexGatewayRing reader.
DylanGuedes Mar 2, 2022
55db3eb
Implement Ring mode in the IndexGatewayClient.
DylanGuedes Mar 7, 2022
d884301
Add new ring index gateway parameter to new Store calls.
DylanGuedes Mar 7, 2022
6cc5034
Use errors.Wrap instead of fmt.Errorf.
DylanGuedes Mar 7, 2022
82393f1
Extract tenantID from context instead of iterating on queries.
DylanGuedes Mar 8, 2022
a4f717c
Remove indexGateway ring param.
DylanGuedes Mar 14, 2022
88224a8
Split IndexGateway server from client implementation.
DylanGuedes Mar 15, 2022
6d68a6c
Fix imports order.
DylanGuedes Mar 15, 2022
947611c
Remove ring as parameter from IndexGateway-related funcs.
DylanGuedes Mar 17, 2022
1004469
Fix default flag value and IndexQuerier type.
DylanGuedes Mar 17, 2022
f0a6b31
Remove additional mode field and reuse it from cfg.
DylanGuedes Mar 17, 2022
cccb298
Remove redundant service init.
DylanGuedes Mar 17, 2022
e824c21
Add sanity check for IndexGateway client constructor.
DylanGuedes Mar 17, 2022
99d39f2
Move mode assigning to initStore method.
DylanGuedes Mar 22, 2022
bd910d9
Reorder IndexGateway constructor.
DylanGuedes Mar 22, 2022
7510b47
Rewrite indexClient chunk.IndexClient as querier Index.Querier.
DylanGuedes Mar 22, 2022
47c4fc2
Fix flag registration for IndexGateway server.
DylanGuedes Mar 22, 2022
3dc51d6
Fix flag registration for test.
DylanGuedes Mar 22, 2022
6323a94
Keep only one reference to indexQuerier.
DylanGuedes Mar 24, 2022
88829bf
Add guard-clause on IndexGatewayRing service.
DylanGuedes Mar 24, 2022
18559e9
Move IndexGatewayClientCfg to gateway_client file.
DylanGuedes Mar 25, 2022
d59f7ee
Merge pull request #13 from JordanRushing/ring-mode-index-gateway
JordanRushing Mar 30, 2022
4629273
Update CHANGELOG.md for `IndexGateway` support for `RingMode`
JordanRushing Mar 30, 2022
5c81c8e
Merge branch 'grafana:main' into add-ring-to-index-gateway
JordanRushing Mar 30, 2022
ebf12c0
Update GatewayClient to use dskit tenant package
JordanRushing Mar 30, 2022
ccbf492
Add listenport configuration for IndexGateway and Ring
JordanRushing Apr 7, 2022
0379649
Make IndexGateway replication factor configurable.
DylanGuedes Apr 8, 2022
a5dda13
Randomize replication set access.
DylanGuedes Apr 8, 2022
bda00af
Merge branch 'main' of github.com:grafana/loki into add-ring-to-index…
DylanGuedes Apr 8, 2022
03c5a7c
Merge branch 'main' of github.com:grafana/loki into add-ring-to-index…
DylanGuedes Apr 11, 2022
62dd62a
Remove unwanted merge HEAD tags.
DylanGuedes Apr 11, 2022
2d602ae
Move away from stores/chunk package.
DylanGuedes Apr 11, 2022
54d6a27
Pass util_log in factory.
DylanGuedes Apr 11, 2022
571b90e
Change index gateway client ring to ignore replicas.
DylanGuedes Apr 11, 2022
6fbd178
Refactor where the common replication factor is applied.
DylanGuedes Apr 11, 2022
b118248
Housekeeping config_wrapper IndexGateway configs.
DylanGuedes Apr 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,84 @@ to include only the most relevant.
* [5450](https://github.com/grafana/loki/pull/5450) **BenoitKnecht**: pkg/ruler/base: Add external_labels option
* [5484](https://github.com/grafana/loki/pull/5450) **sandeepsukhani**: Add support for per user index query readiness with limits overrides
* [5358](https://github.com/grafana/loki/pull/5358) **DylanGuedes**: Add `RingMode` support to `IndexGateway`
* [5435](https://github.com/grafana/loki/pull/5435) **slim-bean**: set match_max_concurrent true by default
* [5361](https://github.com/grafana/loki/pull/5361) **cyriltovena**: Add usage report into Loki.
* [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: Refactor/remove global splitby
* [5229](https://github.com/grafana/loki/pull/5229) **chaudum**: Return early if push payload does not contain data
* [5217](https://github.com/grafana/loki/pull/5217) **sandeepsukhani**: step align start and end time of the original query while splitting it
* [5204](https://github.com/grafana/loki/pull/5204) **trevorwhitney**: Default max_outstanding_per_tenant to 2048
* [5181](https://github.com/grafana/loki/pull/5181) **sandeepsukhani**: align metric queries by step and other queries by split interval
* [5178](https://github.com/grafana/loki/pull/5178) **liguozhong**: Handle `context` cancellation in some of the `querier` store.index-cache-read.
* [5172](https://github.com/grafana/loki/pull/5172) **cyriltovena**: Avoid splitting large range vector aggregation.
* [5125](https://github.com/grafana/loki/pull/5125) **sasagarw**: Remove split-queries-by-interval validation
* [5091](https://github.com/grafana/loki/pull/5091) **owen-d**: better defaults for flush queue parallelism
* [5083](https://github.com/grafana/loki/pull/5083) **liguozhong**: [enhancement] querier cache: WriteBackCache should be off query path
* [5081](https://github.com/grafana/loki/pull/5081) **SasSwart**: Add the option to configure memory ballast for Loki
* [5077](https://github.com/grafana/loki/pull/5077) **trevorwhitney**: improve default config values
* [5067](https://github.com/grafana/loki/pull/5067) **cstyan**: Add an egress bytes total metric to the azure client.
* [5026](https://github.com/grafana/loki/pull/5026) **sandeepsukhani**: compactor changes for building per user index files in boltdb shipper
* [5023](https://github.com/grafana/loki/pull/5023) **ssncferreira**: Move querier.split-queries-by-interval to a per-tenant configuration
* [5022](https://github.com/grafana/loki/pull/5022) **owen-d**: adds instrumentation to azure object client
* [4942](https://github.com/grafana/loki/pull/4942) **cyriltovena**: Allow to disable http2 for GCS.
* [4891](https://github.com/grafana/loki/pull/4891) **liguozhong**: [optimization] cache prometheus : fix "loki_cache_request_duration_seconds_bucket" ‘status_code’ label always equals "200"
* [4737](https://github.com/grafana/loki/pull/4737) **owen-d**: ensures components with required SRV lookups use the correct port
* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction with a config
* [4656](https://github.com/grafana/loki/pull/4656) **ssncferreira**: Fix dskit/ring metric with 'cortex_' prefix


#### Promtail

##### Enhancements
* [5359](https://github.com/grafana/loki/pull/5359) **JBSchami**: Lambda-promtail: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5290](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update promtail to support duration string formats
* [5051](https://github.com/grafana/loki/pull/5051) **liguozhong**: [new] promtail pipeline: Promtail Rate Limit stage #5048
* [5031](https://github.com/grafana/loki/pull/5031) **liguozhong**: [new] promtail: add readline rate limit
* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Provide Docker target and discovery in Promtail.
* [4813](https://github.com/grafana/loki/pull/4813) **cyriltovena**: Promtail pull cloudflare logs
* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Add GELF support for Promtail.
* [4663](https://github.com/grafana/loki/pull/4663) **taisho6339**: Add SASL&mTLS authentication support for Kafka in Promtail

##### Fixes
* [5497](https://github.com/grafana/loki/pull/5497) **MasslessParticle**: Fix orphaned metrics in the file tailer
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: promtail/targets/syslog: Enable best effort parsing for Syslog messages
* [5246](https://github.com/grafana/loki/pull/5246) **rsteneteg**: Promtail: skip glob search if filetarget path is an existing file and not a directory
* [5238](https://github.com/grafana/loki/pull/5238) **littlepangdi**: Promtail: fix TargetManager.run() not exit after stop is called
* [4874](https://github.com/grafana/loki/pull/4874) **Alan01252**: Promtail: Fix replace missing adjacent capture groups
* [4832](https://github.com/grafana/loki/pull/4832) **taisho6339**: Use http prefix path correctly in promtail
* [4716](https://github.com/grafana/loki/pull/4716) **cyriltovena**: Fixes Promtail User-Agent.
* [5698](https://github.com/grafana/loki/pull/5698) **paullryan**: Promtail: Fix retry/stop when erroring for out of cloudflare retention range (e.g. over 168 hours old)

##### Changes
* [5377](https://github.com/grafana/loki/pull/5377) **slim-bean**: Promtail: Remove promtail_log_entries_bytes_bucket histogram
* [5266](https://github.com/grafana/loki/pull/5266) **jeschkies**: Write Promtail position file atomically.
* [4794](https://github.com/grafana/loki/pull/4794) **taisho6339**: Aggregate inotify watcher to file target manager
* [4745](https://github.com/grafana/loki/pull/4745) **taisho6339**: Expose Kafka message key in labels

#### Logcli
* [5477](https://github.com/grafana/loki/pull/5477) **atomic77**: logcli: Remove port from TLS server name when provided in --addr
* [4667](https://github.com/grafana/loki/pull/4667) **jeschkies**: Package logcli as rpm and deb.
* [4606](https://github.com/grafana/loki/pull/4606) **kavirajk**: Execute Loki queries on raw log data piped to stdin

#### Lambda-Promtail
* [5065](https://github.com/grafana/loki/pull/5065) **AndreZiviani**: lambda-promtail: Add ability to ingest logs from S3

#### Fluent Bit
* [5223](https://github.com/grafana/loki/pull/5223) **cyriltovena**: fluent-bit: Attempt to unmarshal nested json.

#### FluentD
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum**: fluentd: Fix bug that caused lines to be dropped when containing non utf-8 characters
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum**: Fix encoding error in fluentd client

### Notes

This release was created from a branch starting at commit 614912181e6f3988b2b22791053278cfb64e169c but it may also contain backported changes from main.

Check the history of the branch `release-2.5.x`.

### Dependencies

* Go Version: 1.17.8


# 2.4.1 (2021/11/07)

Expand Down
13 changes: 10 additions & 3 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,19 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring,
// that config will prevail. This rule is enforced by the fact that the config file and command line args are parsed
// again after the dynamic config has been applied, so will take higher precedence.
// 2. If no explicit ring config is set, use the common ring configured if provided.
// 2. If no explicit ring config is set, use the common ring configured if provided. If a common replication factor is given,
// it will be reused by Ingester and IndexGateway ring.
sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved
// 3. If no common ring was provided, use the memberlist config if provided.
// 4. If no common ring or memberlist were provided, use the ingester's ring configuration.
//
// When using the ingester or common ring config, the loopback interface will be appended to the end of
// the list of default interface names
// the list of default interface names.
func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
if r.Common.ReplicationFactor != defaults.Common.ReplicationFactor {
r.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor = r.Common.ReplicationFactor
r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
}

sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved
if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) {
// common ring is provided, use that for all rings, merging with
// any specific configs provided for each ring
Expand Down Expand Up @@ -249,7 +255,7 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
}

// IndexGateway
if mergeWithExisting || reflect.DeepEqual(r.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring, defaults.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring) {
if mergeWithExisting || reflect.DeepEqual(r.IndexGateway.Ring, defaults.IndexGateway.Ring) {
r.IndexGateway.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.IndexGateway.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.IndexGateway.Ring.InstancePort = rc.InstancePort
Expand All @@ -259,6 +265,7 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
r.IndexGateway.Ring.InstanceZone = rc.InstanceZone
r.IndexGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.IndexGateway.Ring.KVStore = rc.KVStore
r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
}
}

Expand Down
32 changes: 30 additions & 2 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,10 +1363,38 @@ func Test_replicationFactor(t *testing.T) {
join_members:
- foo.bar.example.com
common:
replication_factor: 1`
replication_factor: 2`
config, _, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, 1, config.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor)
assert.Equal(t, 2, config.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor)
assert.Equal(t, 2, config.IndexGateway.Ring.ReplicationFactor)
})
}

func Test_IndexGatewayRingReplicationFactor(t *testing.T) {
t.Run("default replication factor is 3", func(t *testing.T) {
const emptyConfigString = `---
server:
http_listen_port: 80`
config, _, err := configWrapperFromYAML(t, emptyConfigString, nil)
assert.NoError(t, err)
assert.Equal(t, 3, config.IndexGateway.Ring.ReplicationFactor)
})

t.Run("explicit replication factor for the index gateway should override all other definitions", func(t *testing.T) {
yamlContent := `ingester:
lifecycler:
ring:
replication_factor: 15
common:
replication_factor: 30
index_gateway:
ring:
replication_factor: 7`

config, _, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, 7, config.IndexGateway.Ring.ReplicationFactor)
})
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

objectClient, err := chunk_storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
if err != nil {
Expand Down Expand Up @@ -793,7 +794,8 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {

t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
ringCfg := t.Cfg.IndexGateway.Ring.ToRingConfig(indexgateway.RingReplicationFactor)
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
ringCfg := t.Cfg.IndexGateway.Ring.ToRingConfig(t.Cfg.IndexGateway.Ring.ReplicationFactor)
reg := prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)
t.indexGatewayRing, err = ring.New(ringCfg, indexgateway.RingIdentifier, indexgateway.RingKey, util_log.Logger, reg)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/stores/shipper/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"math/rand"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -234,6 +235,11 @@ func (s *GatewayClient) ringModeDoQueries(ctx context.Context, gatewayQueries []
}

addrs := rs.GetAddresses()
// shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant.
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})

for _, addr := range addrs {
sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved
genericClient, err := s.pool.GetClientFor(addr)
if err != nil {
Expand Down
24 changes: 22 additions & 2 deletions pkg/storage/stores/shipper/indexgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ const (
RingMode Mode = "ring"
)

// RingCfg is a wrapper for our Index Gateway ring configuration plus the replication factor.
type RingCfg struct {
// InternalRingCfg configures the Index Gateway ring.
loki_util.RingConfig `yaml:",inline"`

// ReplicationFactor defines how many Index Gateway instances are assigned to each tenant.
//
// If the replication factor is higher than the number of instances, the ring is considered not ready.
sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved
// Whenever the store queries the ring key-value store for the Index Gateway instance responsible for tenant X,
// multiple Index Gateway instances are expected to be returned as Index Gateway might be busy/locked for specific
// reasons (this is assured by the spikey behavior of Index Gateway latencies).
ReplicationFactor int `yaml:"replication_factor"`
}

// RegisterFlagsWithPrefix register all Index Gateway flags related to its ring but with a proper store prefix to avoid conflicts.
func (cfg *RingCfg) RegisterFlags(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix(prefix, storePrefix, f)
f.IntVar(&cfg.ReplicationFactor, "replication-factor", 3, "how many index gateway instances are assigned to each tenant")
}

// Config configures an Index Gateway server.
type Config struct {
// Mode configures in which mode the client will be running when querying and communicating with an Index Gateway instance.
Expand All @@ -56,11 +76,11 @@ type Config struct {
//
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring loki_util.RingConfig `yaml:"ring,omitempty"`
Ring RingCfg `yaml:"ring,omitempty"`
}

// RegisterFlags register all IndexGatewayClientConfig flags and all the flags of its subconfigs but with a prefix (ex: shipper).
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix("index-gateway.", "collectors/", f)
cfg.Ring.RegisterFlags("index-gateway.", "collectors/", f)
f.StringVar((*string)(&cfg.Mode), "index-gateway.mode", SimpleMode.String(), "mode in which the index gateway client will be running")
}
9 changes: 1 addition & 8 deletions pkg/storage/stores/shipper/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ const (

// RingKey is the name of the key used to register the different Index Gateway instances in the key-value store.
RingKey = "index-gateway"

// RingReplicationFactor is the number of instances that will be assigned a ring value, defining redundance.
//
// Whenever the store queries the ring key-value store for the Index Gateway instance responsible for tenant X,
// multiple Index Gateway instances are expected to be returned as Index Gateway might be busy/locked for specific
// reasons (this is assured by the spikey behavior of Index Gateway latencies).
RingReplicationFactor = 3
)

type IndexQuerier interface {
Expand Down Expand Up @@ -100,7 +93,7 @@ func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registere
return nil, errors.Wrap(err, "index gateway create ring lifecycler")
}

ringCfg := cfg.Ring.ToRingConfig(RingReplicationFactor)
ringCfg := cfg.Ring.ToRingConfig(cfg.Ring.ReplicationFactor)
g.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", registerer), log)
if err != nil {
return nil, errors.Wrap(err, "index gateway create ring client")
Expand Down