Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retention mark files. #3706

Merged
merged 6 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 81 additions & 36 deletions pkg/storage/stores/shipper/compactor/retention/marker.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package retention

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io/fs"
"io/ioutil"
Expand All @@ -21,7 +21,10 @@ import (
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

var minListMarkDelay = time.Minute
var (
minListMarkDelay = time.Minute
maxMarkPerFile = int64(100000)
)

type MarkerStorageWriter interface {
Put(chunkID []byte) error
Expand All @@ -34,42 +37,94 @@ type markerStorageWriter struct {
tx *bbolt.Tx
bucket *bbolt.Bucket

count int64
fileName string
count int64
currentFileCount int64
curFileName string
workDir string

buf []byte
}

func NewMarkerStorageWriter(workingDir string) (MarkerStorageWriter, error) {
err := chunk_util.EnsureDirectory(filepath.Join(workingDir, markersFolder))
dir := filepath.Join(workingDir, markersFolder)
err := chunk_util.EnsureDirectory(dir)
if err != nil {
return nil, err
}
fileName := filepath.Join(workingDir, markersFolder, fmt.Sprint(time.Now().UnixNano()))

msw := &markerStorageWriter{
workDir: dir,
currentFileCount: 0,
buf: make([]byte, 8),
}

return msw, msw.createFile()
}

func (m *markerStorageWriter) createFile() error {
fileName := filepath.Join(m.workDir, fmt.Sprint(time.Now().UnixNano()))
db, err := shipper_util.SafeOpenBoltdbFile(fileName)
if err != nil {
return nil, err
return err
}
tx, err := db.Begin(true)
if err != nil {
return nil, err
return err
}
bucket, err := tx.CreateBucketIfNotExists(chunkBucket)
if err != nil {
return nil, err
return err
}
level.Info(util_log.Logger).Log("msg", "mark file created", "file", fileName)
bucket.FillPercent = 1
m.db = db
m.tx = tx
m.bucket = bucket
m.curFileName = fileName
m.currentFileCount = 0
return nil
}

func (m *markerStorageWriter) closeFile() error {
err := m.tx.Commit()
if err != nil {
return err
}
if err := m.db.Close(); err != nil {
return err
}
// If the marker file is empty, we can remove it.
if m.currentFileCount == 0 {
return os.Remove(m.curFileName)
}
return &markerStorageWriter{
db: db,
tx: tx,
bucket: bucket,
count: 0,
fileName: fileName,
}, err
return nil
}

func (m *markerStorageWriter) Put(chunkID []byte) error {
if err := m.bucket.Put(chunkID, empty); err != nil {
if m.currentFileCount > maxMarkPerFile { // roll files when max marks is reached.
if err := m.closeFile(); err != nil {
return err
}
if err := m.createFile(); err != nil {
return err
}

}
// insert in order and fillpercent = 1
id, err := m.bucket.NextSequence()
if err != nil {
return err
}
binary.BigEndian.PutUint64(m.buf, id) // insert in order using sequence id.
// boltdb requires the value to be valid for the whole tx.
// so we make a copy.
value := make([]byte, len(chunkID))
copy(value, chunkID)
if err := m.bucket.Put(m.buf, value); err != nil {
return err
}
m.count++
m.currentFileCount++
return nil
}

Expand All @@ -78,17 +133,7 @@ func (m *markerStorageWriter) Count() int64 {
}

func (m *markerStorageWriter) Close() error {
if err := m.tx.Commit(); err != nil {
return err
}
if err := m.db.Close(); err != nil {
return err
}
// If the marker file is empty, we can remove it.
if m.count == 0 {
return os.Remove(m.fileName)
}
return nil
return m.closeFile()
}

type MarkerProcessor interface {
Expand Down Expand Up @@ -181,7 +226,7 @@ func (r *markerProcessor) Start(deleteFunc func(ctx context.Context, chunkId []b
func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.Context, chunkId []byte) error) error {
var (
wg sync.WaitGroup
queue = make(chan *bytes.Buffer)
queue = make(chan *keyPair)
)
// we use a copy to view the file so that we can read and update at the same time.
viewFile, err := ioutil.TempFile("/tmp/", "marker-view-")
Expand Down Expand Up @@ -228,7 +273,7 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C
defer wg.Done()
for key := range queue {
if err := processKey(r.ctx, key, dbUpdate, deleteFunc); err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.String(), "err", err)
level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.key.String(), "value", key.value.String(), "err", err)
}
putKeyBuffer(key)
}
Expand All @@ -241,8 +286,8 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C
}

c := b.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
key, err := getKeyBuffer(k)
for k, v := c.First(); k != nil; k, v = c.Next() {
key, err := getKeyPairBuffer(k, v)
if err != nil {
return err
}
Expand All @@ -260,17 +305,17 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C
return nil
}

func processKey(ctx context.Context, key *bytes.Buffer, db *bbolt.DB, deleteFunc func(ctx context.Context, chunkId []byte) error) error {
keyData := key.Bytes()
if err := deleteFunc(ctx, keyData); err != nil {
func processKey(ctx context.Context, key *keyPair, db *bbolt.DB, deleteFunc func(ctx context.Context, chunkId []byte) error) error {
chunkID := key.value.Bytes()
if err := deleteFunc(ctx, chunkID); err != nil {
return err
}
return db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(chunkBucket)
if b == nil {
return nil
}
return b.Delete(keyData)
return b.Delete(key.key.Bytes())
})
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/marker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,21 @@ func Test_markerProcessor_availablePath(t *testing.T) {
})
}
}

