cmd,pkg: flush storage when hashring changes #1480

Merged: 2 commits, Sep 30, 2019
2 changes: 1 addition & 1 deletion cmd/thanos/main.go
@@ -294,7 +294,7 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
258 changes: 165 additions & 93 deletions cmd/thanos/receive.go
@@ -149,114 +149,153 @@ func runReceive(
MaxBlockDuration: tsdbBlockDuration,
WALCompression: true,
}
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
Endpoint: endpoint,
TenantHeader: tenantHeader,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// dbReady signals when TSDB is ready and the Store gRPC server can start.
dbReady := make(chan struct{}, 1)
// updateDB signals when TSDB needs to be flushed and updated.
updateDB := make(chan struct{}, 1)
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "starting TSDB ...")
db, err := tsdb.Open(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)
if err != nil {
close(dbOpen)
return fmt.Errorf("opening storage failed: %s", err)
// Before actually starting, we need to make sure
// the WAL is flushed.
startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
if err := db.Open(); err != nil {
Contributor: Why not call the Open method inside Flush? This would remove some of the boilerplate. The same for db.Get().

Member Author: Yes, we could do that. I was worried that it would be confusing if Flush left an open DB. Do you think it would be more expected for Flush to leave a running DB?

Contributor: I think it is most logical that Flush should: open the DB -> flush -> close the DB.

Member Author: So in that case Flush shouldn't be a type and there should be no handle returned. Instead you think it should simply be a func that flushes the specified dir?

Contributor: Aaah, I missed that it needs to be kept open for the Get() method. How about something like:

func (f *FlushableStorage) Flush() error {
	f.Open()
	defer f.Close()
	...
}

and

// Get opens a db instance and returns a reference to it.
// The caller is responsible for closing the db.
func (f *FlushableStorage) Get() *promtsdb.DB {
	f.Open()
	return f.DB
}

Member Author: Sure, I just want to be clear about the API. I think it's confusing if Flush leaves an open DB when the original state was closed. It may be clearest to say that Flush always leaves the storage in the same state it was in: open storage stays open; closed storage remains closed.

Contributor: Yep, that is a great idea, but can you imagine a case where the storage would be open when running a flush?

Member Author: The DB is open immediately before every flush (we are actively writing samples until we get the reshard signal and need to flush), so someone has to close it: either the caller, or the Flush method itself.

Contributor: Hm, yeah. Maybe now with the new tsdb.Flush method we can just use it directly and not need the separate FlushableStorage.

Member Author: Yes, that's what I was driving at earlier. It would reduce the code here, though we'd have some more boilerplate in the caller, since it would have to open a new DB after flushing. This type is convenient because we can have one single reference that takes care of re-opening itself after flushing if it was already open, and the only thing the caller has to do is call Open once at the very beginning with all the args and then Flush.
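To make the agreed semantics concrete, here is a minimal sketch (not the merged implementation) that uses a hypothetical storage interface in place of the real TSDB types: Flush restores whatever state it found, so an open storage stays open and a closed one stays closed.

package receive

// storage abstracts the underlying TSDB for the purposes of this sketch.
type storage interface {
	Open() error
	Flush() error
	Close() error
}

// flushableStorage wraps a storage and tracks whether it is currently open.
type flushableStorage struct {
	storage
	open bool
}

func (f *flushableStorage) Open() error {
	if f.open {
		return nil
	}
	if err := f.storage.Open(); err != nil {
		return err
	}
	f.open = true
	return nil
}

func (f *flushableStorage) Close() error {
	if !f.open {
		return nil
	}
	if err := f.storage.Close(); err != nil {
		return err
	}
	f.open = false
	return nil
}

// Flush opens the storage if needed, flushes it, and restores the prior state:
// an open storage stays open, a closed storage is closed again.
func (f *flushableStorage) Flush() error {
	wasOpen := f.open
	if err := f.Open(); err != nil {
		return err
	}
	if err := f.storage.Flush(); err != nil {
		return err
	}
	if wasOpen {
		return nil
	}
	return f.Close()
}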

return errors.Wrap(err, "opening storage")
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
return
}
if err := db.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close storage")
return
}
level.Info(logger).Log("msg", "tsdb started")
}()

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
webHandler.StorageReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
for {
select {
case <-cancel:
return nil
case _, ok := <-updateDB:
if !ok {
return nil
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.SetReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
dbReady <- struct{}{}
}
close(cancel)
},
}
}, func(err error) {
close(cancel)
},
)
}

hashringReady := make(chan struct{})
level.Debug(logger).Log("msg", "setting up hashring")
{
updates := make(chan receive.Hashring)
// Note: the hashring configuration watcher
// is the sender and thus closes the chan.
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
if cw != nil {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
receive.HashringFromConfig(ctx, updates, cw)
return nil
}, func(error) {
cancel()
close(updates)
})
} else {
defer close(updates)
cancel := make(chan struct{})
g.Add(func() error {
updates <- receive.SingleNodeHashring(endpoint)
<-cancel
return nil
}, func(error) {
close(cancel)
close(updates)
})
}

cancel := make(chan struct{})
g.Add(
func() error {
g.Add(func() error {
defer close(updateDB)
for {
select {
case h := <-updates:
close(hashringReady)
case h, ok := <-updates:
if !ok {
return nil
}
webHandler.SetWriter(nil)
webHandler.Hashring(h)
statusProber.SetReady()
case <-cancel:
close(hashringReady)
return nil
}
select {
// If any new hashring is received, then mark the handler as unready, but keep it alive.
case <-updates:
msg := "hashring has changed; server is not ready to receive web requests."
webHandler.Hashring(nil)
statusProber.SetNotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
updateDB <- struct{}{}
case <-cancel:
return nil
}
<-cancel
return nil
},
func(err error) {
close(cancel)
},
}
}, func(err error) {
close(cancel)
},
)
}

@@ -269,36 +308,46 @@ func runReceive(
level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
s *grpc.Server
l net.Listener
err error
s *grpc.Server
l net.Listener
)
startGRPC := make(chan struct{})
g.Add(func() error {
<-dbOpen

l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

defer close(startGRPC)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

// Wait hashring to be ready before start serving metrics
<-hashringReady
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
for range dbReady {
if s != nil {
s.Stop()
}
l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), component.Receive, lset)
s = newStoreGRPCServer(logger, &receive.UnRegisterer{Registerer: reg}, tracer, tsdbStore, opts)
startGRPC <- struct{}{}
}
return nil
}, func(error) {
if s != nil {
s.Stop()
}
})
// We need to be able to start and stop the gRPC server
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
if err := s.Serve(l); err != nil {
return errors.Wrap(err, "serve gRPC")
}
}
return nil
}, func(error) {})
}

level.Debug(logger).Log("msg", "setting up receive http handler")
@@ -313,17 +362,6 @@ func runReceive(
)
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
@@ -341,20 +379,54 @@

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
Contributor: Why not use some global cancel context to avoid this blocking as well?

Member Author: The whole idea is that we want this to block so nothing is ingested before old blocks are flushed and uploaded.

Contributor: Don't you think it will cause issues when you try to exit the receiver in the middle of an upload and it blocks here?

Member Author: I already had this discussion with @brancz on this PR, but I'm not sure where GitHub put that comment history :/ . The original implementation had a 10-minute timeout on the context and the suggestion was to remove that. Let's cover that question in a follow-up rather than go back and forth.

Contributor: Sorry, I wasn't aware you had discussed it already. When I said a global context I meant one that is cancel-able like the rest of the actor groups.
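For illustration, a rough sketch of the cancel-able variant being suggested, assuming the same shipper s, run group g, and runutil helpers as the surrounding diff; the initial blocking Sync simply uses the run group's cancellable context instead of context.Background():

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
	defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
	// Block until old blocks are uploaded, but respect cancellation on shutdown.
	if uploaded, err := s.Sync(ctx); err != nil {
		level.Warn(logger).Log("err", err, "failed to upload", uploaded)
	}
	return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
		if uploaded, err := s.Sync(ctx); err != nil {
			level.Warn(logger).Log("err", err, "uploaded", uploaded)
		}
		return nil
	})
}, func(error) {
	cancel()
})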

level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
// Before quitting, ensure all blocks are uploaded.
defer func() {
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
}()

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

return nil
return nil
})
}, func(error) {
cancel()
})
}, func(error) {
cancel()
})
}

{
// Upload on demand.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer close(uploadC)
for {
select {
case <-ctx.Done():
return nil
case <-uploadC:
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
cancel()
uploadDone <- struct{}{}
}
}
}, func(error) {
cancel()
})
}
}

level.Info(logger).Log("msg", "starting receiver")
2 changes: 1 addition & 1 deletion go.mod
@@ -35,7 +35,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 // v1.8.2 is misleading as Prometheus does not have v2 module. This is pointing to one commit after 2.12.0.
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module.
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
8 changes: 4 additions & 4 deletions go.sum
@@ -45,8 +45,8 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.22.4 h1:Mcq67g9mZEBvBuj/x7mF9KCyw5M8/4I/cjQPkdCsq0I=
github.com/aws/aws-sdk-go v1.22.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.23.12 h1:2UnxgNO6Y5J1OrkXS8XNp0UatDxD1bWHiDT62RDPggI=
github.com/aws/aws-sdk-go v1.23.12/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
@@ -401,8 +401,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE=
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 h1:yZAWzfQYJN+vduRHL5jcTrVw+XwYU52ZrAhprmwoknI=
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2/go.mod h1:rMTlmxGCvukf2KMu3fClMDKLLoJ5hl61MhcJ7xKakf0=
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 h1:B9IMa7s163/ZDSduepHHfOZZHSKdSbgo/bFY5c+FMAs=
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467/go.mod h1:aojjoH+vNHyJUTJoW15HoQWMKXxNhQylU6/G261nqxQ=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=