wal: add write ahead log package #332
Conversation
I smell some offline KubeCon discussions 🤔 Are there any more discussions about how and where this would be used?
I figured it to be in relation to this proposal.
@krasi-georgiev yes, that's the right one. No offline discussions and nothing outside of this doc really. Initial benchmarking after integrating this shows TSDB overall looking at a ~20% decrease in sample throughput. So a bit worse on the benchmark scale, but the durability guarantees are actually a fair bit better with this WAL.
Looked long and hard and didn't see any problems with the implementation.
The code is easy to follow and read 👍
if err := prev.Close(); err != nil {
	level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
}
Seems that the actor is used only in a couple of places; is there no way to use a simple mutex and get rid of run()
altogether, to simplify?
Unfortunately not; a mutex would mean the writing goroutines would block. Fsync can take several seconds, or much longer in bad conditions.
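For illustration, a minimal sketch of the actor pattern being discussed, with hypothetical names (`actorc`, `completeSegment`) rather than the PR's exact code: writers only enqueue the fsync/close of a finished segment, so a slow fsync never blocks the write path the way a shared mutex would.

```go
package main

import (
	"log"
	"os"
)

// WAL holds a channel of deferred operations that a single background
// goroutine executes sequentially. Hypothetical sketch, not the PR's code.
type WAL struct {
	actorc chan func()
}

// run executes queued operations until the channel is closed.
func (w *WAL) run() {
	for f := range w.actorc {
		f()
	}
}

// completeSegment is called on the hot write path; it only enqueues work.
func (w *WAL) completeSegment(prev *os.File) {
	w.actorc <- func() {
		if err := prev.Sync(); err != nil {
			log.Println("sync previous segment:", err)
		}
		if err := prev.Close(); err != nil {
			log.Println("close previous segment:", err)
		}
	}
}

func main() {
	w := &WAL{actorc: make(chan func(), 16)}
	done := make(chan struct{})
	go func() { w.run(); close(done) }()

	f, _ := os.CreateTemp("", "segment")
	w.completeSegment(f)

	close(w.actorc) // let run() drain and exit for this demo
	<-done
}
```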
wal/wal.go
Outdated
return &Reader{rdr: bufio.NewReader(r)}
}

// Next advances the reader to the next records and returns true iff it exists.
iff -> if
Inevitably gets pointed out each time I try to use it ;) https://en.wikipedia.org/wiki/If_and_only_if
Doesn't seem to catch on – when no one knows what it means, it's probably not worth using.
wal/wal.go
Outdated
default:
	return errors.Errorf("unexpected record type %d", typ)
}
// Only increment i for non-zero records since we use it below
below -> above
Force-pushed from 273b107 to fbcedb5.
Added 3 commits to change that :P I know it's quite a bit of code for a single PR, but it's well-separated bottom-up along commits. I think some parts are just better to review with the full picture at hand. The old WAL code was not removed; it needs to stick around anyway for migration procedures.
if err != nil || k >= n {
	continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
If one checkpoint can't be deleted (it becomes read-only?), then later checkpoints won't be deleted until that error is solved, and the failure mode will be upgraded from "can't delete this" to "I'm filling up the disk". Would it make sense to continue deleting them?
Also, I see Checkpoint() would fail. Would that cause the Prometheus server to abort, or something else that could cause data loss? In that case, I think this function shouldn't fail if a directory can't be deleted.
You are right. We shouldn't abort on failure in either place, especially since those leftover checkpoints generally don't cause problems – we just ignore them.
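A minimal sketch of the best-effort cleanup being agreed on here (hypothetical directory naming and helper, not the PR's code): deletion failures are logged and skipped instead of aborting, so one stuck directory doesn't block removal of the rest.

```go
package main

import (
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// deleteOldCheckpoints removes "checkpoint.<k>" directories with k below n.
// Failures are logged and skipped rather than returned, so cleanup continues
// past an undeletable directory. Hypothetical sketch only.
func deleteOldCheckpoints(dir string, n int) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		log.Println("list checkpoints:", err)
		return
	}
	for _, e := range entries {
		k, err := strconv.Atoi(strings.TrimPrefix(e.Name(), "checkpoint."))
		if err != nil || k >= n {
			continue // not an old checkpoint; leave it alone
		}
		if err := os.RemoveAll(filepath.Join(dir, e.Name())); err != nil {
			log.Println("delete old checkpoint:", err)
			continue // best effort: keep deleting the remaining ones
		}
	}
}

func main() {
	deleteOldCheckpoints(os.TempDir(), 5)
}
```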
checkpoint.go
Outdated
//
// If no errors occurs, it returns the number of the highest segment that's
// part of the checkpoint.
func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
What are m and n? Can you document them?
checkpoint.go
Outdated
	repl = append(repl, s)
	break
} else {
|
Delete empty block?
head.go
Outdated
@@ -139,9 +141,9 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}, func() float64 {
	return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
	Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Are we assuming nobody out there relies on this metric for monitoring Prometheus? Would it make sense to preserve it?
Metrics are explicitly excluded from our stability guarantees – but for critical metrics the concern makes sense nonetheless, of course.
I think this is more of a debug metric than an alerting/dashboard one, but it may make sense to keep it anyway – especially since the name is still generally accurate.
func (w *WAL) fsync(f *Segment) error {
	start := time.Now()
	err := fileutil.Fsync(f.File)
	w.fsyncDuration.Observe(time.Since(start).Seconds())
Nit, is fsync often taking 10s of seconds or longer? If closer to 1 second (or below), I'd use more granular unit, e.g. ms.
In Prometheus we have a few instrumentation conventions that are not baked into our model. One of them is that we always use base units, i.e. bytes, seconds, ...
The idea is that you don't have to make unit adjustments if you want to do binary ops between metrics and such, and can just do a final division/multiplication on the overall result. Dashboard builders also often allow you to set a unit and they'll pick the most sensible scale – you could then just always default to the base unit and don't have to handpick it for each graph.
Since everything is a double in our world, this choice conveniently doesn't really impact data size or similar.
Time.Seconds() returns a float64 by the way, so we aren't losing the sub-seconds in case you meant that.
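To illustrate the convention, here is a small sketch using client_golang with a hypothetical metric name: durations are observed in seconds as float64, so sub-second values keep their precision and dashboards can rescale to ms as needed.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// fsyncDuration follows the base-unit convention: the value is always seconds,
// expressed as a float64. The metric name is hypothetical, for the example.
var fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
	Name: "example_fsync_duration_seconds",
	Help: "Duration of fsync calls in seconds.",
})

func timedFsync(sync func() error) error {
	start := time.Now()
	err := sync()
	// time.Duration.Seconds() returns a float64, e.g. 0.0042 for 4.2ms.
	fsyncDuration.Observe(time.Since(start).Seconds())
	return err
}

func main() {
	prometheus.MustRegister(fsyncDuration)
	_ = timedFsync(func() error { time.Sleep(5 * time.Millisecond); return nil })
}
```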
wal/wal.go
Outdated
|
// flushPage writes the contents of the page to disk.
// If clear is set or no more records will reasonbly fit into it, its
// remaining size gets written as zero data and its reset for the next physical page.
I'm not sure I understand the last part. Is it saying that the page will be padded with byte-0 if clear is true or the page doesn't have room for another record?
If clear is false, we may fill up the remainder of the page with zero bytes if we think there's not enough space left to fit in another (partial) record. If clear is true, we force the zero padding no matter how much space is left. We need the latter on shutdown or when completing a segment (when no partial records may be written), since we never want to leave a segment with a size that's not a multiple of 32KB.
I will make the comment clearer.
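A minimal sketch of the padding behaviour described above, with hypothetical field names and a 32 KiB page size taken from the discussion; this is not the PR's exact code.

```go
package main

const pageSize = 32 * 1024 // assumed from the discussion above

// recordHeaderSize is a hypothetical minimum size a (partial) record needs.
const recordHeaderSize = 7

type page struct {
	buf   [pageSize]byte
	alloc int // bytes used so far
}

// flushPage sketches the rule from the comment: if clear is set, or too little
// room remains for even a partial record, the rest of the page is written as
// zero bytes and the page is reset for the next physical page.
func flushPage(p *page, write func([]byte) error, clear bool) error {
	clear = clear || p.alloc > pageSize-recordHeaderSize

	n := p.alloc
	if clear {
		n = pageSize // include the zero padding, keeping the segment 32KiB-aligned
	}
	if err := write(p.buf[:n]); err != nil {
		return err
	}
	if clear {
		*p = page{} // reset for the next physical page
	}
	return nil
}

func main() {}
```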
Force-pushed from d80e448 to 36ac113.
Some initial feedback after running this in a small Prometheus server with high-frequency scrapes: Memory and CPU usage seem about 3-5% lower on average, which may just be noise. Max usage shows no notable difference. Startups got about 3x slower; this is not super concerning, but also not great for big setups and worth investigating.
@brian-brazil @gouthamve can I have a review from one of you? We introduced a regression a while back regarding block order. It never made it into Prometheus though. Fixed it in here and added a test.
This is looking good.
During performance analysis of this change, I found a regression. A buffer that seems stack-allocated doesn't pass escape analysis and gets thrown on the heap. That resulted in big spikes. I pushed a commit to fix this. The graph shows memory usage before and after the change, comparing the released v2.2.1 with a version that has the new WAL and in-memory metadata store.
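For illustration, a sketch of the kind of fix described (hypothetical names and framing, not the actual commit): a scratch buffer kept on the reader struct is reused across calls, instead of a per-call allocation that escapes to the heap and shows up as allocation spikes.

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// Reader keeps its scratch buffer on the struct so it is allocated once and
// reused, rather than escaping to the heap on every Next() call.
// Hypothetical sketch, not the PR's code.
type Reader struct {
	rdr *bufio.Reader
	buf []byte // reused across calls to Next
}

func NewReader(r io.Reader) *Reader {
	return &Reader{rdr: bufio.NewReader(r), buf: make([]byte, 0, 4096)}
}

// Next reads one record using a toy 1-byte length prefix into the reused buffer.
func (r *Reader) Next() ([]byte, error) {
	n, err := r.rdr.ReadByte()
	if err != nil {
		return nil, err
	}
	if cap(r.buf) < int(n) {
		r.buf = make([]byte, n)
	}
	r.buf = r.buf[:n]
	if _, err := io.ReadFull(r.rdr, r.buf); err != nil {
		return nil, err
	}
	return r.buf, nil
}

func main() {
	r := NewReader(strings.NewReader("\x05hello"))
	rec, _ := r.Next()
	_ = rec
}
```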
I haven't dug too deeply into this. The new file format should be added to the docs.
checkpoint.go
Outdated
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are locked and not returned.
logged?
checkpoint.go
Outdated
return nil, errors.New("invalid record type")
}
if len(buf[start:]) == 0 {
	continue // all contents discarded
Comment style
checkpoint.go
Outdated
}
recs = append(recs, buf[start:])

// flush records in 1 MB increments.
Capital letter
@@ -613,10 +616,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {
if len(bm) <= 1 {
	return nil
}
sort.Slice(bm, func(i, j int) bool {
Why are you removing this?
This was mutating the input slice in a function that should do a read-only check. Thus current master actually has a regression. The sort is moved properly into the reload function.
This is fixed and tested in master via #335 so no longer relevant for this PR.
@@ -23,3 +24,25 @@ func ReadDir(dirpath string) ([]string, error) {
	sort.Strings(names)
	return names, nil
}

// Rename safely renames a file.
This has a race condition between the remove and the rename
Care to elaborate?
The doc says it's "safe"; however, there's a race condition. In what way is this meant to be safe?
I meant to elaborate on what exactly the race condition is.
From the time the RemoveAll starts to when the rename ends, a partial file or no file may be present.
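For context, a common pattern for shrinking that window (a sketch, not necessarily what this PR ends up doing): write the new content to a temporary file, fsync it, rename it over the destination (atomic on POSIX filesystems), then fsync the parent directory.

```go
package main

import (
	"os"
	"path/filepath"
)

// replaceFile writes data to dst via a temporary file and an atomic rename, so
// readers see either the old complete file or the new complete file, never a
// partial one. Sketch only; not the helper discussed in the diff.
func replaceFile(dst string, data []byte) error {
	tmp := dst + ".tmp"
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // flush file contents to disk first
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	if err := os.Rename(tmp, dst); err != nil { // atomic on POSIX filesystems
		return err
	}
	// Fsync the parent directory so the rename itself survives a crash
	// (POSIX behaviour; may be a no-op or error on other platforms).
	dir, err := os.Open(filepath.Dir(dst))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}

func main() {
	_ = replaceFile(filepath.Join(os.TempDir(), "example.dat"), []byte("hello"))
}
```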
head.go
Outdated
return nil
}

// Init backfills data from the write ahead log and prepares the head for writes.
This is more a load than a backfill, which is confusing
} else {
	level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
	return errors.Wrap(err, "create checkpoint")
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
We should probably have a separate metric for the checkpoint duration
It was renamed to checkpoint but, as @jkohen pointed out, it might break alerts and dashboards. It is essentially still part of the truncation process, so keeping the name seemed fine. The WAL.Truncate call is just simple file removal. I don't think there's anything worth measuring in there.
wal/wal.go
Outdated
w.segment = next
w.donePages = 0

// Don't block further writes by fsyinc the last segment.
fsyncing
}
}

donec := make(chan struct{})
I think a single-level channel is sufficient here
It's not; we have to wait until run has terminated successfully. Currently it returns right when it reads from the channel, but this is quite implementation-dependent and would be an easy regression target.
It looks like unnecessary complexity currently, especially as WAL is a struct rather than an interface.
wal/wal.go
Outdated

func (w *WAL) run() {
	for {
		// Processing all pending functions has precedence over shutdown.
What happens if we shut down with pending functions? The current code seems to allow for that.
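For background, the "precedence over shutdown" comment usually refers to Go's nested-select idiom. This is a sketch of the general pattern with hypothetical channel names, not necessarily this PR's exact loop; it also shows the donec pattern discussed above for waiting until run has fully terminated.

```go
package main

import "fmt"

// run drains pending functions with priority over the stop signal: the outer
// select only falls through to checking stopc when actorc is empty.
func run(actorc chan func(), stopc, donec chan struct{}) {
	defer close(donec)
	for {
		select {
		case f := <-actorc:
			f()
		default:
			select {
			case f := <-actorc:
				f()
			case <-stopc:
				return
			}
		}
	}
}

func main() {
	actorc := make(chan func(), 2)
	stopc := make(chan struct{})
	donec := make(chan struct{})
	actorc <- func() { fmt.Println("pending work 1") }
	actorc <- func() { fmt.Println("pending work 2") }
	go run(actorc, stopc, donec)
	close(stopc)
	<-donec // Close waits until run has fully terminated.
}
```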
@@ -221,17 +223,27 @@ func (h *Head) processWALSamples(
	h.metrics.chunksCreated.Inc()
	h.metrics.chunks.Inc()
}
if s.T > maxt {
	maxt = s.T
}
what about mint?
I actually uncovered several more issues around this general area. I've adjusted things to make it work, but it touches a few more places. So I'd rather add this in a PR on top.
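As a small illustration of the point being raised (a hedged sketch with hypothetical types, not the follow-up PR): tracking the lower bound of the time range while replaying samples is symmetrical to the maxt tracking in the diff above.

```go
package main

import (
	"fmt"
	"math"
)

type sample struct {
	T int64 // timestamp in milliseconds
	V float64
}

// timeRange returns the min and max timestamps seen while replaying samples,
// mirroring the maxt tracking above with the symmetric mint case.
func timeRange(samples []sample) (mint, maxt int64) {
	mint, maxt = math.MaxInt64, math.MinInt64
	for _, s := range samples {
		if s.T < mint {
			mint = s.T
		}
		if s.T > maxt {
			maxt = s.T
		}
	}
	return mint, maxt
}

func main() {
	mint, maxt := timeRange([]sample{{T: 1500, V: 1}, {T: 1000, V: 2}, {T: 2000, V: 3}})
	fmt.Println(mint, maxt) // 1000 2000
}
```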
btw did you try to run this with
@krasi-georgiev yea, that looks all okay.
@brian-brazil how does this one look? I've two follow-up PRs ready.
I'm still waiting on file format docs/protocol before giving it a full review.
The record encoding itself did not change, but so far the WAL format isn't documented at all. I've another PR in the queue that changes the record format and backfills documentation. I can backport it to this one, though, without those changes.
@brian-brazil docs added
Allow to repair the WAL based on the error returned by a reader during a full scan over all records. Signed-off-by: Fabian Reinartz <[email protected]>
Create checkpoints from a sequence of WAL segments while filtering out obsolete data. The checkpoint format is again a sequence of WAL segments, which allows us to reuse the serialization format and implementation. Signed-off-by: Fabian Reinartz <[email protected]>
Remove the old WAL and drop in the new one Signed-off-by: Fabian Reinartz <[email protected]>
We assume in multiple places that the block list held by DB has blocks sequential by time. A regression caused us to hold them ordered by ULID, i.e. by creation time instead. Signed-off-by: Fabian Reinartz <[email protected]>
The buffers we allocated were escaping to the heap, resulting in large memory usage spikes during startup and checkpointing in Prometheus. This attaches the buffer to the reader object to prevent this. Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
On startup, rewrite the old write ahead log into the new format once. Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
Rebased and resolved conflicts.
This fixes various issues when initializing the head time range under different starting conditions. Signed-off-by: Fabian Reinartz <[email protected]>
Signed-off-by: Fabian Reinartz <[email protected]>
👍
Just some minor doc nits.
checkpoint.go
Outdated
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones.
TotalTombstones int // Processed tombstones including droppes ones.
dropped
docs/format/wal.md
Outdated
└─────────────────────────────────────────────────────┘
```

[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format]
GitHub isn't rendering this properly.
wal/wal.go
Outdated

const (
	recPageTerm recType = 0 // rest of page is empty
	recFull recType = 1 // full record
Full stops and capital letters. There's others in this file too.
Signed-off-by: Fabian Reinartz <[email protected]>
Properly initialize head time
Signed-off-by: Fabian Reinartz <[email protected]>
Migrate write ahead log
Signed-off-by: Fabian Reinartz <[email protected]>
👍
This adds a new WAL that's agnostic to the actual record contents.
It's much simpler and should be more resilient than the existing one.
It is safe for concurrent reads across different processes.
@brian-brazil @jkohen
Integrating this into the rest of TSDB is another can of worms. I may need to make some slight tweaks here and there, but in general it seems complete and working.
The batch mode is mostly there because I'd like to implement the checkpointing (formerly truncation) by just writing another WAL so we don't need to implement another disk format. But in that batch write scenario, firing off millions of tiny writes wouldn't be ideal.
Also, we need some nicer recovery handling than in the last one – but that should be simpler to do as well.
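To make the batch-mode point concrete, here is a hedged sketch of what a batched append might look like (hypothetical API and framing, not necessarily the PR's signature or record format): many small records are framed into one buffer and flushed with a single write, instead of millions of tiny writes during checkpointing.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// logBatch frames all records into one buffer and hands it to the underlying
// writer in a single call. Hypothetical 4-byte big-endian length prefix,
// chosen only for the example.
func logBatch(w *bytes.Buffer, recs ...[]byte) error {
	var buf bytes.Buffer
	for _, rec := range recs {
		var hdr [4]byte
		binary.BigEndian.PutUint32(hdr[:], uint32(len(rec)))
		buf.Write(hdr[:])
		buf.Write(rec)
	}
	_, err := w.Write(buf.Bytes()) // one write for the whole batch
	return err
}

func main() {
	var segment bytes.Buffer
	_ = logBatch(&segment, []byte("series"), []byte("samples"), []byte("tombstones"))
	fmt.Println("segment bytes:", segment.Len())
}
```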