From 426acbae8a8b5511948cb6089c4d2d8d59cb2ce9 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 10 May 2021 13:28:54 +0200 Subject: [PATCH 1/6] Improve retention mark files. This PR is twofold: 1) Rotate mark files when they reach 100k marks; this ensures we max out mark files and don't create files that may be too big. 2) Instead of inserting marks using the chunk id as the key, we insert using a natural sequence. This has two benefits: - Keys are ordered, and so insertions are faster. - Allows us to set the boltdb fill percent to 100%, which means boltdb won't over-allocate when inserting keys in between unordered data. Why? I realized that when inserting a huge amount of marks, this operation can take up to hours since boltdb has to re-allocate pages over and over. This is mainly because chunk ids arrive without a specific order. Signed-off-by: Cyril Tovena --- .../shipper/compactor/retention/marker.go | 113 ++++++++++++------ .../compactor/retention/marker_test.go | 18 +++ .../shipper/compactor/retention/pool.go | 31 +++-- 3 files changed, 117 insertions(+), 45 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/shipper/compactor/retention/marker.go index 4593e44d14207..6223ee06aec87 100644 --- a/pkg/storage/stores/shipper/compactor/retention/marker.go +++ b/pkg/storage/stores/shipper/compactor/retention/marker.go @@ -1,8 +1,8 @@ package retention import ( - "bytes" "context" + "encoding/binary" "fmt" "io/fs" "io/ioutil" @@ -21,7 +21,10 @@ import ( shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) -var minListMarkDelay = time.Minute +var ( + minListMarkDelay = time.Minute + maxMarkPerFile = int64(100000) +) type MarkerStorageWriter interface { Put(chunkID []byte) error @@ -34,42 +37,90 @@ type markerStorageWriter struct { tx *bbolt.Tx bucket *bbolt.Bucket - count int64 - fileName string + count int64 + currentFileCount int64 + curFileName string + workDir string + + buf []byte } func
NewMarkerStorageWriter(workingDir string) (MarkerStorageWriter, error) { - err := chunk_util.EnsureDirectory(filepath.Join(workingDir, markersFolder)) + dir := filepath.Join(workingDir, markersFolder) + err := chunk_util.EnsureDirectory(dir) if err != nil { return nil, err } - fileName := filepath.Join(workingDir, markersFolder, fmt.Sprint(time.Now().UnixNano())) + + msw := &markerStorageWriter{ + workDir: dir, + currentFileCount: 0, + buf: make([]byte, 8), + } + + return msw, msw.createFile() +} + +func (m *markerStorageWriter) createFile() error { + fileName := filepath.Join(m.workDir, fmt.Sprint(time.Now().UnixNano())) db, err := shipper_util.SafeOpenBoltdbFile(fileName) if err != nil { - return nil, err + return err } tx, err := db.Begin(true) if err != nil { - return nil, err + return err } bucket, err := tx.CreateBucketIfNotExists(chunkBucket) if err != nil { - return nil, err + return err + } + level.Info(util_log.Logger).Log("msg", "mark file created", "file", fileName) + bucket.FillPercent = 1 + m.db = db + m.tx = tx + m.bucket = bucket + m.curFileName = fileName + m.currentFileCount = 0 + return nil +} + +func (m *markerStorageWriter) closeFile() error { + err := m.tx.Commit() + if err != nil { + return err + } + if err := m.db.Close(); err != nil { + return err + } + // The marker file is empty we can remove. + if m.currentFileCount == 0 { + return os.Remove(m.curFileName) } - return &markerStorageWriter{ - db: db, - tx: tx, - bucket: bucket, - count: 0, - fileName: fileName, - }, err + return nil } func (m *markerStorageWriter) Put(chunkID []byte) error { - if err := m.bucket.Put(chunkID, empty); err != nil { + if m.currentFileCount > maxMarkPerFile { // roll files when max marks is reached. 
+ if err := m.closeFile(); err != nil { + return err + } + if err := m.createFile(); err != nil { + return err + } + + } + // insert in order and fillpercent = 1 + id, err := m.bucket.NextSequence() + if err != nil { + return err + } + binary.BigEndian.PutUint64(m.buf, id) // insert in order using sequence id. + if err := m.bucket.Put(m.buf, chunkID); err != nil { return err } m.count++ + m.currentFileCount++ return nil } @@ -78,17 +129,7 @@ func (m *markerStorageWriter) Count() int64 { } func (m *markerStorageWriter) Close() error { - if err := m.tx.Commit(); err != nil { - return err - } - if err := m.db.Close(); err != nil { - return err - } - // The marker file is empty we can remove. - if m.count == 0 { - return os.Remove(m.fileName) - } - return nil + return m.closeFile() } type MarkerProcessor interface { @@ -181,7 +222,7 @@ func (r *markerProcessor) Start(deleteFunc func(ctx context.Context, chunkId []b func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.Context, chunkId []byte) error) error { var ( wg sync.WaitGroup - queue = make(chan *bytes.Buffer) + queue = make(chan *keyPair) ) // we use a copy to view the file so that we can read and update at the same time. 
viewFile, err := ioutil.TempFile("/tmp/", "marker-view-") @@ -228,7 +269,7 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C defer wg.Done() for key := range queue { if err := processKey(r.ctx, key, dbUpdate, deleteFunc); err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.String(), "err", err) + level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.value.String(), "value", key.value.String(), "err", err) } putKeyBuffer(key) } @@ -241,8 +282,8 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C } c := b.Cursor() - for k, _ := c.First(); k != nil; k, _ = c.Next() { - key, err := getKeyBuffer(k) + for k, v := c.First(); k != nil; k, v = c.Next() { + key, err := getKeyPairBuffer(k, v) if err != nil { return err } @@ -260,9 +301,9 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C return nil } -func processKey(ctx context.Context, key *bytes.Buffer, db *bbolt.DB, deleteFunc func(ctx context.Context, chunkId []byte) error) error { - keyData := key.Bytes() - if err := deleteFunc(ctx, keyData); err != nil { +func processKey(ctx context.Context, key *keyPair, db *bbolt.DB, deleteFunc func(ctx context.Context, chunkId []byte) error) error { + chunkID := key.value.Bytes() + if err := deleteFunc(ctx, chunkID); err != nil { return err } return db.Batch(func(tx *bbolt.Tx) error { @@ -270,7 +311,7 @@ func processKey(ctx context.Context, key *bytes.Buffer, db *bbolt.DB, deleteFunc if b == nil { return nil } - return b.Delete(keyData) + return b.Delete(key.key.Bytes()) }) } diff --git a/pkg/storage/stores/shipper/compactor/retention/marker_test.go b/pkg/storage/stores/shipper/compactor/retention/marker_test.go index 8cfeacdfd36d8..1b4f3483b4654 100644 --- a/pkg/storage/stores/shipper/compactor/retention/marker_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/marker_test.go @@ -160,3 +160,21 @@ func 
Test_markerProcessor_availablePath(t *testing.T) { }) } } + +func Test_MarkFileRotation(t *testing.T) { + dir := t.TempDir() + p, err := newMarkerStorageReader(dir, 150, 0, sweepMetrics) + require.NoError(t, err) + w, err := NewMarkerStorageWriter(dir) + require.NoError(t, err) + totalMarks := int64(2 * int(maxMarkPerFile)) + for i := int64(0); i < totalMarks; i++ { + require.NoError(t, w.Put([]byte(fmt.Sprintf("%d", i)))) + } + require.NoError(t, w.Close()) + paths, _, err := p.availablePath() + require.NoError(t, err) + + require.Len(t, paths, 2) + require.Equal(t, totalMarks, w.Count()) +} diff --git a/pkg/storage/stores/shipper/compactor/retention/pool.go b/pkg/storage/stores/shipper/compactor/retention/pool.go index 0dd00b5e43354..a5b25ede1219e 100644 --- a/pkg/storage/stores/shipper/compactor/retention/pool.go +++ b/pkg/storage/stores/shipper/compactor/retention/pool.go @@ -19,7 +19,10 @@ var ( } keyPool = sync.Pool{ New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 512)) + return &keyPair{ + key: bytes.NewBuffer(make([]byte, 0, 512)), + value: bytes.NewBuffer(make([]byte, 0, 512)), + } }, } ) @@ -34,16 +37,26 @@ func putComponents(ref *componentRef) { componentPools.Put(ref) } -func getKeyBuffer(key []byte) (*bytes.Buffer, error) { - buf := keyPool.Get().(*bytes.Buffer) - if _, err := buf.Write(key); err != nil { - putKeyBuffer(buf) +type keyPair struct { + key *bytes.Buffer + value *bytes.Buffer +} + +func getKeyPairBuffer(key, value []byte) (*keyPair, error) { + keyBuf := keyPool.Get().(*keyPair) + if _, err := keyBuf.key.Write(key); err != nil { + putKeyBuffer(keyBuf) + return nil, err + } + if _, err := keyBuf.value.Write(value); err != nil { + putKeyBuffer(keyBuf) return nil, err } - return buf, nil + return keyBuf, nil } -func putKeyBuffer(buf *bytes.Buffer) { - buf.Reset() - keyPool.Put(buf) +func putKeyBuffer(pair *keyPair) { + pair.key.Reset() + pair.value.Reset() + keyPool.Put(pair) } From c3bce43cbc42408ac29bb5e87ec4829664c409ad 
Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 10 May 2021 13:39:25 +0200 Subject: [PATCH 2/6] Improve key buf sizing. Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/retention/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/pool.go b/pkg/storage/stores/shipper/compactor/retention/pool.go index a5b25ede1219e..d9bdd52565180 100644 --- a/pkg/storage/stores/shipper/compactor/retention/pool.go +++ b/pkg/storage/stores/shipper/compactor/retention/pool.go @@ -20,7 +20,7 @@ var ( keyPool = sync.Pool{ New: func() interface{} { return &keyPair{ - key: bytes.NewBuffer(make([]byte, 0, 512)), + key: bytes.NewBuffer(make([]byte, 0, 8)), value: bytes.NewBuffer(make([]byte, 0, 512)), } }, From 46d713d80e521e917bdda61bc33786bb59e7c482 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 10 May 2021 17:15:56 +0200 Subject: [PATCH 3/6] lint Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/retention/retention.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 0584cc44558b0..3c739878211b2 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -21,7 +21,6 @@ import ( var ( bucketName = []byte("index") chunkBucket = []byte("chunks") - empty = []byte("-") ) const ( From fe6e422b7b976696ccba61dff3c0c570e62273c6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 17 May 2021 06:30:13 -0400 Subject: [PATCH 4/6] Update pkg/storage/stores/shipper/compactor/retention/marker.go Co-authored-by: Sandeep Sukhani --- pkg/storage/stores/shipper/compactor/retention/marker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/shipper/compactor/retention/marker.go index 
6223ee06aec87..5e84bc649ff39 100644 --- a/pkg/storage/stores/shipper/compactor/retention/marker.go +++ b/pkg/storage/stores/shipper/compactor/retention/marker.go @@ -269,7 +269,7 @@ func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.C defer wg.Done() for key := range queue { if err := processKey(r.ctx, key, dbUpdate, deleteFunc); err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.value.String(), "value", key.value.String(), "err", err) + level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.key.String(), "value", key.value.String(), "err", err) } putKeyBuffer(key) } From 20368f9cd998c49128c4e74ebfb555a9639ae478 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 18 May 2021 09:44:41 +0200 Subject: [PATCH 5/6] Copy the value so that it is still available for the whole boltdb tx. Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/retention/marker.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/shipper/compactor/retention/marker.go index 5e84bc649ff39..663375bde487e 100644 --- a/pkg/storage/stores/shipper/compactor/retention/marker.go +++ b/pkg/storage/stores/shipper/compactor/retention/marker.go @@ -116,7 +116,11 @@ func (m *markerStorageWriter) Put(chunkID []byte) error { return err } binary.BigEndian.PutUint64(m.buf, id) // insert in order using sequence id. - if err := m.bucket.Put(m.buf, chunkID); err != nil { + // boltdb requires a the value to be valid for the whole tx. + // so we make a copy.
+ value := make([]byte, len(chunkID)) + copy(value, chunkID) + if err := m.bucket.Put(m.buf, value); err != nil { return err } m.count++ From 0ab3e97cff0ff5fcfa8d759b65502ed6db77f94c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 18 May 2021 09:47:17 +0200 Subject: [PATCH 6/6] typo Signed-off-by: Cyril Tovena --- pkg/storage/stores/shipper/compactor/retention/marker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/shipper/compactor/retention/marker.go index 663375bde487e..421d4c84cce4f 100644 --- a/pkg/storage/stores/shipper/compactor/retention/marker.go +++ b/pkg/storage/stores/shipper/compactor/retention/marker.go @@ -116,7 +116,7 @@ func (m *markerStorageWriter) Put(chunkID []byte) error { return err } binary.BigEndian.PutUint64(m.buf, id) // insert in order using sequence id. - // boltdb requires a the value to be valid for the whole tx. + // boltdb requires the value to be valid for the whole tx. // so we make a copy. value := make([]byte, len(chunkID)) copy(value, chunkID)