Skip to content

Commit

Permalink
compact: take initialization out to the background goroutine
Browse files Browse the repository at this point in the history
Initialization is a lingering process and can take minutes to sync meta and index files. Move it to background goroutine to allow metrics/debug handlers be ready right after application is start. It provides admins set up liveness and readindess probes at the k8s.

Recommendations: use http port for liveness and grpc for readyness

Fixed thanos-io#532
  • Loading branch information
xjewer committed Sep 26, 2018
1 parent 1e1f9e5 commit bdca6b8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
7 changes: 3 additions & 4 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
"syscall"

gmetrics "github.com/armon/go-metrics"

gprom "github.com/armon/go-metrics/prometheus"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/tracing"
Expand All @@ -37,7 +36,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, bool) error
Expand Down
19 changes: 12 additions & 7 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,21 @@ func runStore(
return errors.Wrap(err, "create object storage store")
}

begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

ctx, cancel := context.WithCancel(context.Background())
readyToServe := make(chan struct{}, 1)

g.Add(func() error {
begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store readyToServe", "init_duration", time.Since(begin).String())
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Ready to start Grpc server.
readyToServe <- struct{}{}

err := runutil.Repeat(3*time.Minute, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
Expand Down Expand Up @@ -148,6 +152,7 @@ func runStore(
storepb.RegisterStoreServer(s, bs)

g.Add(func() error {
<-readyToServe
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
Expand Down

0 comments on commit bdca6b8

Please sign in to comment.