
wal: add write ahead log package #332

Merged
merged 19 commits into from
Aug 7, 2018

Conversation

fabxc
Contributor

@fabxc commented May 16, 2018

This adds a new WAL that's agnostic to the actual record contents.
It's much simpler and should be more resilient than the existing one.

It is safe for concurrent reads across different processes.

@brian-brazil @jkohen

Integrating this into the rest of TSDB is another can of worms. I may need to make some slight tweaks here and there, but in general it seems complete and working.

The batch mode is mostly there because I'd like to implement checkpointing (formerly truncation) by just writing another WAL, so we don't need to implement another disk format. But in that batch-write scenario, firing off millions of tiny writes wouldn't be ideal.
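To illustrate, a batched API could look roughly like this (a sketch; the internal log() helper and the exact signature are stand-ins, not the final API):

```go
package wal

import "sync"

// WAL is trimmed down to what this sketch needs.
type WAL struct {
	mtx sync.Mutex
}

// log appends one record to the active page and flushes it to disk
// only when final is true (stubbed out here).
func (w *WAL) log(rec []byte, final bool) error { return nil }

// Log writes a batch of records as one unit. All records are appended
// to the page buffer first and only the last one triggers a flush, so
// a checkpoint writing millions of records doesn't pay a per-record
// write cost.
func (w *WAL) Log(recs ...[]byte) error {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	for i, r := range recs {
		if err := w.log(r, i == len(recs)-1); err != nil {
			return err
		}
	}
	return nil
}
```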

Also, we need some nicer recovery handling than in the last one, but that should be simpler to do as well.

@krasi-georgiev
Contributor

I smell some offline KubeCon discussions 🤔
I'm just guessing, but this is probably related to the WAL replication mentioned in @gouthamve's talk.

Have there been any more discussions about how and where this would be used?

@krasi-georgiev
Contributor

I figured it's related to this proposal:
https://docs.google.com/document/d/1TEqqE_Stq04drhjSU1I7Ctmuy0dpsvlPL1AKxqEQoSg

@fabxc
Contributor Author

fabxc commented May 16, 2018

@krasi-georgiev yes, that's the right one. No offline discussions and nothing outside of this doc really.


After integrating this and running some initial benchmarks, TSDB overall shows a ~20% decrease in sample throughput. That still means many millions of samples per second, and it would be noise at most in Prometheus's full resource profile.

So a bit worse on the benchmark scale, but the durability guarantees are actually a fair bit better with this WAL.

Contributor

@krasi-georgiev left a comment

I looked long and hard and didn't see any problems with the implementation.

The code is easy to follow and read 👍

if err := prev.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
}
Contributor

It seems the actor is used in only a couple of places. Is there no way to use a simple mutex and get rid of run() altogether, to simplify?

Contributor Author

Unfortunately not: a mutex would mean the writing goroutines block. Fsync can take several seconds, or much longer in bad conditions.
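To make the trade-off concrete, here is a minimal sketch of the actor approach (channel and field names are assumptions): segment completion queues the fsync onto run()'s channel, so writers hand off the slow work instead of holding a lock across it.

```go
package wal

import "os"

// Segment is trimmed down to what this sketch needs.
type Segment struct {
	*os.File
}

// WAL queues background work for run() instead of doing it inline.
type WAL struct {
	actorc chan func()
}

// run executes queued functions one by one in its own goroutine.
func (w *WAL) run() {
	for f := range w.actorc {
		f()
	}
}

// completeSegment hands the finished segment to the actor. The writer
// returns immediately; with a mutex instead, every concurrent writer
// would stall for the duration of the fsync.
func (w *WAL) completeSegment(prev *Segment) {
	w.actorc <- func() {
		prev.Sync()  // slow under bad I/O conditions
		prev.Close() // writers are long gone by the time this runs
	}
}
```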

wal/wal.go Outdated
return &Reader{rdr: bufio.NewReader(r)}
}

// Next advances the reader to the next records and returns true iff it exists.
Contributor

iff -> if

Contributor Author

@fabxc May 17, 2018

Inevitably gets pointed out each time I try to use it ;) https://en.wikipedia.org/wiki/If_and_only_if
Doesn't seem to catch on – when no one knows what it means, it's probably not worth using.

wal/wal.go Outdated
default:
return errors.Errorf("unexpected record type %d", typ)
}
// Only increment i for non-zero records since we use it below
Contributor

below -> above

@fabxc force-pushed the newwal branch 2 times, most recently from 273b107 to fbcedb5 on May 17, 2018 13:14
@fabxc
Contributor Author

fabxc commented May 17, 2018

> The code is easy to follow and read 👍

Added 3 commits to change that :P

I know it's quite a bit of code for a single PR, but it's well-separated bottom-up along commits. I think some parts are just better to review with the full picture at hand.

Old WAL code was not removed. It needs to stick around anyway for migration procedures.

if err != nil || k >= n {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {

If one checkpoint can't be deleted (say it became read-only), then later checkpoints won't be deleted until that error is resolved, and the failure mode escalates from "can't delete this" to "I'm filling up the disk". Would it make sense to continue deleting them?

Also, I see Checkpoint() would fail. Would that cause the Prometheus server to abort, or something else that could cause data loss? In that case, I think this function shouldn't fail if a directory can't be deleted.

Contributor Author

You are right, we shouldn't abort on failure in either place. Leftover checkpoints don't cause problems in general anyway, since we just ignore them.
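A best-effort variant in that spirit might look like this (a sketch, not the actual patch; the checkpoint directory naming and the checkpointIndex helper are assumptions):

```go
package tsdb

import (
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// checkpointIndex parses the numeric suffix of a checkpoint directory
// name such as "checkpoint.000123" (hypothetical naming).
func checkpointIndex(name string) (int, error) {
	return strconv.Atoi(strings.TrimPrefix(name, "checkpoint."))
}

// deleteCheckpoints removes all checkpoints with an index below maxIndex.
// It keeps going past individual failures so that one undeletable
// directory cannot keep all later ones alive and fill up the disk; only
// the first error is reported, and leftovers are ignored by readers anyway.
func deleteCheckpoints(dir string, maxIndex int) error {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return err
	}
	var firstErr error
	for _, fi := range files {
		index, err := checkpointIndex(fi.Name())
		if err != nil || index >= maxIndex {
			continue
		}
		if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil && firstErr == nil {
			firstErr = err // remember the failure but keep deleting
		}
	}
	return firstErr
}
```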

checkpoint.go Outdated
//
// If no errors occurs, it returns the number of the highest segment that's
// part of the checkpoint.
func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {

What are m and n? Can you document them?
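From the call sites, m and n appear to be the first and last segment numbers included in the checkpoint; if so, a doc comment along these lines would answer this (the wording is a suggestion, not the author's):

```go
// Checkpoint creates a compacted checkpoint of the WAL segments m
// through n (inclusive). Series for which keep returns false and
// samples below mint are dropped.
```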

checkpoint.go Outdated
repl = append(repl, s)
break
} else {


Delete empty block?

head.go Outdated
@@ -139,9 +141,9 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}, func() float64 {
return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_truncate_duration_seconds",

Are we assuming nobody out there relies on this metric for monitoring Prometheus? Would it make sense to preserve it?

Contributor Author

Metrics are explicitly excluded from our stability guarantees, but for critical metrics the concern makes sense nonetheless, of course.
I think this is more of a debug metric than an alerting/dashboard one. But it may make sense to keep it anyway, especially since the name is still generally accurate.

func (w *WAL) fsync(f *Segment) error {
start := time.Now()
err := fileutil.Fsync(f.File)
w.fsyncDuration.Observe(time.Since(start).Seconds())

Nit: is fsync often taking tens of seconds or longer? If it's closer to 1 second (or below), I'd use a more granular unit, e.g. ms.

Contributor Author

In Prometheus we have a few instrumentation conventions that are not baked into our model. One of them is that we always use base units, i.e. bytes, seconds, ...
The idea is that you don't have to make unit adjustments when doing binary ops between metrics; you can just do a final division/multiplication on the overall result. Dashboard builders also often let you set a unit and pick the most sensible scale, so you can always default to the base unit and don't have to handpick it for each graph.

Since everything is a double in our world, this choice conveniently doesn't really impact data size or similar.

time.Duration's Seconds() returns a float64 by the way, so we aren't losing the sub-second precision, in case you meant that.
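The convention from the snippet above, spelled out (the metric name matches the diff; the helper around it is illustrative):

```go
package wal

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Base-unit convention: the name ends in _seconds and the observed
// value is a float64 number of seconds, so sub-second durations keep
// their precision and no unit juggling is needed across metrics.
var fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
	Name: "prometheus_tsdb_wal_fsync_duration_seconds",
	Help: "Duration of WAL fsync.",
})

func timedFsync(sync func() error) error {
	start := time.Now()
	err := sync()
	// Observes e.g. 0.0042 for a 4.2ms fsync; nothing is lost.
	fsyncDuration.Observe(time.Since(start).Seconds())
	return err
}
```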

wal/wal.go Outdated

// flushPage writes the contents of the page to disk.
// If clear is set or no more records will reasonbly fit into it, its
// remaining size gets written as zero data and its reset for the next physical page.

I'm not sure I understand the last part. Is it saying that the page will be padded with byte-0 if clear is true or the page doesn't have room for another record?

Contributor Author

@fabxc May 17, 2018

If clear is false, we may fill up the remainder of the page with zero bytes if we think there's not enough space left to fit in another (partial) record. If clear is true, we force the zero padding no matter how much space is left. We need the latter on shutdown or when completing a segment (when no partial records may be written), since we never want to leave a segment with a size that's not a multiple of 32KB.

I will make the comment clearer.
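A sketch of that behavior (field names are assumptions):

```go
package wal

const (
	pageSize         = 32 * 1024 // segments are written in 32KB pages
	recordHeaderSize = 7         // minimum space a (partial) record needs
)

type page struct {
	alloc int            // bytes of the page already used
	buf   [pageSize]byte // zero-valued tail doubles as the padding
}

type WAL struct {
	page *page
}

// flushPage writes the buffered page to the segment file (elided here).
// If clear is set, or the tail is too small to fit even a partial
// record, the zeroed remainder is written out as well, so segment
// sizes always remain a multiple of the page size.
func (w *WAL) flushPage(clear bool) error {
	p := w.page
	clear = clear || p.alloc > pageSize-recordHeaderSize
	if clear {
		p.alloc = pageSize // include the zero padding up to the boundary
	}
	// ... write p.buf[:p.alloc] to disk, then reset p if clear ...
	return nil
}
```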

@fabxc force-pushed the newwal branch 3 times, most recently from d80e448 to 36ac113 on May 18, 2018 06:45
@fabxc
Contributor Author

fabxc commented May 18, 2018

Some initial feedback after running this in a small Prometheus server with high frequency scrapes:

Memory and CPU usage seem about 3-5% lower on average, which may just be noise. Max usage shows no notable difference.
The page-flush to page-completion ratio when scraping ~450 series is about 10x, i.e. each 32KB WAL page gets written 10 times before it is complete. Assuming a 4KB physical page size, we are looking at ~1.25x write amplification, so nothing to worry about there even if it had high variance.
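One way to arrive at that figure, assuming each intermediate flush mostly re-dirties just the 4KB tail page it is filling:

```
10 flushes per page × ~4KB dirtied per flush ≈ 40KB physically written
40KB written / 32KB of logical page data = 1.25× write amplification
```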

Startups got about 3x slower. This is not super concerning, but it's also not great for big setups and worth investigating.

@fabxc
Contributor Author

fabxc commented May 22, 2018

@brian-brazil @gouthamve can I have a review from one of you?


We introduced a regression a while back regarding block order. It never made it into Prometheus though. Fixed it here and added a test.

@jkohen

jkohen commented May 22, 2018

This is looking good.

@fabxc
Contributor Author

fabxc commented May 24, 2018

During performance analysis of this change, I found a regression: a buffer that appears stack-allocated doesn't pass escape analysis and ends up on the heap. That resulted in big memory spikes. I pushed a commit to fix this.

The graph below shows memory usage before and after the change, comparing the released v2.2.1 with a version carrying the new WAL and in-memory metadata store.

[Screenshot: memory usage comparison, captured May 24, 2018]
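For reference, the shape of the fix: attach the scratch buffer to the reader so it is allocated once, instead of letting a local buffer escape to the heap on every call (a sketch; the field layout is an assumption):

```go
package wal

import (
	"bufio"
	"io"
)

const pageSize = 32 * 1024

// Reader carries its own scratch buffer. A local `var buf [pageSize]byte`
// inside next() failed escape analysis and was heap-allocated per record,
// which showed up as large allocation spikes during WAL replay.
type Reader struct {
	rdr *bufio.Reader
	buf [pageSize]byte // reused across calls, allocated once with Reader
}

func NewReader(r io.Reader) *Reader {
	return &Reader{rdr: bufio.NewReader(r)}
}

// next reads one page into the long-lived buffer.
func (r *Reader) next() error {
	_, err := io.ReadFull(r.rdr, r.buf[:])
	return err
}
```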

Contributor

@brian-brazil left a comment

I haven't dug too deeply into this. The new file format should be added to the docs.

checkpoint.go Outdated
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are locked and not returned.
Contributor

logged?

checkpoint.go Outdated
return nil, errors.New("invalid record type")
}
if len(buf[start:]) == 0 {
continue // all contents discarded
Contributor

Comment style

checkpoint.go Outdated
}
recs = append(recs, buf[start:])

// flush records in 1 MB increments.
Contributor

Capital letter

@@ -613,10 +616,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {
if len(bm) <= 1 {
return nil
}
sort.Slice(bm, func(i, j int) bool {
Contributor

Why are you removing this?

Contributor Author

This was mutating the input slice in a function that should be a read-only check, so current master actually has a regression. The sort has been moved properly into the reload function.

Contributor Author

This is fixed and tested in master via #335 so no longer relevant for this PR.

@@ -23,3 +24,25 @@ func ReadDir(dirpath string) ([]string, error) {
sort.Strings(names)
return names, nil
}

// Rename safely renames a file.
Contributor

This has a race condition between the remove and the rename

Contributor Author

Care to elaborate?

Contributor

The doc says it's "safe", yet there's a race condition. In what way is this meant to be safe?

Contributor Author

I meant to elaborate on what exactly the race condition is.

Contributor

From the time the RemoveAll starts until the rename ends, a partial file or no file at all may be present.
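In code form, the window looks roughly like this (a sketch of the pattern under discussion, not the exact helper):

```go
package fileutil

import (
	"os"
	"path/filepath"
)

// Rename replaces to with from. The final os.Rename is atomic on POSIX
// filesystems, but between the RemoveAll and the Rename a crash leaves
// no file at the destination. That gap is the race being pointed out.
func Rename(from, to string) error {
	if err := os.RemoveAll(to); err != nil {
		return err
	}
	if err := os.Rename(from, to); err != nil {
		return err
	}
	// Fsync the parent directory to persist the rename itself.
	pdir, err := os.Open(filepath.Dir(to))
	if err != nil {
		return err
	}
	defer pdir.Close()
	return pdir.Sync()
}
```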

head.go Outdated
return nil
}

// Init backfills data from the write ahead log and prepares the head for writes.
Contributor

This is more a load than a backfill, which is confusing

} else {
level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
return errors.Wrap(err, "create checkpoint")
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
Contributor

We should probably have a separate metric for the checkpoint duration

Contributor Author

It was renamed to checkpoint, but as @jkohen pointed out, that might break alerts and dashboards. It is essentially still part of the truncation process, so keeping the name seemed fine. The WAL.Truncate call is just simple file removal; I don't think there's anything worth measuring in there.

wal/wal.go Outdated
w.segment = next
w.donePages = 0

// Don't block further writes by fsyinc the last segment.
Contributor

fsyncing

}
}

donec := make(chan struct{})
Contributor

I think a single-level channel is sufficient here

Contributor Author

It's not: we have to wait until run() has terminated successfully. Currently it happens to return right when it reads from the channel, but that is quite implementation-dependent and would be an easy regression target.
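Sketched out, the two-step handshake looks like this (channel names are assumptions): Close only returns once run() confirms it has drained everything.

```go
package wal

type WAL struct {
	actorc chan func()
	stopc  chan chan struct{} // carries the "done" channel to run()
}

func (w *WAL) run() {
	for {
		select {
		case f := <-w.actorc:
			f()
		case donec := <-w.stopc:
			// Pending functions take precedence over shutdown.
			for {
				select {
				case f := <-w.actorc:
					f()
				default:
					close(donec) // only now has run() really finished
					return
				}
			}
		}
	}
}

// Close blocks until run() has processed all pending work and exited,
// not merely until run() has noticed the stop request.
func (w *WAL) Close() {
	donec := make(chan struct{})
	w.stopc <- donec
	<-donec
}
```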

Contributor

It looks like unnecessary complexity currently, especially as WAL is a struct rather than an interface.

wal/wal.go Outdated

func (w *WAL) run() {
for {
// Processing all pending functions has precedence over shutdown.
Contributor

What happens if we shut down with pending functions? The current code seems to allow for that.

@@ -221,17 +223,27 @@ func (h *Head) processWALSamples(
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if s.T > maxt {
maxt = s.T
}
Contributor

what about mint?

Contributor Author

I actually uncovered several more issues around this general area. I've adjusted things to make it work, but it touches a few more places, so I'd rather add this in a PR on top.
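For reference, the symmetric bookkeeping would look roughly like this (a sketch of the shape, not the follow-up patch):

```go
package tsdb

import "math"

type sample struct {
	T int64
	V float64
}

// timeBounds returns the inclusive time range covered by samples.
// WAL replay needs both ends: maxt for the head's upper bound and
// mint for its lower bound; tracking only maxt leaves the minimum
// time uninitialized after startup.
func timeBounds(samples []sample) (mint, maxt int64) {
	mint, maxt = math.MaxInt64, math.MinInt64
	for _, s := range samples {
		if s.T < mint {
			mint = s.T
		}
		if s.T > maxt {
			maxt = s.T
		}
	}
	return mint, maxt
}
```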

@krasi-georgiev
Contributor

Btw, did you try to run this with -race enabled?

@fabxc
Contributor Author

fabxc commented May 28, 2018

@krasi-georgiev yea, that looks all okay.

@fabxc
Contributor Author

fabxc commented May 29, 2018

@brian-brazil how does this one look? I have two follow-up PRs ready.

@brian-brazil
Contributor

I'm still waiting on file format docs/protocol before giving it a full review.

@fabxc
Contributor Author

fabxc commented May 30, 2018

The record encoding itself did not change, but so far the WAL format isn't documented at all. I have another PR in the queue that changes the record format and backfills documentation.

I can backport it to this one though without those changes.

@fabxc
Contributor Author

fabxc commented May 30, 2018

@brian-brazil docs added

Fabian Reinartz added 8 commits July 19, 2018 07:24
Allow to repair the WAL based on the error returned by a reader
during a full scan over all records.

Signed-off-by: Fabian Reinartz <[email protected]>
Create checkpoints from a sequence of WAL segments while filtering
out obsolete data. The checkpoint format is again a sequence of WAL
segments, which allows us to reuse the serialization format and
implementation.

Signed-off-by: Fabian Reinartz <[email protected]>
Remove the old WAL and drop in the new one

Signed-off-by: Fabian Reinartz <[email protected]>
We assume in multiple places that the block list held by DB
has blocks sequential by time.
A regression caused us to hold them ordered by ULID, i.e. by creation
time instead.

Signed-off-by: Fabian Reinartz <[email protected]>
The buffers we allocated were escaping to the heap, resulting in large
memory usage spikes during startup and checkpointing in Prometheus.
This attaches the buffer to the reader object to prevent this.

Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Fabian Reinartz added 3 commits July 19, 2018 07:34
On startup, rewrite the old write ahead log into the new format once.

Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
@fabxc
Contributor Author

fabxc commented Jul 19, 2018

Rebased and resolved conflicts.

Fabian Reinartz added 2 commits July 19, 2018 07:41
This fixes various issues when initializing the head time range
under different starting conditions.

Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Contributor

@brian-brazil left a comment

👍

Just some minor doc nits.

checkpoint.go Outdated
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones.
TotalTombstones int // Processed tombstones including droppes ones.
Contributor

dropped

└─────────────────────────────────────────────────────┘
```

[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format]
Contributor

GitHub isn't rendering this properly.

wal/wal.go Outdated

const (
recPageTerm recType = 0 // rest of page is empty
recFull recType = 1 // full record
Contributor

Full stops and capital letters. There's others in this file too.

Signed-off-by: Fabian Reinartz <[email protected]>
fabxc and others added 4 commits August 2, 2018 16:54
Properly initialize head time
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
@brian-brazil
Contributor

👍
