This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

wal: add write ahead log package #332

Merged
merged 19 commits into from
Aug 7, 2018
279 changes: 279 additions & 0 deletions checkpoint.go
@@ -0,0 +1,279 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)

// CheckpointStats holds stats about a created checkpoint.
type CheckpointStats struct {
DroppedSeries int
DroppedSamples int
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples including dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
}

// LastCheckpoint returns the directory name of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
// Traverse list backwards since there may be multiple checkpoints left.
for i := len(files) - 1; i >= 0; i-- {
fi := files[i]

if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
return fi.Name(), k, nil
}
return "", 0, ErrNotFound
}

// DeleteCheckpoints deletes all checkpoints in dir that have an index
// below n.
func DeleteCheckpoints(dir string, n int) error {
var errs MultiError

files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, fi := range files {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || k >= n {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {

If one checkpoint can't be deleted (it becomes read-only?), then later checkpoints won't be deleted until that error is resolved, and the failure mode is upgraded from "can't delete this" to "I'm filling up the disk". Would it make sense to continue deleting them?

Also, I see Checkpoint() would fail. Would that cause the Prometheus server to abort, or something else that could cause data loss? In that case, I think this function shouldn't fail if a directory can't be deleted.

Contributor Author

You are right. We shouldn't abort on failure in either place, especially since those leftover checkpoints generally don't cause problems; we just ignore them.

errs.Add(err)
}
}
return errs.Err()
}

const checkpointPrefix = "checkpoint."

// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
stats := &CheckpointStats{}

var sr io.Reader
{
Collaborator

Why do we have a new block?

Contributor Author

To avoid polluting the entire function with temporary variables that we only need for a few lines here.

lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint")
}
if err == nil {
if m > k+1 {
return nil, errors.New("unexpected gap to last checkpoint")
}
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
m = k + 1

last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn))
if err != nil {
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
sr = last
}

segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
if err != nil {
return nil, errors.Wrap(err, "create segment reader")
}
defer segsr.Close()

if sr != nil {
sr = io.MultiReader(sr, segsr)
} else {
sr = segsr
}
}

cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n))
cpdirtmp := cpdir + ".tmp"

if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}

r := wal.NewReader(sr)

var (
series []RefSeries
samples []RefSample
tstones []Stone
dec RecordDecoder
enc RecordEncoder
buf []byte
recs [][]byte
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]

// We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint.
// Remember where the record for this iteration starts.
start := len(buf)
rec := r.Record()

switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
if err != nil {
return nil, errors.Wrap(err, "decode series")
}
// Drop irrelevant series in place.
repl := series[:0]
for _, s := range series {
if keep(s.Ref) {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Series(repl, buf)
}
stats.TotalSeries += len(series)
stats.DroppedSeries += len(series) - len(repl)

case RecordSamples:
samples, err = dec.Samples(rec, samples)
if err != nil {
return nil, errors.Wrap(err, "decode samples")
}
// Drop irrelevant samples in place.
repl := samples[:0]
for _, s := range samples {
if s.T >= mint {
Collaborator

Check for keep(s.Ref) too?

Contributor Author

That would help with data size. I wonder about the penalty, though, of calling a function for each sample, especially since we don't necessarily know how expensive that function is.
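
For reference, the filter the Collaborator suggests would look roughly like the sketch below. It was not adopted in this change, since keep would then be called once per sample and its cost is unknown:

if s.T >= mint && keep(s.Ref) {
repl = append(repl, s)
}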

repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Samples(repl, buf)
}
stats.TotalSamples += len(samples)
stats.DroppedSamples += len(samples) - len(repl)

case RecordTombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return nil, errors.Wrap(err, "decode deletes")
}
// Drop irrelevant tombstones in place.
repl := tstones[:0]
for _, s := range tstones {
for _, iv := range s.intervals {
Collaborator

Check for keep(s.ref) too?

Contributor Author

Same as above. I'd err on the side of having a few MB of temporary overhead over calling something with unknown cost a billion times.

if iv.Maxt >= mint {
repl = append(repl, s)
break
}
}
}
if len(repl) > 0 {
buf = enc.Tombstones(repl, buf)
}
stats.TotalTombstones += len(tstones)
stats.DroppedTombstones += len(tstones) - len(repl)

default:
return nil, errors.New("invalid record type")
}
if len(buf[start:]) == 0 {
continue // All contents discarded.
}
recs = append(recs, buf[start:])

// Flush records in 1 MB increments.
if len(buf) > 1*1024*1024 {
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
buf, recs = buf[:0], recs[:0]
}
}
// If we hit any corruption during checkpointing, repairing is not an option.
// The head won't know which series records are lost.
if r.Err() != nil {
return nil, errors.Wrap(r.Err(), "read segments")
}

// Flush remaining records.
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint")
}
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}
return stats, nil
}
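
A minimal usage sketch of the new Checkpoint entry point, for orientation: the segment bounds m and n, the mint cutoff, dbDir, and seriesStillInHead are hypothetical caller-provided values, not part of this change. Note that truncating old segments and deleting superseded checkpoints happen inside Checkpoint itself.

// Sketch only: open the WAL, checkpoint segments [m, n], and report what was dropped.
w, err := wal.New(nil, nil, filepath.Join(dbDir, "wal")) // dbDir is a placeholder
if err != nil {
return errors.Wrap(err, "open WAL")
}
keep := func(id uint64) bool {
// Placeholder: true while the head block still references this series.
return seriesStillInHead(id)
}
stats, err := Checkpoint(logger, w, m, n, keep, mint)
if err != nil {
return errors.Wrap(err, "create checkpoint")
}
level.Info(logger).Log("msg", "checkpoint created",
"dropped_series", stats.DroppedSeries,
"dropped_samples", stats.DroppedSamples)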