Skip to content

Commit

Permalink
Receiver: Use proxy store for MultiTSDB (thanos-io#5552)
Browse files Browse the repository at this point in the history
* Introduce store type

- This commit introduces StoreType and replaces / expands the Addr() method into the StoreInfo() method.
This method returns information on whether the store is local (case of MultiTSDB) or remote. If the store is remote, we also return its address in the StoreInfo() method.
- Adjust relevant parts of code in proxy
- Adjust proxy tests

Signed-off-by: Matej Gera <[email protected]>

* Adjust existing proxy.Client implementations

- Adjust (and slightly refactor) endpoint set client
- Adjust test implementations

Signed-off-by: Matej Gera <[email protected]>

* Introduce local client implementation of proxy.Client interface

- Introduces new type in MultiTSDB
- Introduce method to obtain client for a tenant. This client leverages the server-as-client store client implementation.
- Adjust tests

Signed-off-by: Matej Gera <[email protected]>

* Add receive to interactive test

Signed-off-by: Matej Gera <[email protected]>

* Remove old MultiTSDB code

Signed-off-by: Matej Gera <[email protected]>

* Fix imports

Signed-off-by: Matej Gera <[email protected]>

* Fix formatting

Signed-off-by: Matej Gera <[email protected]>

* Remove forgotten dead code

Signed-off-by: Matej Gera <[email protected]>

* Remove store type and adjust Addr method instead

Signed-off-by: Matej Gera <[email protected]>

* Disable timeout in receiver proxy

Signed-off-by: Matej Gera <[email protected]>

* store: fix nil panic in proxy heap

With lazy proxying we need to wait for at least one response before we
can build a heap properly.

Closes thanos-io#5717
Fixes thanos-io#5552.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* Use lazy retrieval strategy for receive proxy

Signed-off-by: Matej Gera <[email protected]>

Signed-off-by: Matej Gera <[email protected]>
Signed-off-by: Giedrius Statkevičius <[email protected]>
Co-authored-by: Giedrius Statkevičius <[email protected]>
Signed-off-by: utukj <[email protected]>
  • Loading branch information
2 people authored and utukJ committed Oct 13, 2022
1 parent a1cdfc9 commit f840156
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 830 deletions.
7 changes: 5 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,14 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

mts := store.NewMultiTSDBStore(
mts := store.NewProxyStore(
logger,
reg,
dbs.TSDBLocalClients,
comp,
dbs.TSDBStores,
labels.Labels{},
0,
store.LazyRetrieval,
)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
Expand Down
13 changes: 8 additions & 5 deletions examples/interactive/interactive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (

"github.com/thanos-io/objstore/client"
"github.com/thanos-io/objstore/providers/s3"

"github.com/thanos-io/thanos/pkg/testutil"
tracingclient "github.com/thanos-io/thanos/pkg/tracing/client"
"github.com/thanos-io/thanos/pkg/tracing/jaeger"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

const (
Expand Down Expand Up @@ -248,6 +248,8 @@ func TestReadOnlyThanosSetup(t *testing.T) {
sidecarHA1 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha1", promHA1, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}))
sidecar2 := e2edb.NewThanosSidecar(e, "sidecar2", prom2, e2edb.WithImage("thanos:latest"))

receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithIngestionEnabled().Init()

testutil.Ok(t, exec("cp", "-r", prom1Data+"/.", promHA0.Dir()))
testutil.Ok(t, exec("sh", "-c", "find "+prom1Data+"/ -maxdepth 1 -type d | tail -5 | xargs -I {} cp -r {} "+promHA1.Dir())) // Copy only 5 blocks from 9 to mimic replica 1 with partial data set.
testutil.Ok(t, exec("cp", "-r", prom2Data+"/.", prom2.Dir()))
Expand Down Expand Up @@ -278,9 +280,9 @@ func TestReadOnlyThanosSetup(t *testing.T) {
}))

testutil.Ok(t, e2e.StartAndWaitReady(m1))
testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2))
testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2, receive1))

// Let's start query on top of all those 5 store APIs (global query engine).
// Let's start query on top of all those 6 store APIs (global query engine).
//
// ┌──────────────┐
// │ │
Expand Down Expand Up @@ -331,14 +333,15 @@ func TestReadOnlyThanosSetup(t *testing.T) {
sidecarHA0.InternalEndpoint("grpc"),
sidecarHA1.InternalEndpoint("grpc"),
sidecar2.InternalEndpoint("grpc"),
receive1.InternalEndpoint("grpc"),
},
e2edb.WithImage("thanos:latest"),
e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}),
)
testutil.Ok(t, e2e.StartAndWaitReady(query1))