// Test_MarkFileRotation verifies that the marker writer rolls over to a new
// marker file once maxMarkPerFile marks have been written: inserting
// 2*maxMarkPerFile marks should leave two marker files on disk, while
// Count() still reports the grand total across all files.
func Test_MarkFileRotation(t *testing.T) {
dir := t.TempDir()
// Reader over the same directory, used to enumerate the marker files produced.
p, err := newMarkerStorageReader(dir, 150, 0, sweepMetrics)
require.NoError(t, err)
w, err := NewMarkerStorageWriter(dir)
require.NoError(t, err)
// Write enough marks to force exactly one file rotation.
totalMarks := int64(2 * int(maxMarkPerFile))
for i := int64(0); i < totalMarks; i++ {
require.NoError(t, w.Put([]byte(fmt.Sprintf("%d", i))))
}
require.NoError(t, w.Close())
paths, _, err := p.availablePath()
require.NoError(t, err)

require.Len(t, paths, 2)
require.Equal(t, totalMarks, w.Count())
}
31 changes: 22 additions & 9 deletions pkg/storage/stores/shipper/compactor/retention/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ var (
}
keyPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 512))
return &keyPair{
key: bytes.NewBuffer(make([]byte, 0, 8)),
value: bytes.NewBuffer(make([]byte, 0, 512)),
}
},
}
)
Expand All @@ -34,16 +37,26 @@ func putComponents(ref *componentRef) {
componentPools.Put(ref)
}

func getKeyBuffer(key []byte) (*bytes.Buffer, error) {
buf := keyPool.Get().(*bytes.Buffer)
if _, err := buf.Write(key); err != nil {
putKeyBuffer(buf)
// keyPair is a pooled pair of reusable buffers for one marker-bucket entry:
// key holds the bucket key (the big-endian sequence id written by Put) and
// value holds the associated chunk ID. Obtain via getKeyPairBuffer and
// return with putKeyBuffer so the underlying buffers are recycled.
type keyPair struct {
key *bytes.Buffer
value *bytes.Buffer
}

func getKeyPairBuffer(key, value []byte) (*keyPair, error) {
keyBuf := keyPool.Get().(*keyPair)
if _, err := keyBuf.key.Write(key); err != nil {
putKeyBuffer(keyBuf)
return nil, err
}
if _, err := keyBuf.value.Write(value); err != nil {
putKeyBuffer(keyBuf)
return nil, err
}
return buf, nil
return keyBuf, nil
}

func putKeyBuffer(buf *bytes.Buffer) {
buf.Reset()
keyPool.Put(buf)
func putKeyBuffer(pair *keyPair) {
pair.key.Reset()
pair.value.Reset()
keyPool.Put(pair)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
var (
bucketName = []byte("index")
chunkBucket = []byte("chunks")
empty = []byte("-")
)

const (
Expand Down