Skip to content

Commit

Permalink
receive: use gRPC for forwarding messages (#1970)
Browse files Browse the repository at this point in the history
* receive: use grpc to forward remote write requests

Signed-off-by: Brett Jones <[email protected]>

* test/e2e: update receive e2e tests to use gRPC

Signed-off-by: Lucas Servén Marín <[email protected]>

* CHANGELOG: add note about receive config

Signed-off-by: Lucas Servén Marín <[email protected]>

Co-authored-by: Lucas Servén Marín <[email protected]>
  • Loading branch information
2 people authored and brancz committed Jan 10, 2020
1 parent 9c968a8 commit 3cf366a
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 241 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags
- [#1967](https://github.com/thanos-io/thanos/issues/1967) Receive: Allow local TSDB compaction
- [#1975](https://github.com/thanos-io/thanos/pull/1975) Store Gateway: fixed panic caused by memcached servers selector when there's 1 memcached node
- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`.

### Fixed

Expand Down
53 changes: 4 additions & 49 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -21,9 +19,12 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -34,11 +35,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -165,48 +162,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}
}

func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
),
}

if reg != nil {
reg.MustRegister(grpcMets)
}

if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "enabling client to server TLS")

tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
// store nodes, merging and duplicating the data to satisfy user query.
func runQuery(
Expand Down Expand Up @@ -251,7 +206,7 @@ func runQuery(
})
reg.MustRegister(duplicatedStores)

dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName)
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName)
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
Expand Down
16 changes: 14 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
Expand All @@ -27,7 +30,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
Expand Down Expand Up @@ -189,6 +191,11 @@ func runReceive(
if err != nil {
return err
}
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, rwServerCert != "", rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName)
if err != nil {
return err
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
ListenAddress: rwAddress,
Registry: reg,
Expand All @@ -199,6 +206,7 @@ func runReceive(
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
DialOpts: dialOpts,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down Expand Up @@ -376,8 +384,12 @@ func runReceive(
s.Shutdown(errors.New("reload hashrings"))
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset)
rw := store.ReadWriteTSDBStore{
StoreServer: tsdbStore,
WriteableStoreServer: webHandler,
}

s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, tsdbStore,
s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
59 changes: 59 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package extgrpc

import (
"math"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
),
}

if reg != nil {
reg.MustRegister(grpcMets)
}

if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "enabling client to server TLS")

tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}
Loading

0 comments on commit 3cf366a

Please sign in to comment.