Skip to content

Commit

Permalink
*: Add initial remote-write-receive component
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Mar 12, 2019
1 parent 432785e commit 1ab4011
Show file tree
Hide file tree
Showing 15 changed files with 1,223 additions and 126 deletions.
26 changes: 19 additions & 7 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,37 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (
func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

) {
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
Default("0.0.0.0:10901").String()

grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()

return grpcBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA
}

func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

httpBindAddr = regHTTPAddrFlag(cmd)
grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd)
grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func main() {
registerCompact(cmds, app, "compact")
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")

cmd, err := app.Parse(os.Args[1:])
if err != nil {
Expand Down
231 changes: 231 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package main

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerReceive(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
httpMetricsBindAddr := regHTTPAddrFlag(cmd)

remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").String()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runReceive(
g,
logger,
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
)
}
}

func runReceive(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

tsdbCfg := &tsdb.Options{
Retention: model.Duration(time.Hour * 24 * 15),
NoLockfile: true,
MinBlockDuration: model.Duration(time.Hour * 2),
MaxBlockDuration: model.Duration(time.Hour * 2),
}

localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
})

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}

level.Debug(logger).Log("msg", "setting up endpoint readiness")
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-dbOpen:
break
case <-cancel:
reloadReady.Close()
return nil
}

reloadReady.Close()

webHandler.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "starting TSDB ...")
db, err := tsdb.Open(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)
if err != nil {
return fmt.Errorf("opening storage failed: %s", err)
}
level.Info(logger).Log("msg", "tsdb started")

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
}
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up metric http listen-group")
if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil {
return err
}

level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
s *grpc.Server
l net.Listener
err error
)
g.Add(func() error {
select {
case <-dbOpen:
break
}

l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s = grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, tsdbStore)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
if s != nil {
s.Stop()
}
if l != nil {
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
}
})
}

level.Debug(logger).Log("msg", "setting up receive http handler")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
return fmt.Errorf("error starting web server: %s", err)
}
return nil
},
func(err error) {
cancel()
},
)
}
level.Info(logger).Log("msg", "starting receiver")

return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ require (
github.com/miekg/dns v1.0.8 // indirect
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mozillazg/go-cos v0.11.0
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
github.com/oklog/run v1.0.0
github.com/oklog/ulid v1.3.1
github.com/olekukonko/tablewriter v0.0.1
github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.0.2
github.com/pkg/errors v0.8.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.1 h1:PZSj/UFNaVp3KxrzHOcS7oyuWA7LoOY/77yCTEFu21U=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7 h1:8KbikWulLUcMM96hBxjgoo6gTmCkG6HYSDohv/WygYU=
github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
Expand Down
3 changes: 3 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func FromProto(storeType storepb.StoreType) StoreAPI {
return Sidecar
case storepb.StoreType_STORE:
return Store
case storepb.StoreType_RECEIVE:
return Receive
default:
return nil
}
Expand All @@ -84,4 +86,5 @@ var (
Rule = sourceStoreAPI{component: component{name: "rule"}}
Sidecar = sourceStoreAPI{component: component{name: "sidecar"}}
Store = sourceStoreAPI{component: component{name: "store"}}
Receive = sourceStoreAPI{component: component{name: "receive"}}
)
Loading

0 comments on commit 1ab4011

Please sign in to comment.