From 86a8573af2dc44b948ee0fb23b430acbdeb59629 Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka
Date: Tue, 30 Jun 2020 01:09:06 +0200
Subject: [PATCH] receive: Added more observability; fixed leaktest so it actually checks for leaks. Reason: missing (); we probably need a linter for this.

Signed-off-by: Bartlomiej Plotka
---
 cmd/thanos/receive.go                  | 35 +++++++++++------
 pkg/receive/handler_test.go            |  6 +--
 pkg/receive/multitsdb.go               | 54 ++++++++++++++------------
 pkg/receive/multitsdb_test.go          | 12 ++----
 pkg/tracing/stackdriver/tracer_test.go |  1 +
 5 files changed, 61 insertions(+), 47 deletions(-)

diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 80f04a9460d..3af31ddb695 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -18,6 +18,7 @@ import (
 	opentracing "github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb"
 	kingpin "gopkg.in/alecthomas/kingpin.v2"
@@ -205,8 +206,7 @@ func runReceive(
 	allowOutOfOrderUpload bool,
 ) error {
 	logger = log.With(logger, "component", "receive")
-	level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
-
+	level.Warn(logger).Log("msg", "setting up receive")
 	rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
 	if err != nil {
 		return err
@@ -285,8 +285,8 @@ func runReceive(
 
 	// dbReady signals when TSDB is ready and the Store gRPC server can start.
 	dbReady := make(chan struct{}, 1)
-	// updateDB signals when TSDB needs to be flushed and updated.
-	updateDB := make(chan struct{}, 1)
+	// hashringChangedChan signals when TSDB needs to be flushed and updated due to a hashring config change.
+	hashringChangedChan := make(chan struct{}, 1)
 	// uploadC signals when new blocks should be uploaded.
 	uploadC := make(chan struct{}, 1)
 	// uploadDone signals when uploading has finished.
@@ -294,29 +294,41 @@ func runReceive(
 
 	level.Debug(logger).Log("msg", "setting up tsdb")
 	{
-		// TSDB.
+		dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "thanos_receive_multi_db_updates_attempted_total",
+			Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
+		})
+		dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "thanos_receive_multi_db_updates_completed_total",
+			Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
+		})
+
+		// TSDBs reload logic, listening on hashring changes.
 		cancel := make(chan struct{})
 		g.Add(func() error {
 			defer close(dbReady)
 			defer close(uploadC)
 
-			// Before quitting, ensure the WAL is flushed and the DB is closed.
+			// Before quitting, ensure the WAL is flushed and the DBs are closed.
 			defer func() {
 				if err := dbs.Flush(); err != nil {
 					level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
 				}
+				if err := dbs.Close(); err != nil {
+					level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
+				}
 			}()
 
 			for {
 				select {
 				case <-cancel:
 					return nil
-				case _, ok := <-updateDB:
+				case _, ok := <-hashringChangedChan:
 					if !ok {
 						return nil
 					}
-
-					level.Info(logger).Log("msg", "updating DB")
+					dbUpdatesStarted.Inc()
+					level.Info(logger).Log("msg", "updating Multi DB")
 
 					if err := dbs.Flush(); err != nil {
 						return errors.Wrap(err, "flushing storage")
@@ -330,6 +342,7 @@ func runReceive(
 					}
 					statusProber.Ready()
 					level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
+					dbUpdatesCompleted.Inc()
 					dbReady <- struct{}{}
 				}
 			}
@@ -373,7 +386,7 @@ func runReceive(
 
 		cancel := make(chan struct{})
 		g.Add(func() error {
-			defer close(updateDB)
+			defer close(hashringChangedChan)
 			for {
 				select {
 				case h, ok := <-updates:
@@ -384,7 +397,7 @@ func runReceive(
 					msg := "hashring has changed; server is not ready to receive web requests."
 					statusProber.NotReady(errors.New(msg))
 					level.Info(logger).Log("msg", msg)
-					updateDB <- struct{}{}
+					hashringChangedChan <- struct{}{}
 				case <-cancel:
 					return nil
 				}
diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go
index 731c821b408..7cefb844ff1 100644
--- a/pkg/receive/handler_test.go
+++ b/pkg/receive/handler_test.go
@@ -183,8 +183,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
 }
 
 func TestReceiveQuorum(t *testing.T) {
-	defer leaktest.CheckTimeout(t, 10*time.Second)
-
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
 	appenderErrFn := func() error { return errors.New("failed to get appender") }
 	conflictErrFn := func() error { return storage.ErrOutOfBounds }
 	commitErrFn := func() error { return errors.New("failed to commit") }
@@ -521,8 +520,7 @@ func TestReceiveQuorum(t *testing.T) {
 }
 
 func TestReceiveWithConsistencyDelay(t *testing.T) {
-	defer leaktest.CheckTimeout(t, 10*time.Second)
-
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
 	appenderErrFn := func() error { return errors.New("failed to get appender") }
 	conflictErrFn := func() error { return storage.ErrOutOfBounds }
 	commitErrFn := func() error { return errors.New("failed to commit") }
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 596f6953cc2..57ce6cb7b43 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -70,7 +70,6 @@ func NewMultiTSDB(
 
 type tenant struct {
 	readyS    *ReadyStorage
-	tsdb      *tsdb.DB
 	storeTSDB *store.TSDBStore
 	ship      *shipper.Shipper
 
@@ -100,16 +99,9 @@ func (t *tenant) shipper() *shipper.Shipper {
 	return t.ship
 }
 
-func (t *tenant) db() *tsdb.DB {
-	t.mtx.RLock()
-	defer t.mtx.RUnlock()
-	return t.tsdb
-}
-
 func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper) {
 	t.readyS.Set(tenantTSDB)
 	t.mtx.Lock()
-	t.tsdb = tenantTSDB
 	t.storeTSDB = storeTSDB
 	t.ship = ship
 	t.mtx.Unlock()
@@ -148,17 +140,17 @@ func (t *MultiTSDB) Flush() error {
 	errmtx := &sync.Mutex{}
 	merr := terrors.MultiError{}
 	wg := &sync.WaitGroup{}
-	for _, tenant := range t.tenants {
-		db := tenant.db()
+	for id, tenant := range t.tenants {
+		db := tenant.readyStorage().Get()
 		if db == nil {
+			level.Error(t.logger).Log("msg", "flushing TSDB failed; not ready", "tenant", id)
 			continue
 		}
-
+		level.Info(t.logger).Log("msg", "flushing TSDB", "tenant", id)
 		wg.Add(1)
 		go func() {
 			head := db.Head()
-			mint, maxt := head.MinTime(), head.MaxTime()
-			if err := db.CompactHead(tsdb.NewRangeHead(head, mint, maxt-1)); err != nil {
+			if err := db.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime()-1)); err != nil {
 				errmtx.Lock()
 				merr.Add(err)
 				errmtx.Unlock()
@@ -171,7 +163,28 @@ func (t *MultiTSDB) Flush() error {
 	return merr.Err()
 }
 
+func (t *MultiTSDB) Close() error {
+	t.mtx.Lock()
+	defer t.mtx.Unlock()
+
+	merr := terrors.MultiError{}
+	for id, tenant := range t.tenants {
+		db := tenant.readyStorage().Get()
+		if db == nil {
+			level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id)
+			continue
+		}
+		level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id)
+		merr.Add(db.Close())
+	}
+	return merr.Err()
+}
+
 func (t *MultiTSDB) Sync(ctx context.Context) error {
+	if t.bucket == nil {
+		return errors.New("bucket is not specified, Sync should not be invoked")
+	}
+
 	t.mtx.RLock()
 	defer t.mtx.RUnlock()
 
@@ -184,7 +197,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
 		if s == nil {
 			continue
 		}
-
 		wg.Add(1)
 		go func() {
 			if uploaded, err := s.Sync(ctx); err != nil {
@@ -195,7 +207,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
 			wg.Done()
 		}()
 	}
-
 	wg.Wait()
 	return merr.Err()
 }
@@ -219,6 +230,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 	lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
 	dataDir := path.Join(t.dataDir, tenantID)
 
+	level.Info(logger).Log("msg", "opening TSDB")
 	opts := *t.tsdbOpts
 	s, err := tsdb.Open(
 		dataDir,
@@ -232,7 +244,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 		t.mtx.Unlock()
 		return err
 	}
-
 	var ship *shipper.Shipper
 	if t.bucket != nil {
 		ship = shipper.New(
@@ -245,16 +256,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 			t.allowOutOfOrderUpload,
 		)
 	}
-	tenant.set(store.NewTSDBStore(
-		logger,
-		reg,
-		s,
-		component.Receive,
-		lbls,
-	), s, ship)
-
+	tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship)
+	level.Info(logger).Log("msg", "TSDB is now ready")
 	return nil
 }
+
 func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) {
 	// Fast path, as creating tenants is a very rare operation.
 	t.mtx.RLock()
diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go
index a05244cf040..b884cc19bc9 100644
--- a/pkg/receive/multitsdb_test.go
+++ b/pkg/receive/multitsdb_test.go
@@ -5,7 +5,6 @@ package receive
 
 import (
 	"context"
-	"fmt"
 	"io/ioutil"
 	"os"
 	"testing"
@@ -25,13 +24,12 @@ import (
 )
 
 func TestMultiTSDB(t *testing.T) {
-	defer leaktest.CheckTimeout(t, 10*time.Second)
-
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
 	dir, err := ioutil.TempDir("", "test")
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	logger := log.NewNopLogger()
+	logger := log.NewLogfmtLogger(os.Stderr)
 	t.Run("run fresh", func(t *testing.T) {
 		m := NewMultiTSDB(
 			dir, logger, prometheus.NewRegistry(), &tsdb.Options{
@@ -45,7 +43,7 @@ func TestMultiTSDB(t *testing.T) {
 			nil,
 			false,
 		)
-		defer testutil.Ok(t, m.Flush())
+		defer func() { testutil.Ok(t, m.Close()) }()
 
 		testutil.Ok(t, m.Flush())
 		testutil.Ok(t, m.Open())
@@ -112,7 +110,7 @@ func TestMultiTSDB(t *testing.T) {
 			nil,
 			false,
 		)
-		defer testutil.Ok(t, m.Flush())
+		defer func() { testutil.Ok(t, m.Close()) }()
 
 		testutil.Ok(t, m.Flush())
 		testutil.Ok(t, m.Open())
@@ -202,13 +200,11 @@ Outer:
 			if !ok {
 				break Outer
 			}
-			fmt.Println(r[0].String())
 			testutil.Equals(t, expectedFooResp, r)
 		case r, ok := <-respBar:
 			if !ok {
 				break Outer
 			}
-			fmt.Println(r[0].String())
 			testutil.Equals(t, expectedBarResp, r)
 		}
 	}
diff --git a/pkg/tracing/stackdriver/tracer_test.go b/pkg/tracing/stackdriver/tracer_test.go
index b0d4790475e..38a78a7d614 100644
--- a/pkg/tracing/stackdriver/tracer_test.go
+++ b/pkg/tracing/stackdriver/tracer_test.go
@@ -22,6 +22,7 @@ import (
 // it will be still enabled for all spans within this span.
 func TestContextTracing_ClientEnablesTracing(t *testing.T) {
-	defer leaktest.CheckTimeout(t, 10*time.Second)
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
 	m := &basictracer.InMemorySpanRecorder{}
 	r := &forceRecorder{wrapped: m}
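
The "missing ()" called out in the commit message is easy to miss because both forms compile. leaktest.CheckTimeout (from github.com/fortytw2/leaktest) returns the actual leak check as a func(); deferring CheckTimeout without the trailing () schedules only the CheckTimeout call itself and discards the returned checker, so no goroutine-leak check ever runs. A minimal sketch of the two forms (the test names here are illustrative, not part of the patch):

package example

import (
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
)

// Compiles, but never checks anything: the deferred call is CheckTimeout
// itself, and the func() it returns is silently discarded.
func TestLeakCheckBroken(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)
	// ... test body ...
}

// Correct: CheckTimeout runs immediately (snapshotting current goroutines)
// and the returned check func() is what gets deferred to run at test exit.
func TestLeakCheckFixed(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()
	// ... test body ...
}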
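
The two counters added in cmd/thanos/receive.go follow the standard client_golang pattern: promauto.With(reg) binds them to the receiver's registry at construction time, and the gap between ..._attempted_total and ..._completed_total surfaces reloads that started but never finished. A stand-alone sketch of that registration pattern, with an illustrative struct and helper name not taken from the patch:

package example

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// dbUpdateMetrics groups the two counters the patch registers;
// promauto.With(reg) registers each counter and panics on duplicates.
type dbUpdateMetrics struct {
	started   prometheus.Counter
	completed prometheus.Counter
}

func newDBUpdateMetrics(reg prometheus.Registerer) dbUpdateMetrics {
	return dbUpdateMetrics{
		started: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_receive_multi_db_updates_attempted_total",
			Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
		}),
		completed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_receive_multi_db_updates_completed_total",
			Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
		}),
	}
}

In the patch itself the counters are incremented inline: dbUpdatesStarted.Inc() as soon as a hashring change is received, and dbUpdatesCompleted.Inc() only after the flush, TSDB re-open, and readiness flip have all succeeded.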