// Wait until we have 5 gRPC connections.
testutil.Ok(t, query1.WaitSumMetricsWithOptions(e2emon.Equals(5), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))
// Wait until we have 6 gRPC connections.
testutil.Ok(t, query1.WaitSumMetricsWithOptions(e2emon.Equals(6), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

const path = "graph?g0.expr=sum(continuous_app_metric0)%20by%20(cluster%2C%20replica)&g0.tab=0&g0.stacked=0&g0.range_input=2w&g0.max_source_resolution=0s&g0.deduplicate=0&g0.partial_response=0&g0.store_matches=%5B%5D&g0.end_input=2021-07-27%2000%3A00%3A00"
testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s/%s", query1.Endpoint("http"), path)))
Expand Down
20 changes: 9 additions & 11 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ func NewGRPCEndpointSpec(addr string, isStrictStatic bool) *GRPCEndpointSpec {
return &GRPCEndpointSpec{addr: addr, isStrictStatic: isStrictStatic}
}

// IsStrictStatic returns true if the endpoint has been statically defined and it is under a strict mode.
func (es *GRPCEndpointSpec) IsStrictStatic() bool {
return es.isStrictStatic
}

func (es *GRPCEndpointSpec) Addr() string {
// API address should not change between state changes.
return es.addr
Expand Down Expand Up @@ -387,7 +382,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
e.endpoints[addr] = er
}
for addr, er := range staleRefs {
level.Info(er.logger).Log("msg", unhealthyEndpointMessage, "address", er.Addr(), "extLset", labelpb.PromLabelSetsToString(er.LabelSets()))
level.Info(er.logger).Log("msg", unhealthyEndpointMessage, "address", er.addr, "extLset", labelpb.PromLabelSetsToString(er.LabelSets()))
er.Close()
delete(e.endpoints, addr)
}
Expand Down Expand Up @@ -442,7 +437,7 @@ func (e *EndpointSet) getTimedOutRefs() map[string]*endpointRef {

lastCheck := er.getStatus().LastCheck
if now.Sub(lastCheck) >= e.unhealthyEndpointTimeout {
result[er.Addr()] = er
result[er.addr] = er
}
}

Expand Down Expand Up @@ -604,7 +599,7 @@ func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec
logger: e.logger,
created: e.now(),
addr: spec.Addr(),
isStrict: spec.IsStrictStatic(),
isStrict: spec.isStrictStatic,
cc: conn,
}, nil
}
Expand Down Expand Up @@ -787,11 +782,14 @@ func (er *endpointRef) SendsSortedSeries() bool {

func (er *endpointRef) String() string {
mint, maxt := er.TimeRange()
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", er.addr, labelpb.PromLabelSetsToString(er.LabelSets()), mint, maxt)
return fmt.Sprintf(
"Addr: %s LabelSets: %v Mint: %d Maxt: %d",
er.addr, labelpb.PromLabelSetsToString(er.LabelSets()), mint, maxt,
)
}

func (er *endpointRef) Addr() string {
return er.addr
func (er *endpointRef) Addr() (string, bool) {
return er.addr, false
}

func (er *endpointRef) Close() {
Expand Down
9 changes: 6 additions & 3 deletions pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,14 @@ func (s *storeRef) SendsSortedSeries() bool {

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, labelpb.PromLabelSetsToString(s.LabelSets()), mint, maxt)
return fmt.Sprintf(
"Addr: %s LabelSets: %v Mint: %d Maxt: %d",
s.addr, labelpb.PromLabelSetsToString(s.LabelSets()), mint, maxt,
)
}

func (s *storeRef) Addr() string {
return s.addr
func (s *storeRef) Addr() (string, bool) {
return s.addr, false
}

func (s *storeRef) close() {
Expand Down
9 changes: 6 additions & 3 deletions pkg/query/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,10 @@ func (i inProcessClient) TimeRange() (mint, maxt int64) {
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }
func (i inProcessClient) SupportsSharding() bool {
return false
}

func (i inProcessClient) SendsSortedSeries() bool { return false }
func (i inProcessClient) SupportsSharding() bool { return false }
func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() (string, bool) { return i.name, true }
66 changes: 59 additions & 7 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -33,6 +34,7 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

type TSDBStats interface {
Expand Down Expand Up @@ -88,6 +90,49 @@ func NewMultiTSDB(
}
}

// localClient bundles a storepb.StoreClient with two callbacks that report
// the label sets and the time range of the store it fronts.
type localClient struct {
// Embedded client; its methods are promoted onto localClient.
storepb.StoreClient

// labelSetFunc is invoked by LabelSets to obtain the current label sets.
labelSetFunc func() []labelpb.ZLabelSet
// timeRangeFunc is invoked by TimeRange and String; its two results are
// used as (mint, maxt), in that order.
timeRangeFunc func() (int64, int64)
}

// newLocalClient wraps c in a localClient and stores the two callbacks that
// LabelSets, TimeRange and String delegate to.
func newLocalClient(
	c storepb.StoreClient,
	labelSetFunc func() []labelpb.ZLabelSet,
	timeRangeFunc func() (int64, int64),
) *localClient {
	return &localClient{
		StoreClient:   c,
		labelSetFunc:  labelSetFunc,
		timeRangeFunc: timeRangeFunc,
	}
}

// LabelSets calls labelSetFunc and converts the result into Prometheus
// label sets.
func (l *localClient) LabelSets() []labels.Labels {
	zLabelSets := l.labelSetFunc()
	return labelpb.ZLabelSetsToPromLabelSets(zLabelSets...)
}

// TimeRange returns the (mint, maxt) pair reported by timeRangeFunc.
func (l *localClient) TimeRange() (mint int64, maxt int64) {
	mint, maxt = l.timeRangeFunc()
	return mint, maxt
}

// String renders the client's label sets and time range for logging. No
// address is included in the output.
func (l *localClient) String() string {
	minTime, maxTime := l.timeRangeFunc()
	lsets := labelpb.PromLabelSetsToString(l.LabelSets())
	return fmt.Sprintf("LabelSets: %v Mint: %d Maxt: %d", lsets, minTime, maxTime)
}

// Addr returns an empty address together with true.
// NOTE(review): the boolean presumably flags the client as local (in-process),
// which is why no address is reported — confirm against the store.Client contract.
func (l *localClient) Addr() (string, bool) {
return "", true
}

// SupportsSharding always reports true for a local client.
func (l *localClient) SupportsSharding() bool {
return true
}

// SendsSortedSeries always reports true for a local client.
func (l *localClient) SendsSortedSeries() bool {
return true
}

type tenant struct {
readyS *ReadyStorage
storeTSDB *store.TSDBStore
Expand All @@ -114,6 +159,15 @@ func (t *tenant) store() *store.TSDBStore {
return t.storeTSDB
}

// client returns a store.Client that serves requests from this tenant's TSDB
// store, using the store's LabelSet and TimeRange methods as callbacks.
//
// The store pointer is read directly under the read lock held here rather
// than through t.store(). Acquiring the same RWMutex for reading a second
// time while already holding it is prohibited by sync.RWMutex: if a writer
// queues up between the two acquisitions, the inner RLock blocks forever.
// NOTE(review): this assumes t.store() takes t.mtx.RLock() itself, as the
// other tenant accessors do — confirm.
// NOTE(review): storeTSDB may still be nil for a tenant that is not ready;
// the returned client would then wrap a nil store — confirm callers cope.
func (t *tenant) client() store.Client {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	// Named tsdbStore so the imported store package is not shadowed.
	tsdbStore := t.storeTSDB
	client := storepb.ServerAsClient(tsdbStore, 0)
	return newLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange)
}

func (t *tenant) exemplars() *exemplars.TSDB {
t.mtx.RLock()
defer t.mtx.RUnlock()
Expand Down Expand Up @@ -363,17 +417,15 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
return merr.Err()
}

func (t *MultiTSDB) TSDBStores() map[string]store.InfoStoreServer {
func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]store.InfoStoreServer, len(t.tenants))
for k, tenant := range t.tenants {
s := tenant.store()
if s != nil {
res[k] = s
}
res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
res = append(res, tenant.client())
}

return res
}

Expand Down
Loading

0 comments on commit f840156

Please sign in to comment.