From cd3059037f07fe9aea5a9a487a113fba71cfc61b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= <lserven@gmail.com>
Date: Tue, 5 Nov 2019 20:04:08 +0100
Subject: [PATCH] cmd/thanos/receive: reduce WAL replays at startup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Every time thanos receive is started, it has to replay the WAL three
times, namely:
1. open the TSDB;
2. close the TSDB; open the ReadOnly TSDB and Flush; and
3. open the TSDB

These WAL replays can take a very long time if the WAL has lots of data.
With the fix from #1654, the third time will be instantaneous because
the WAL will be empty. That still leaves two potentially long WAL
replays. We can cut this down to just one long replay if we do the
following operations instead:
1. with a closed TSDB, open the ReadOnly TSDB and Flush; and
2. open the TSDB

Now, the second step will be a fast replay because the WAL is empty,
leaving just one potentially expensive WAL replay.

This commit eliminates explicit opening of the writable TSDB during
startup, and instead opens it after flushing the read-only TSDB.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
---
 cmd/thanos/receive.go | 17 ++++++-----------
 pkg/receive/tsdb.go   | 25 +++++++++++++------------
 2 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 1424a13506..f454b2855c 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -230,6 +230,9 @@ func runReceive(
 			defer close(dbReady)
 			defer close(uploadC)
 
+			// Before actually starting, we need to make sure the
+			// WAL is flushed. The WAL is flushed after the
+			// hashring is loaded.
 			db := receive.NewFlushableStorage(
 				dataDir,
 				log.With(logger, "component", "tsdb"),
@@ -237,23 +240,12 @@ func runReceive(
 				tsdbCfg,
 			)
 
-			// Before actually starting, we need to make sure the
-			// WAL is flushed. The WAL is flushed after the
-			// hashring ring is loaded.
-			if err := db.Open(); err != nil {
-				return errors.Wrap(err, "opening storage")
-			}
-
 			// Before quitting, ensure the WAL is flushed and the DB is closed.
 			defer func() {
 				if err := db.Flush(); err != nil {
 					level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
 					return
 				}
-				if err := db.Close(); err != nil {
-					level.Warn(logger).Log("err", err, "msg", "failed to close storage")
-					return
-				}
 			}()
 
 			for {
@@ -267,6 +259,9 @@ func runReceive(
 					if err := db.Flush(); err != nil {
 						return errors.Wrap(err, "flushing storage")
 					}
+					if err := db.Open(); err != nil {
+						return errors.Wrap(err, "opening storage")
+					}
 					if upload {
 						uploadC <- struct{}{}
 						<-uploadDone
diff --git a/pkg/receive/tsdb.go b/pkg/receive/tsdb.go
index 16fdae6e8f..da463fe874 100644
--- a/pkg/receive/tsdb.go
+++ b/pkg/receive/tsdb.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 
 	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/storage/tsdb"
@@ -69,32 +70,32 @@ func (f *FlushableStorage) open() error {
 }
 
 // Flush temporarily stops the storage and flushes the WAL to blocks.
-// Note: this operation leaves the storage in the same state it was in.
+// Note: this operation leaves the storage closed.
 func (f *FlushableStorage) Flush() error {
+	_, err := os.Stat(filepath.Join(f.path, "wal"))
+	if os.IsNotExist(err) {
+		level.Info(f.l).Log("msg", "No WAL was found for flushing; ignoring.")
+		return nil
+	}
+	if err != nil {
+		return errors.Wrap(err, "stating WAL")
+	}
 	f.mu.Lock()
 	defer f.mu.Unlock()
-	var reopen bool
 	if !f.stopped {
 		if err := f.DB.Close(); err != nil {
 			return errors.Wrap(err, "stopping storage")
 		}
 		f.stopped = true
-		reopen = true
 	}
-	ro, err := promtsdb.OpenDBReadOnly(f.Dir(), f.l)
+	ro, err := promtsdb.OpenDBReadOnly(f.path, f.l)
 	if err != nil {
 		return errors.Wrap(err, "opening read-only DB")
 	}
-	if err := ro.FlushWAL(f.Dir()); err != nil {
+	if err := ro.FlushWAL(f.path); err != nil {
 		return errors.Wrap(err, "flushing WAL")
 	}
-	if err := os.RemoveAll(filepath.Join(f.Dir(), "wal")); err != nil {
-		return errors.Wrap(err, "removing stale WAL")
-	}
-	if reopen {
-		return errors.Wrap(f.open(), "re-starting storage")
-	}
-	return nil
+	return errors.Wrap(os.RemoveAll(filepath.Join(f.path, "wal")), "removing stale WAL")
 }
 
 // Close stops the storage.