cmd,pkg: flush storage when hashring changes
In order to allow the hashring of receive nodes to scale at runtime, we
need to temporarily stop the storage, flush it, and upload the blocks.
This commit ensures that whenever a hashring change is detected, the
entire storage is flushed and the shipper is notified so that it uploads
any new blocks. It also ensures that when the receive component starts
up, any in-progress WAL is flushed and uploaded. This ensures that
new data that may belong to a different tenant is not mixed with old
data. Finally, it also ensures that the storage is flushed and uploaded
whenever the process shuts down.

Signed-off-by: Lucas Servén Marín <[email protected]>
squat committed Sep 10, 2019
1 parent 7bf6c31 commit fb0bec7
Showing 5 changed files with 211 additions and 115 deletions.
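
Before the per-file diffs, here is a minimal, hypothetical sketch of the coordination pattern the commit message describes: a hashring-change signal triggers a flush, an on-demand upload, and a storage restart. All names here (flushableStorage, updateDB, uploadC, uploadDone) are illustrative stand-ins rather than the actual Thanos types; the real wiring lives in cmd/thanos/receive.go below.

package main

import (
	"fmt"
	"time"
)

// flushableStorage is a stand-in for the flushable TSDB wrapper the commit introduces.
type flushableStorage struct{}

func (s *flushableStorage) Open() error  { fmt.Println("storage opened"); return nil }
func (s *flushableStorage) Flush() error { fmt.Println("storage flushed"); return nil }

// sendOrCancel signals ch unless cancel fires first, mirroring the helper added in the diff.
func sendOrCancel(cancel <-chan struct{}, ch chan<- struct{}) {
	select {
	case <-cancel:
	case ch <- struct{}{}:
	}
}

func main() {
	var (
		db         = &flushableStorage{}
		cancel     = make(chan struct{})
		updateDB   = make(chan struct{}) // hashring changed: flush and restart storage
		uploadC    = make(chan struct{}) // new blocks should be uploaded
		uploadDone = make(chan struct{}) // upload finished
	)

	// Uploader: ships blocks on demand, then signals completion.
	go func() {
		for range uploadC {
			fmt.Println("blocks uploaded")
			sendOrCancel(cancel, uploadDone)
		}
	}()

	// Storage loop: on every hashring change, flush, wait for the upload, reopen.
	go func() {
		_ = db.Open()
		for {
			select {
			case <-cancel:
				return
			case <-updateDB:
				_ = db.Flush()
				sendOrCancel(cancel, uploadC)
				<-uploadDone
				_ = db.Open()
			}
		}
	}()

	updateDB <- struct{}{} // simulate a single hashring change
	time.Sleep(100 * time.Millisecond)
	close(cancel)
}
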
2 changes: 1 addition & 1 deletion cmd/thanos/main.go
@@ -293,7 +293,7 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
267 changes: 180 additions & 87 deletions cmd/thanos/receive.go
@@ -140,56 +140,96 @@ func runReceive(
MinBlockDuration: model.Duration(time.Hour * 2),
MaxBlockDuration: model.Duration(time.Hour * 2),
}
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
Endpoint: endpoint,
TenantHeader: tenantHeader,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
})

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// dbReady signals when TSDB is ready and the Store gRPC server can start.
dbReady := make(chan struct{})
// updateDB signals when TSDB needs to be flushed and updated.
updateDB := make(chan struct{})
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{})
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{})
level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "starting TSDB ...")
db, err := tsdb.Open(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)
if err != nil {
close(dbOpen)
return fmt.Errorf("opening storage failed: %s", err)
}
level.Info(logger).Log("msg", "tsdb started")

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
webHandler.StorageReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
// Before actually starting, we need to make sure
// the WAL is flushed.
startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
g.Add(func() error {
for {
select {
case <-cancel:
return nil
case _, ok := <-updateDB:
if !ok {
return nil
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
// Ensure we can safely call close if we exit.
localStorage.Set(nil, startTimeMargin)
if upload {
sendOrCancel(cancel, uploadC)
<-uploadDone
}
level.Info(logger).Log("msg", "starting TSDB ...")
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.Storage(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
level.Info(logger).Log("msg", "server is ready to receive web requests.")
sendOrCancel(cancel, dbReady)
}
close(cancel)
},
}
}, func(err error) {
// Close checks if the DB is nil, so we can safely
// close the storage here.
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
}
close(cancel)
close(dbReady)
close(uploadC)
},
)
}

@@ -218,28 +258,25 @@ func runReceive(
}

cancel := make(chan struct{})
g.Add(
func() error {
g.Add(func() error {
for {
select {
case h := <-updates:
case h, ok := <-updates:
if !ok {
return nil
}
level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests")
webHandler.Storage(nil)
webHandler.Hashring(h)
sendOrCancel(cancel, updateDB)
case <-cancel:
return nil
}
select {
// If any new hashring is received, then mark the handler as unready, but keep it alive.
case <-updates:
webHandler.Hashring(nil)
level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests.")
case <-cancel:
return nil
}
<-cancel
return nil
},
func(err error) {
close(cancel)
},
}
}, func(err error) {
close(cancel)
close(updateDB)
},
)
}

@@ -251,34 +288,46 @@
level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
s *grpc.Server
l net.Listener
err error
s *grpc.Server
l net.Listener
)
startGRPC := make(chan struct{})
g.Add(func() error {
<-dbOpen

l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
for range dbReady {
if s != nil {
s.Stop()
}
l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), component.Receive, lset)
s = newStoreGRPCServer(logger, &receive.UnRegisterer{Registerer: reg}, tracer, tsdbStore, opts)
startGRPC <- struct{}{}
}
return nil
}, func(error) {
if s != nil {
s.Stop()
}
close(startGRPC)
})
// We need to be able to start and stop the gRPC server
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
if err := s.Serve(l); err != nil {
return errors.Wrap(err, "serve gRPC")
}
}
return nil
}, func(error) {})
}

level.Debug(logger).Log("msg", "setting up receive http handler")
@@ -293,17 +342,6 @@ func runReceive(
)
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
@@ -321,23 +359,78 @@

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
// Before quitting, ensure all blocks are uploaded.
defer func() {
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
}()

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

return nil
return nil
})
}, func(error) {
cancel()
})
}, func(error) {
cancel()
})
}

{
// Upload on demand.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-uploadC:
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
}
cancel()
sendOrCancel(ctx.Done(), uploadDone)
}
}
}, func(error) {
cancel()
close(uploadDone)
})
}
}

// Before quitting, ensure the WAL is flushed.
defer func() {
if err := db.Open(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to open storage")
return
}
if err := db.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
return
}
}()

level.Info(logger).Log("msg", "starting receiver")

return nil
}

// sendOrCancel allows us to signal a chan and still
// quit early if the receiver has already quit.
func sendOrCancel(cancel <-chan struct{}, ch chan<- struct{}) {
select {
case <-cancel:
case ch <- struct{}{}:
}
}
5 changes: 3 additions & 2 deletions pkg/receive/config.go
@@ -147,8 +147,9 @@ func (cw *ConfigWatcher) Run(ctx context.Context) {
if len(event.Name) == 0 {
break
}
// Everything but a chmod requires rereading.
if event.Op^fsnotify.Chmod == 0 {
// Everything but a CHMOD requires rereading.
// If the file was removed, we can't read it, so skip.
if event.Op^(fsnotify.Chmod|fsnotify.Remove) == 0 {
break
}
// Changes to a file can spawn various sequences of events with