Merge branch 'main' into fus-replicate-ignore-deleted
FUSAKLA authored Feb 3, 2022
2 parents 69db471 + 4357002 commit ef90283
Showing 9 changed files with 699 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4974](https://github.com/thanos-io/thanos/pull/4974) Store: Support tls_config configuration for connecting with Azure storage.
- [#4999](https://github.com/thanos-io/thanos/pull/4999) COS: Support `endpoint` configuration for vpc internal endpoint.
- [#5059](https://github.com/thanos-io/thanos/pull/5059) Compactor: Adding minimum retention flag validation for downsampling retention.
- [#5111](https://github.com/thanos-io/thanos/pull/5111) Add matcher support to Query Rules endpoint.
- [#5117](https://github.com/thanos-io/thanos/pull/5117) Bucket replicate: Added flag `--ignore-marked-for-deletion` to avoid replication of blocks with the deletion mark.

### Fixed
42 changes: 41 additions & 1 deletion docs/components/store.md
@@ -399,7 +399,47 @@ Following options are used for metadata caching (meta.json files, deletion mark

The YAML structure for the in-memory cache configuration of the caching bucket is the same as for the [in-memory index cache](#in-memory-index-cache), and all the Caching Bucket options mentioned above can be used.

Note that chunks and metadata cache is an experimental feature, and these fields may be renamed or removed completely in the future.
In addition to the cache backends shared with other caches (memcached, in-memory, redis), caching bucket supports another type of backend.

### *EXPERIMENTAL* Groupcache Caching Bucket Provider

Groupcache is an experimental cache backend for the caching bucket, introduced in version `v0.25` of Thanos.

With groupcache, you do not need any external components for the caching layer because the cache is shared between all Thanos Store processes. Another benefit is that groupcache is a cache-filling library: given enough space in memory, each value is loaded only once. For example, if the same metric is used in multiple concurrent queries, Thanos Store loads that metric's data from remote object storage only once.

All in all, it should be a superior caching solution to all other currently supported solutions. It just needs some battle-testing. So, help is needed with testing in real life scenarios! Please create an issue if you've found any problem. 🤗

Here is what it looks like:

<img src="../img/groupcache.png" class="img-fluid" alt="Example of a groupcache group showing that each Thanos Store instance communicates with all others in the group"/>

Note that with groupcache enabled, new routes are registered on the HTTP server with the prefix `/_groupcache`. Using those routes, anyone can access any kind of data in the configured remote object storage. So, if you are exposing your Thanos Store to the Internet, it is highly recommended to put a reverse proxy in front of it and block access to `/_groupcache/...`.
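
One way to follow this recommendation is to reject requests for the groupcache routes before they reach Thanos Store. The sketch below is a minimal illustration in Go, not part of Thanos: `isGroupcachePath` and `denyGroupcache` are hypothetical names, and the stand-in `backend` handler represents whatever would forward traffic to Store.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"path"
	"strings"
)

// groupcachePrefix is the route prefix named in the text above.
const groupcachePrefix = "/_groupcache"

// isGroupcachePath reports whether a request path targets the groupcache
// routes. The path is cleaned first so that variants such as
// "//_groupcache" are treated the same way (a choice made for this sketch).
func isGroupcachePath(p string) bool {
	clean := path.Clean("/" + p)
	return clean == groupcachePrefix || strings.HasPrefix(clean, groupcachePrefix+"/")
}

// denyGroupcache wraps next and answers 403 for groupcache routes.
func denyGroupcache(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isGroupcachePath(r.URL.Path) {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	// Stand-in for the handler that would proxy to Thanos Store.
	backend := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	h := denyGroupcache(backend)

	for _, p := range []string{"/metrics", "/_groupcache/test_group/key"} {
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, p, nil))
		fmt.Println(p, rec.Code)
	}
}
```

In a real deployment `backend` could be a reverse proxy such as the one returned by `httputil.NewSingleHostReverseProxy`. The Store instances in the group still need to reach each other's `/_groupcache` routes directly, so a filter like this belongs only on the externally exposed path.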

TLS *is* currently supported, but the client does not verify the certificate it receives. This will be added in the future. HTTP/2 over cleartext is also enabled to improve performance for users that don't use TLS.

Example configuration that you could provide to the caching bucket configuration flags with the explanation of each configuration key:

```yaml
type: GROUPCACHE
config:
  self_url: http://10.123.22.3:8080
  peers:
    - http://10.123.22.3:8080
    - http://10.123.22.10:8080
    - http://10.123.22.100:8080
  groupcache_group: test_group
  dns_interval: 1s
```

In this case, three Thanos Store nodes are running in the same group, meaning that they all point to the same remote object storage.

- `self_url` - our own URL. On each node this will be different. This should be the external IP through which other nodes can access us.
- `groupcache_group` - the groupcache group's name. All nodes using the same remote object storage configuration should use the same name. It is used in the HTTP requests. If it is different, nodes will not be able to load data from each other.
- `dns_interval` - how often DNS lookups should be made.

In the `peers` section it is possible to use the prefix form to automatically look up the peers using DNS. For example, you could use `dns+http://store.thanos.consul.svc:8080` to automatically look up healthy nodes from Consul using its DNS interface.

Note that entries in the `peers` configuration must not have a trailing slash. One of the entries must be identical to `self_url`, and the others should have the same form. Otherwise, loading data from peers may fail.
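
The trailing-slash rule can be checked before a configuration is deployed. The following is a minimal sketch of such a pre-flight check in Go. `checkPeers` is a hypothetical helper written for illustration, not a Thanos function, and it only covers the trailing-slash rule (it does not resolve `dns+` entries or verify that `self_url` is listed).

```go
package main

import (
	"fmt"
	"strings"
)

// checkPeers is a hypothetical pre-flight check: it rejects a self_url or
// peer entry that ends with a slash.
func checkPeers(selfURL string, peers []string) error {
	if strings.HasSuffix(selfURL, "/") {
		return fmt.Errorf("self_url must not have a trailing slash (%s)", selfURL)
	}
	for i, peer := range peers {
		if strings.HasSuffix(peer, "/") {
			return fmt.Errorf("peer %d must not have a trailing slash (%s)", i, peer)
		}
	}
	return nil
}

func main() {
	peers := []string{
		"http://10.123.22.3:8080",
		"http://10.123.22.10:8080/", // trailing slash: rejected by the check
	}
	if err := checkPeers("http://10.123.22.3:8080", peers); err != nil {
		fmt.Println("invalid configuration:", err)
	}
}
```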

## Index Header

Binary file added docs/img/groupcache.png
5 changes: 5 additions & 0 deletions pkg/api/query/v1.go
@@ -787,10 +787,15 @@ func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(
			typ = int32(rulespb.RulesRequest_ALL)
		}

		if err := r.ParseForm(); err != nil {
			return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error parsing request form='%v'", MatcherParam)}
		}

		// TODO(bwplotka): Allow exactly the same functionality as query API: passing replica, dedup and partial response as HTTP params as well.
		req := &rulespb.RulesRequest{
			Type:                    rulespb.RulesRequest_Type(typ),
			PartialResponseStrategy: ps,
			MatcherString:           r.Form[MatcherParam],
		}
		tracing.DoInSpan(ctx, "retrieve_rules", func(ctx context.Context) {
			groups, warnings, err = client.Rules(ctx, req)
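
The hunk above reads every value of a repeated form parameter through `r.Form`. The following standalone sketch shows that pattern with only the standard library. The parameter name `match[]`, the path `/api/v1/rules`, and the helpers `matchersFromRequest` and `newRulesRequest` are assumptions made for illustration; the value of `MatcherParam` is not shown in this diff.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// matcherParam is an assumed parameter name; the real MatcherParam constant
// is not visible in the hunk above.
const matcherParam = "match[]"

// matchersFromRequest returns every value passed for matcherParam, in order.
func matchersFromRequest(r *http.Request) ([]string, error) {
	if err := r.ParseForm(); err != nil {
		return nil, fmt.Errorf("parsing request form: %w", err)
	}
	return r.Form[matcherParam], nil
}

// newRulesRequest builds a GET request carrying one matcherParam value per
// selector. The path is an assumption for this example.
func newRulesRequest(selectors ...string) *http.Request {
	q := url.Values{}
	for _, s := range selectors {
		q.Add(matcherParam, s)
	}
	return httptest.NewRequest(http.MethodGet, "/api/v1/rules?"+q.Encode(), nil)
}

func main() {
	r := newRulesRequest(`{severity="page"}`, `{team="infra"}`)
	matchers, err := matchersFromRequest(r)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range matchers {
		fmt.Println(m)
	}
}
```

Unlike the error in the hunk, this sketch wraps the underlying `ParseForm` error so the cause is kept in the message.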
11 changes: 11 additions & 0 deletions pkg/cache/groupcache.go
@@ -7,11 +7,13 @@ import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/go-kit/log"
@@ -78,6 +80,15 @@ func parseGroupcacheConfig(conf []byte) (GroupcacheConfig, error) {
		config.Peers = append(config.Peers, config.SelfURL)
	}

	for i, peer := range config.Peers {
		// Workaround for https://github.com/thanos-community/galaxycache/blob/master/http/http.go#L205-L210.
		// If the peer has a slash at the end then the router redirects
		// and then the request fails.
		if strings.HasSuffix(peer, "/") {
			return GroupcacheConfig{}, fmt.Errorf("peer %d must not have a trailing slash (%s)", i, peer)
		}
	}

	return config, nil
}

59 changes: 59 additions & 0 deletions pkg/rules/rules.go
@@ -7,9 +7,12 @@ import (
	"context"
	"sort"
	"sync"
	"text/template"
	"text/template/parse"

	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"

	"github.com/thanos-io/thanos/pkg/rules/rulespb"
Expand Down Expand Up @@ -58,6 +61,16 @@ func (rr *GRPCClient) Rules(ctx context.Context, req *rulespb.RulesRequest) (*ru
return nil, nil, errors.Wrap(err, "proxy Rules")
}

var err error
matcherSets := make([][]*labels.Matcher, len(req.MatcherString))
for i, s := range req.MatcherString {
matcherSets[i], err = parser.ParseMetricSelector(s)
if err != nil {
return nil, nil, errors.Wrap(err, "parser ParseMetricSelector")
}
}

resp.groups = filterRules(resp.groups, matcherSets)
// TODO(bwplotka): Move to SortInterface with equal method and heap.
resp.groups = dedupGroups(resp.groups)
for _, g := range resp.groups {
@@ -67,6 +80,52 @@ func (rr *GRPCClient) Rules(ctx context.Context, req *rulespb.RulesRequest) (*ru
	return &rulespb.RuleGroups{Groups: resp.groups}, resp.warnings, nil
}

// filterRules filters rules in a group according to given matcherSets.
func filterRules(ruleGroups []*rulespb.RuleGroup, matcherSets [][]*labels.Matcher) []*rulespb.RuleGroup {
	if len(matcherSets) == 0 {
		return ruleGroups
	}

	for _, g := range ruleGroups {
		filteredRules := g.Rules[:0]
		for _, r := range g.Rules {
			rl := r.GetLabels()
			if matches(matcherSets, rl) {
				filteredRules = append(filteredRules, r)
			}
		}
		g.Rules = filteredRules
	}

	return ruleGroups
}

// matches returns whether the non-templated labels satisfy all the matchers in matcherSets.
func matches(matcherSets [][]*labels.Matcher, l labels.Labels) bool {
	if len(matcherSets) == 0 {
		return true
	}

	var nonTemplatedLabels labels.Labels
	labelTemplate := template.New("label")
	for _, label := range l {
		t, err := labelTemplate.Parse(label.Value)
		// Label value is non-templated if it is one node of type NodeText.
		if err == nil && len(t.Root.Nodes) == 1 && t.Root.Nodes[0].Type() == parse.NodeText {
			nonTemplatedLabels = append(nonTemplatedLabels, label)
		}
	}

	for _, matchers := range matcherSets {
		for _, m := range matchers {
			if v := nonTemplatedLabels.Get(m.Name); !m.Matches(v) {
				return false
			}
		}
	}
	return true
}
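
The non-templated check in `matches` relies on how `text/template` parses a string: a plain value becomes a single text node, while a value containing an action does not. The sketch below isolates that check using only the standard library. `isPlainLabelValue` is a hypothetical name, and parsing each value with a fresh template is a choice made for this sketch rather than what the hunk above does.

```go
package main

import (
	"fmt"
	"text/template"
	"text/template/parse"
)

// isPlainLabelValue reports whether value parses as exactly one literal
// text node, which is the condition used above for a non-templated label.
func isPlainLabelValue(value string) bool {
	// A fresh template per value keeps each parse independent.
	t, err := template.New("label").Parse(value)
	if err != nil {
		// Values that fail to parse are not treated as plain text.
		return false
	}
	return len(t.Root.Nodes) == 1 && t.Root.Nodes[0].Type() == parse.NodeText
}

func main() {
	for _, v := range []string{"page", "{{ .Value }}", "team-{{ .Value }}", ""} {
		fmt.Printf("%q plain=%v\n", v, isPlainLabelValue(v))
	}
}
```

Under this condition an empty value is not counted as plain, because it parses to zero nodes rather than one.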

// dedupRules re-sorts the set so that the same series with different replica
// labels are coming right after each other.
func dedupRules(rules []*rulespb.Rule, replicaLabels map[string]struct{}) []*rulespb.Rule {