From cd3059037f07fe9aea5a9a487a113fba71cfc61b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= <lserven@gmail.com> Date: Tue, 5 Nov 2019 20:04:08 +0100 Subject: [PATCH] cmd/thanos/receive: reduce WAL replays at startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every time thanos receive is started, it has to replay the WAL three times, namely: 1. open the TSDB; 2. close the TSDB; open the ReadOnly TSDB and Flush; and 3. open the TSDB These WAL replays can take a very long time if the WAL has lots of data. With the fix from #1654, the third time will be instantaneous because the WAL will be empty. That still leaves two potentially long WAL replays. We can cut this down to just one long replay if we do the following operations instead: 1. with a closed TSDB, open the ReadOnly TSDB and Flush; and 2. open the TSDB Now, the second step will be a fast replay because the WAL is empty, leaving just one potentially expensive WAL replay. This commit eliminates explicit opening of the writable TSDB during startup, and instead opens it after flushing the read-only TSDB. Signed-off-by: Lucas Servén Marín <lserven@gmail.com> --- cmd/thanos/receive.go | 17 ++++++----------- pkg/receive/tsdb.go | 25 +++++++++++++------------ 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 1424a13506..f454b2855c 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -230,6 +230,9 @@ func runReceive( defer close(dbReady) defer close(uploadC) + // Before actually starting, we need to make sure the + // WAL is flushed. The WAL is flushed after the + // hashring is loaded. db := receive.NewFlushableStorage( dataDir, log.With(logger, "component", "tsdb"), @@ -237,23 +240,12 @@ func runReceive( tsdbCfg, ) - // Before actually starting, we need to make sure the - // WAL is flushed. The WAL is flushed after the - // hashring ring is loaded. - if err := db.Open(); err != nil { - return errors.Wrap(err, "opening storage") - } - // Before quitting, ensure the WAL is flushed and the DB is closed. defer func() { if err := db.Flush(); err != nil { level.Warn(logger).Log("err", err, "msg", "failed to flush storage") return } - if err := db.Close(); err != nil { - level.Warn(logger).Log("err", err, "msg", "failed to close storage") - return - } }() for { @@ -267,6 +259,9 @@ func runReceive( if err := db.Flush(); err != nil { return errors.Wrap(err, "flushing storage") } + if err := db.Open(); err != nil { + return errors.Wrap(err, "opening storage") + } if upload { uploadC <- struct{}{} <-uploadDone diff --git a/pkg/receive/tsdb.go b/pkg/receive/tsdb.go index 16fdae6e8f..da463fe874 100644 --- a/pkg/receive/tsdb.go +++ b/pkg/receive/tsdb.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/storage/tsdb" @@ -69,32 +70,32 @@ func (f *FlushableStorage) open() error { } // Flush temporarily stops the storage and flushes the WAL to blocks. -// Note: this operation leaves the storage in the same state it was in. +// Note: this operation leaves the storage closed. func (f *FlushableStorage) Flush() error { + _, err := os.Stat(filepath.Join(f.path, "wal")) + if os.IsNotExist(err) { + level.Info(f.l).Log("msg", "No WAL was found for flushing; ignoring.") + return nil + } + if err != nil { + return errors.Wrap(err, "stating WAL") + } f.mu.Lock() defer f.mu.Unlock() - var reopen bool if !f.stopped { if err := f.DB.Close(); err != nil { return errors.Wrap(err, "stopping storage") } f.stopped = true - reopen = true } - ro, err := promtsdb.OpenDBReadOnly(f.Dir(), f.l) + ro, err := promtsdb.OpenDBReadOnly(f.path, f.l) if err != nil { return errors.Wrap(err, "opening read-only DB") } - if err := ro.FlushWAL(f.Dir()); err != nil { + if err := ro.FlushWAL(f.path); err != nil { return errors.Wrap(err, "flushing WAL") } - if err := os.RemoveAll(filepath.Join(f.Dir(), "wal")); err != nil { - return errors.Wrap(err, "removing stale WAL") - } - if reopen { - return errors.Wrap(f.open(), "re-starting storage") - } - return nil + return errors.Wrap(os.RemoveAll(filepath.Join(f.path, "wal")), "removing stale WAL") } // Close stops the storage.