lib/storage: remove prioritizing of merging small parts over merging big parts, since it doesn't work as expected

The prioritizing could lead to big merge starvation, which could end up with an excessive number of small parts waiting to be merged into big parts.

Multiple big merges may be initiated after migrating from v1.39.0 or v1.39.1. This is OK - these merges should finish soon, returning CPU and disk IO usage to normal levels.

Updates VictoriaMetrics#648
Updates VictoriaMetrics#618
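For reference, the removed prioritization worked roughly as follows (a simplified, self-contained sketch reconstructed from the diff below - not verbatim source): every small merge incremented a shared pace limiter for its duration, while big merges consulted the limiter before each block, so a steady stream of small merges could park big merges indefinitely.

package main

import (
    "fmt"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
)

// bigMerges plays the role of storagepacelimiter.BigMerges in the real code.
var bigMerges = pacelimiter.New()

func runSmallMerge(work func()) {
    bigMerges.Inc() // suspend big merges while a small merge is active
    defer bigMerges.Dec()
    work()
}

func runBigMerge(blocks int) {
    for i := 0; i < blocks; i++ {
        bigMerges.WaitIfNeeded() // blocks while any small merge holds the limiter
        // ... merge the next block ...
    }
}

func main() {
    runSmallMerge(func() { fmt.Println("small merge finished") })
    runBigMerge(3)
    fmt.Println("big merge delays so far:", bigMerges.DelaysTotal())
}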
valyala committed Jul 30, 2020
1 parent 922d9aa commit e795909
Showing 12 changed files with 61 additions and 77 deletions.
app/vmstorage/main.go (3 changes: 0 additions & 3 deletions)
@@ -432,9 +432,6 @@ func registerStorageMetrics() {
 	metrics.NewGauge(`vm_search_delays_total`, func() float64 {
 		return float64(m().SearchDelays)
 	})
-	metrics.NewGauge(`vm_big_merges_delays_total`, func() float64 {
-		return float64(tm().BigMergesDelays)
-	})
 
 	metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
 		return float64(m().SlowRowInserts)
lib/mergeset/merge.go (15 changes: 3 additions & 12 deletions)
@@ -7,7 +7,6 @@ import (
 	"sync/atomic"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
 )
 
 // PrepareBlockCallback can transform the passed items allocated at the given data.
@@ -29,9 +28,9 @@ type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
 //
 // It also atomically adds the number of items merged to itemsMerged.
 func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},
-	pl *pacelimiter.PaceLimiter, itemsMerged *uint64) error {
+	itemsMerged *uint64) error {
 	bsm := bsmPool.Get().(*blockStreamMerger)
-	if err := bsm.Init(bsrs, prepareBlock, pl); err != nil {
+	if err := bsm.Init(bsrs, prepareBlock); err != nil {
 		return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)
 	}
 	err := bsm.Merge(bsw, ph, stopCh, itemsMerged)
@@ -63,9 +62,6 @@ type blockStreamMerger struct {
 
 	phFirstItemCaught bool
 
-	// optional pace limiter for merge process.
-	pl *pacelimiter.PaceLimiter
-
 	// This are auxiliary buffers used in flushIB
 	// for consistency checks after prepareBlock call.
 	firstItem []byte
@@ -82,13 +78,11 @@ func (bsm *blockStreamMerger) reset() {
 	bsm.ib.Reset()
 
 	bsm.phFirstItemCaught = false
-	bsm.pl = nil
 }
 
-func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, pl *pacelimiter.PaceLimiter) error {
+func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error {
 	bsm.reset()
 	bsm.prepareBlock = prepareBlock
-	bsm.pl = pl
 	for _, bsr := range bsrs {
 		if bsr.Next() {
 			bsm.bsrHeap = append(bsm.bsrHeap, bsr)
@@ -111,9 +105,6 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped")
 
 func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {
 again:
-	if bsm.pl != nil {
-		bsm.pl.WaitIfNeeded()
-	}
 	if len(bsm.bsrHeap) == 0 {
 		// Write the last (maybe incomplete) inmemoryBlock to bsw.
 		bsm.flushIB(bsw, ph, itemsMerged)
lib/mergeset/merge_test.go (12 changes: 5 additions & 7 deletions)
@@ -7,8 +7,6 @@ import (
 	"sort"
 	"testing"
 	"time"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 )
 
 func TestMergeBlockStreams(t *testing.T) {
@@ -32,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) {
 	var dstIP1 inmemoryPart
 	var bsw1 blockStreamWriter
 	bsw1.InitFromInmemoryPart(&dstIP1)
-	if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, nil, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 1: %s", err)
 	}
 
 	var dstIP2 inmemoryPart
 	var bsw2 blockStreamWriter
 	bsw2.InitFromInmemoryPart(&dstIP2)
-	if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 2: %s", err)
 	}
 
@@ -56,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) {
 		newTestBlockStreamReader(&dstIP2),
 	}
 	bsw.InitFromInmemoryPart(&dstIP)
-	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge second level: %s", err)
 	}
 	if itemsMerged != uint64(len(items)) {
@@ -78,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	ch := make(chan struct{})
 	var itemsMerged uint64
 	close(ch)
-	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, nil, &itemsMerged); err != errForciblyStopped {
+	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped {
 		t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped)
 	}
 	if itemsMerged != 0 {
@@ -122,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
 	bsw.InitFromInmemoryPart(&dstIP)
-	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return fmt.Errorf("cannot merge block streams: %w", err)
 	}
 	if itemsMerged != uint64(len(items)) {
lib/mergeset/part_search_test.go (2 changes: 1 addition & 1 deletion)
@@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	var ip inmemoryPart
 	var bsw blockStreamWriter
 	bsw.InitFromInmemoryPart(&ip)
-	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, nil, &itemsMerged); err != nil {
+	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return nil, nil, fmt.Errorf("cannot merge blocks: %w", err)
 	}
 	if itemsMerged != uint64(len(items)) {
lib/mergeset/table.go (12 changes: 3 additions & 9 deletions)
@@ -635,11 +635,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
 	// Merge parts.
 	// The merge shouldn't be interrupted by stopCh,
 	// since it may be final after stopCh is closed.
-	//
-	// Prioritize merging of inmemory blocks over merging file parts.
-	storagepacelimiter.BigMerges.Inc()
-	err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, nil, &tb.itemsMerged)
-	storagepacelimiter.BigMerges.Dec()
+	err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged)
 	if err != nil {
 		logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
 	}
@@ -801,7 +797,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 
 	// Merge parts into a temporary location.
 	var ph partHeader
-	err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, storagepacelimiter.BigMerges, &tb.itemsMerged)
+	err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged)
 	putBlockStreamWriter(bsw)
 	if err != nil {
 		if err == errForciblyStopped {
@@ -949,9 +945,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 {
 	return freeSpace / uint64(mergeWorkersCount) / 4
 }
 
-var mergeWorkersCount = func() int {
-	return runtime.GOMAXPROCS(-1)
-}()
+var mergeWorkersCount = runtime.GOMAXPROCS(-1)
 
 func openParts(path string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
lib/pacelimiter/pacelimiter_test.go (38 changes: 38 additions & 0 deletions)
@@ -1,6 +1,7 @@
 package pacelimiter
 
 import (
+	"fmt"
 	"runtime"
 	"sync"
 	"testing"
@@ -73,6 +74,43 @@ func TestPacelimiter(t *testing.T) {
 		}
 		// Verify that the pl is unblocked now.
 		pl.WaitIfNeeded()
+
+		// Verify that negative count doesn't block pl.
+		pl.Dec()
+		pl.WaitIfNeeded()
+		if n := pl.DelaysTotal(); n == 0 {
+			t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()")
+		}
 	})
+	t.Run("negative_count", func(t *testing.T) {
+		n := 10
+		pl := New()
+		for i := 0; i < n; i++ {
+			pl.Dec()
+		}
+
+		doneCh := make(chan error)
+		go func() {
+			defer close(doneCh)
+			for i := 0; i < n; i++ {
+				pl.Inc()
+				pl.WaitIfNeeded()
+				if n := pl.DelaysTotal(); n != 0 {
+					doneCh <- fmt.Errorf("expecting zero number of delays")
+					return
+				}
+			}
+			doneCh <- nil
+		}()
+
+		select {
+		case err := <-doneCh:
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err)
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout")
+		}
+	})
 	t.Run("concurrent_inc_dec", func(t *testing.T) {
 		pl := New()
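The new tests above pin down the PaceLimiter contract the merge code relies on: Inc and Dec adjust a shared counter, WaitIfNeeded blocks callers only while the counter is positive (so a negative count - more Dec than Inc calls - must never block), and DelaysTotal reports how many calls were delayed. A minimal implementation satisfying that contract could look like this; it is an illustrative sketch, not the library's actual source:

package pacelimiter

import (
    "sync"
    "sync/atomic"
)

// PaceLimiter sketch: WaitIfNeeded blocks callers while n > 0,
// so a negative count must never block anybody.
type PaceLimiter struct {
    mu          sync.Mutex
    cond        *sync.Cond
    delaysTotal uint64
    n           int32
}

func New() *PaceLimiter {
    var pl PaceLimiter
    pl.cond = sync.NewCond(&pl.mu)
    return &pl
}

func (pl *PaceLimiter) Inc() { atomic.AddInt32(&pl.n, 1) }

func (pl *PaceLimiter) Dec() {
    if atomic.AddInt32(&pl.n, -1) <= 0 {
        pl.cond.Broadcast() // wake up everybody blocked in WaitIfNeeded
    }
}

func (pl *PaceLimiter) WaitIfNeeded() {
    if atomic.LoadInt32(&pl.n) <= 0 {
        return // fast path: zero or negative count never blocks
    }
    atomic.AddUint64(&pl.delaysTotal, 1)
    pl.mu.Lock()
    for atomic.LoadInt32(&pl.n) > 0 {
        pl.cond.Wait()
    }
    pl.mu.Unlock()
}

func (pl *PaceLimiter) DelaysTotal() uint64 {
    return atomic.LoadUint64(&pl.delaysTotal)
}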
lib/storage/block_stream_merger.go (14 changes: 1 addition & 13 deletions)
@@ -4,8 +4,6 @@ import (
 	"container/heap"
 	"fmt"
 	"io"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
 )
 
 // blockStreamMerger is used for merging block streams.
@@ -18,9 +16,6 @@ type blockStreamMerger struct {
 
 	// Whether the call to NextBlock must be no-op.
 	nextBlockNoop bool
 
-	// Optional pace limiter for limiting the pace for NextBlock calls.
-	pl *pacelimiter.PaceLimiter
-
 	// The last error
 	err error
@@ -32,14 +27,11 @@ func (bsm *blockStreamMerger) reset() {
 	}
 	bsm.bsrHeap = bsm.bsrHeap[:0]
 	bsm.nextBlockNoop = false
-	bsm.pl = nil
 	bsm.err = nil
 }
 
 // Init initializes bsm with the given bsrs.
-//
-// pl is an optional pace limiter, which allows limiting the pace for NextBlock calls.
-func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.PaceLimiter) {
+func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) {
 	bsm.reset()
 	for _, bsr := range bsrs {
 		if bsr.NextBlock() {
@@ -60,7 +52,6 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.Pa
 	heap.Init(&bsm.bsrHeap)
 	bsm.Block = &bsm.bsrHeap[0].Block
 	bsm.nextBlockNoop = true
-	bsm.pl = pl
 }
 
 // NextBlock stores the next block in bsm.Block.
@@ -75,9 +66,6 @@ func (bsm *blockStreamMerger) NextBlock() bool {
 		bsm.nextBlockNoop = false
 		return true
 	}
-	if bsm.pl != nil {
-		bsm.pl.WaitIfNeeded()
-	}
 
 	bsm.err = bsm.nextBlock()
 	switch bsm.err {
lib/storage/merge.go (5 changes: 2 additions & 3 deletions)
@@ -6,7 +6,6 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
@@ -16,11 +15,11 @@ import (
 //
 // rowsMerged is atomically updated with the number of merged rows during the merge.
 func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
-	pl *pacelimiter.PaceLimiter, dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
+	dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
 	ph.Reset()
 
 	bsm := bsmPool.Get().(*blockStreamMerger)
-	bsm.Init(bsrs, pl)
+	bsm.Init(bsrs)
 	err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
 	bsm.reset()
 	bsmPool.Put(bsm)
lib/storage/merge_test.go (6 changes: 2 additions & 4 deletions)
@@ -3,8 +3,6 @@ package storage
 import (
 	"math/rand"
 	"testing"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 )
 
 func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) {
@@ -366,7 +364,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	ch := make(chan struct{})
 	var rowsMerged, rowsDeleted uint64
 	close(ch)
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped {
 		t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
 	}
 	if rowsMerged != 0 {
@@ -386,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
 	bsw.InitFromInmemoryPart(&mp)
 
 	var rowsMerged, rowsDeleted uint64
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, storagepacelimiter.BigMerges, nil, &rowsMerged, &rowsDeleted); err != nil {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
 		t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
 	}
lib/storage/merge_timing_test.go (2 changes: 1 addition & 1 deletion)
@@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
 		}
 		mpOut.Reset()
 		bsw.InitFromInmemoryPart(&mpOut)
-		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
+		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
 			panic(fmt.Errorf("cannot merge block streams: %w", err))
 		}
 	}
lib/storage/partition.go (24 changes: 5 additions & 19 deletions)
@@ -329,7 +329,6 @@ type partitionMetrics struct {
 	SmallPartsRefCount uint64
 
 	SmallAssistedMerges uint64
-	BigMergesDelays uint64
 }
 
 // UpdateMetrics updates m with metrics from pt.
@@ -388,8 +387,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 	m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)
 
 	m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
-
-	m.BigMergesDelays = storagepacelimiter.BigMerges.DelaysTotal()
 }
 
 // AddRows adds the given rows to the partition pt.
@@ -817,13 +814,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
 	return nil
 }
 
-var mergeWorkersCount = func() int {
-	n := runtime.GOMAXPROCS(-1) / 2
-	if n <= 0 {
-		n = 1
-	}
-	return n
-}()
+var mergeWorkersCount = runtime.GOMAXPROCS(-1)
 
 var (
 	bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
@@ -935,10 +926,9 @@ func maxRowsByPath(path string) uint64 {
 	// Calculate the maximum number of rows in the output merge part
 	// by dividing the freeSpace by the number of concurrent
 	// mergeWorkersCount for big parts.
-	// This assumes each row is compressed into 1 byte. Production
-	// simulation shows that each row usually occupies up to 0.5 bytes,
-	// so this is quite safe assumption.
-	maxRows := freeSpace / uint64(mergeWorkersCount)
+	// This assumes each row is compressed into 0.5 bytes
+	// according to production data.
+	maxRows := 2 * (freeSpace / uint64(mergeWorkersCount))
 	if maxRows > maxRowsPerBigPart {
 		maxRows = maxRowsPerBigPart
 	}
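A quick sanity check of the new formula, with illustrative numbers that are not taken from the source:

package main

import "fmt"

func main() {
    // Suppose 512GiB of free disk space and 8 merge workers.
    freeSpace := uint64(512) << 30 // bytes
    workers := uint64(8)
    // Each worker gets freeSpace/workers bytes of budget; at the assumed
    // compression of ~0.5 bytes per row this fits twice as many rows as bytes.
    maxRows := 2 * (freeSpace / workers)
    fmt.Println(maxRows) // 137438953472, i.e. ~137 billion rows per output part
}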
@@ -1058,25 +1048,21 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	var ph partHeader
 	rowsMerged := &pt.smallRowsMerged
 	rowsDeleted := &pt.smallRowsDeleted
-	pl := storagepacelimiter.BigMerges
 	if isBigPart {
 		rowsMerged = &pt.bigRowsMerged
 		rowsDeleted = &pt.bigRowsDeleted
 		atomic.AddUint64(&pt.bigMergesCount, 1)
 		atomic.AddUint64(&pt.activeBigMerges, 1)
 	} else {
-		pl = nil
 		atomic.AddUint64(&pt.smallMergesCount, 1)
 		atomic.AddUint64(&pt.activeSmallMerges, 1)
-		// Prioritize small merges over big merges.
-		storagepacelimiter.BigMerges.Inc()
 	}
-	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pl, dmis, rowsMerged, rowsDeleted)
+	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted)
 	if isBigPart {
 		atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
 	} else {
 		atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0))
-		storagepacelimiter.BigMerges.Dec()
 	}
 	putBlockStreamWriter(bsw)
 	if err != nil {
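A side note on the atomic.AddUint64(&x, ^uint64(0)) calls that remain above: this is Go's documented idiom for atomically decrementing an unsigned counter, since adding 2^64-1 wraps around to subtracting one. A minimal demonstration:

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var activeMerges uint64
    atomic.AddUint64(&activeMerges, 1)          // merge started
    atomic.AddUint64(&activeMerges, ^uint64(0)) // merge finished: wraps to -1
    fmt.Println(activeMerges)                   // 0
}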