cmd/thanos/receive: avoid deadlock (thanos-io#1727)
While debugging thanos-io#1721, I found that when Thanos receive bails out, there is
a race in a select statement where the non-returning branch may be
chosen. This branch will deadlock if it is selected twice, because the channel
reader has already exited. The way to prevent this is to check whether
we need to exit on every loop iteration.

Signed-off-by: Lucas Servén Marín <[email protected]>
Signed-off-by: Aleksey Sin <[email protected]>
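
The pattern behind the deadlock can be reduced to the sketch below. This is not the actual receive.go code: reload, updates, dbReady, and cancel are simplified stand-ins for the reload loop and the channels touched in the diff, and the send on dbReady is additionally guarded so the sketch itself can never block. The point is the cancel check at the top of every loop iteration, which is what this commit adds.

package main

import (
    "fmt"
    "time"
)

// reload mimics the receive reload loop: it consumes updates and signals
// readiness on dbReady. The goroutine reading dbReady exits once cancel is
// closed, so the loop must never attempt a send after that point.
func reload(updates <-chan struct{}, dbReady chan<- struct{}, cancel <-chan struct{}) error {
    for {
        // The fix described above: decide on every iteration whether to exit.
        select {
        case <-cancel:
            return nil
        default:
        }

        select {
        case <-cancel:
            return nil
        case <-updates:
            // Without the per-iteration check, this branch can win the race
            // even after cancel is closed, and an unguarded send on dbReady
            // would then block forever once its reader has exited. The send
            // is guarded here as well so the sketch itself cannot deadlock.
            select {
            case dbReady <- struct{}{}:
            case <-cancel:
                return nil
            }
        }
    }
}

func main() {
    updates := make(chan struct{}, 1)
    dbReady := make(chan struct{})
    cancel := make(chan struct{})
    done := make(chan struct{})

    // Reader of dbReady; it goes away as soon as cancel is closed, which is
    // what makes a later unguarded send deadlock.
    go func() {
        defer close(done)
        for {
            select {
            case <-cancel:
                return
            case <-dbReady:
            }
        }
    }()

    go func() { _ = reload(updates, dbReady, cancel) }()

    updates <- struct{}{} // trigger one reload cycle
    time.Sleep(50 * time.Millisecond)
    close(cancel) // shut down; both loops exit instead of deadlocking
    <-done
    fmt.Println("shut down cleanly")
}
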
squat authored and Aleksey Sin committed Nov 27, 2019
1 parent 9457802 commit 362668b
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions cmd/thanos/receive.go
@@ -14,8 +14,8 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
@@ -198,7 +198,7 @@ func runReceive(
TLSClientConfig: rwTLSClientConfig,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
@@ -230,21 +230,29 @@ func runReceive(
defer close(dbReady)
defer close(uploadC)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring is loaded.
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring ring is loaded.
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
if err := db.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close storage")
return
}
}()

for {
@@ -258,17 +266,14 @@ func runReceive(
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.Ready()
statusProber.SetReady()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
dbReady <- struct{}{}
}
@@ -318,7 +323,7 @@ func runReceive(
webHandler.SetWriter(nil)
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
statusProber.SetNotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
updateDB <- struct{}{}
case <-cancel:
@@ -337,16 +342,7 @@ func runReceive(
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})
g.Add(srv.ListenAndServe, srv.Shutdown)

level.Debug(logger).Log("msg", "setting up grpc server")
{
