[BREAKING] feat(metrics): fix and update metrics in badger (#1948)
Our current metrics are outdated, and some of them are not invoked at all right now. This PR adds new metrics and fixes old ones. Note that it removes all of the old metrics (because of the prefix change) and introduces new, improved metrics.
harshil-goel authored Jul 18, 2023
1 parent da1dcac commit ec80d3d
Showing 7 changed files with 330 additions and 50 deletions.
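The prefix change is the breaking part: all counters now live under badger_v4_* names (the names below come from the tests added in this commit). A minimal sketch of reading the renamed counters through the standard expvar registry; the helper function is illustrative and not part of this diff:

package main

import (
    "expvar"
    "fmt"
)

// printBadgerMetrics prints a few of the renamed counters, skipping any
// that are not registered yet (nothing is registered until a DB is opened).
func printBadgerMetrics() {
    for _, name := range []string{
        "badger_v4_write_bytes_user",
        "badger_v4_put_num_user",
        "badger_v4_get_num_user",
    } {
        if v, ok := expvar.Get(name).(*expvar.Int); ok {
            fmt.Printf("%s = %d\n", name, v.Value())
        }
    }
}

func main() { printBadgerMetrics() }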
2 changes: 2 additions & 0 deletions db.go
@@ -775,6 +775,7 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) {
         }
         // Found the required version of the key, return immediately.
         if vs.Version == version {
+            y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
             return vs, nil
         }
         if maxVs.Version < vs.Version {
@@ -898,6 +899,7 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
         size += e.estimateSizeAndSetThreshold(db.valueThreshold())
         count++
     }
+    y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, int64(size))
     if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
         return nil, ErrTxnTooBig
     }
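Note that sendToWriteCh adds to badger_v4_write_bytes_user before the batch-size check, so user bytes are counted even when the call returns ErrTxnTooBig. The y helpers themselves are not part of the visible diff; a plausible sketch of their shape, with the variable and function bodies assumed:

package y

import "expvar"

// Assumed: an expvar counter backing NumBytesWrittenUserAdd.
var numBytesWrittenUser = expvar.NewInt("badger_v4_write_bytes_user")

// NumBytesWrittenUserAdd adds val to the user-write counter when metrics
// are enabled; the bool gate makes the call a no-op otherwise.
func NumBytesWrittenUserAdd(enabled bool, val int64) {
    addInt(enabled, numBytesWrittenUser, val)
}

func addInt(enabled bool, metric *expvar.Int, val int64) {
    if !enabled {
        return
    }
    metric.Add(val)
}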
2 changes: 2 additions & 0 deletions iterator.go
@@ -475,6 +475,8 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
         panic(ErrDBClosed)
     }
 
+    y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)
+
     // Keep track of the number of active iterators.
     txn.numIterators.Add(1)
 
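Iterators are now counted at creation time. An end-to-end sketch, under the assumption that DefaultOptions leaves MetricsEnabled on so the expvar is registered by the time we read it:

package main

import (
    "expvar"
    "fmt"
    "log"

    badger "github.com/dgraph-io/badger/v4"
)

func main() {
    db, err := badger.Open(badger.DefaultOptions("/tmp/badger-metrics-demo"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    txn := db.NewTransaction(false)
    defer txn.Discard()

    it := txn.NewIterator(badger.DefaultIteratorOptions)
    it.Close() // creation, not use, is what bumps the counter

    n := expvar.Get("badger_v4_iterator_num_user").(*expvar.Int).Value()
    fmt.Println("iterators created:", n) // at least 1
}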
28 changes: 24 additions & 4 deletions levels.go
@@ -1439,6 +1439,22 @@ func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
         return err
     }
 
+    getSizes := func(tables []*table.Table) int64 {
+        size := int64(0)
+        for _, i := range tables {
+            size += i.Size()
+        }
+        return size
+    }
+
+    sizeNewTables := int64(0)
+    sizeOldTables := int64(0)
+    if s.kv.opt.MetricsEnabled {
+        sizeNewTables = getSizes(newTables)
+        sizeOldTables = getSizes(cd.bot) + getSizes(cd.top)
+        y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)
+    }
+
     // See comment earlier in this function about the ordering of these ops, and the order in which
     // we access levels when reading.
     if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
@@ -1459,16 +1475,16 @@ func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
             expensive = " [E]"
         }
         s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
-            " [%s] -> [%s], took %v\n",
+            " [%s] -> [%s], took %v\n, deleted %d bytes",
             id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
             len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
-            dur.Round(time.Millisecond))
+            dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
     }
 
     if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
-        s.kv.opt.Debugf("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
+        s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
             len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
-        s.kv.opt.Debugf("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
+        s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
             len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
     }
     return nil
@@ -1598,13 +1614,17 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
         if vs.Value == nil && vs.Meta == 0 {
             continue
         }
+        y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
         if vs.Version == version {
             return vs, nil
         }
         if maxVs.Version < vs.Version {
             maxVs = vs
         }
     }
+    if len(maxVs.Value) > 0 {
+        y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
+    }
     return maxVs, nil
 }
 
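The compaction and LSM-get counters are broken down per level ("l6" in the tests below), which points at expvar.Map rather than expvar.Int. y/metrics.go is not shown in this diff, so the following per-level helper is an assumed shape, not the actual implementation:

package y

import "expvar"

// Assumed: a per-level map backing NumBytesCompactionWrittenAdd.
var numBytesCompactionWritten = expvar.NewMap("badger_v4_write_bytes_compaction")

// NumBytesCompactionWrittenAdd records bytes written by a compaction under
// the destination level's key, e.g. "l6".
func NumBytesCompactionWrittenAdd(enabled bool, level string, val int64) {
    if !enabled {
        return
    }
    // expvar.Map.Add creates the keyed Int on first use.
    numBytesCompactionWritten.Add(level, val)
}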
5 changes: 1 addition & 4 deletions memtable.go
@@ -207,6 +207,7 @@ func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
     if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
         mt.maxVersion = ts
     }
+    y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
     return nil
 }
 
@@ -388,7 +389,6 @@ func (lf *logFile) encryptionEnabled() bool {
 
 // Acquire lock on mmap/file if you are calling this
 func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
-    var nbr int64
     offset := p.Offset
     // Do not convert size to uint32, because the lf.Data can be of size
     // 4GB, which overflows the uint32 during conversion to make the size 0,
@@ -404,10 +404,7 @@ func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
         err = y.ErrEOF
     } else {
         buf = lf.Data[offset : offset+valsz]
-        nbr = int64(valsz)
     }
-    y.NumReadsAdd(lf.opt.MetricsEnabled, 1)
-    y.NumBytesReadAdd(lf.opt.MetricsEnabled, nbr)
     return buf, err
 }
 
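memTable.Put now counts the entry's estimated size rather than the raw value length, which is why TestWriteMetrics below expects len(val) + 48 + 2 per entry. A worked version of that arithmetic, using the test's own assumptions (40-byte key plus 8-byte timestamp suffix, 2 bytes of meta, value small enough to stay in the LSM tree):

package main

import "fmt"

func main() {
    // key (40) + timestamp suffix (8) + meta (2) + value (4 KiB)
    keyLen, tsLen, metaLen, valLen := 40, 8, 2, 1<<12
    perEntry := int64(keyLen + tsLen + metaLen + valLen)
    fmt.Println(perEntry)      // 4146 bytes counted per entry
    fmt.Println(perEntry * 10) // expected badger_v4_write_bytes_l0 after ten puts
}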
203 changes: 203 additions & 0 deletions metrics_test.go
@@ -0,0 +1,203 @@
/*
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package badger

import (
    "expvar"
    "math/rand"
    "testing"

    "github.com/stretchr/testify/require"
)

func clearAllMetrics() {
    expvar.Do(func(kv expvar.KeyValue) {
        // Reset the value of each expvar variable based on its type
        switch v := kv.Value.(type) {
        case *expvar.Int:
            v.Set(0)
        case *expvar.Float:
            v.Set(0)
        case *expvar.Map:
            v.Init()
        case *expvar.String:
            v.Set("")
        }
    })
}

func TestWriteMetrics(t *testing.T) {
    opt := getTestOptions("")
    opt.managedTxns = true
    opt.CompactL0OnClose = true
    runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
        clearAllMetrics()
        num := 10
        val := make([]byte, 1<<12)
        key := make([]byte, 40)
        for i := 0; i < num; i++ {
            _, err := rand.Read(key)
            require.NoError(t, err)
            _, err = rand.Read(val)
            require.NoError(t, err)

            writer := db.NewManagedWriteBatch()
            require.NoError(t, writer.SetEntryAt(NewEntry(key, val), 1))
            writer.Flush()
        }

        expectedSize := int64(len(val)) + 48 + 2 // 48 := size of key (40 + 8(ts)), 2 := meta
        write_metric := expvar.Get("badger_v4_write_bytes_user")
        require.Equal(t, expectedSize*int64(num), write_metric.(*expvar.Int).Value())

        put_metric := expvar.Get("badger_v4_put_num_user")
        require.Equal(t, int64(num), put_metric.(*expvar.Int).Value())

        lsm_metric := expvar.Get("badger_v4_write_bytes_l0")
        require.Equal(t, expectedSize*int64(num), lsm_metric.(*expvar.Int).Value())

        compactionMetric := expvar.Get("badger_v4_write_bytes_compaction").(*expvar.Map)
        require.Equal(t, nil, compactionMetric.Get("l6"))

        // Force compaction
        db.Close()

        _, err := OpenManaged(opt)
        require.NoError(t, err)

        compactionMetric = expvar.Get("badger_v4_write_bytes_compaction").(*expvar.Map)
        require.GreaterOrEqual(t, expectedSize*int64(num)+int64(num*200), compactionMetric.Get("l6").(*expvar.Int).Value())
        // Because we have random values, compression is not able to do much, so we incur a cost on total size
    })
}

func TestVlogMetrics(t *testing.T) {
    opt := getTestOptions("")
    opt.managedTxns = true
    opt.CompactL0OnClose = true
    runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
        clearAllMetrics()
        num := 10
        val := make([]byte, 1<<20) // Large Value
        key := make([]byte, 40)
        for i := 0; i < num; i++ {
            _, err := rand.Read(key)
            require.NoError(t, err)
            _, err = rand.Read(val)
            require.NoError(t, err)

            writer := db.NewManagedWriteBatch()
            require.NoError(t, writer.SetEntryAt(NewEntry(key, val), 1))
            writer.Flush()
        }

        expectedSize := int64(len(val)) + 200 // vlog expected size

        totalWrites := expvar.Get("badger_v4_write_num_vlog")
        require.Equal(t, int64(num), totalWrites.(*expvar.Int).Value())

        bytesWritten := expvar.Get("badger_v4_write_bytes_vlog")
        require.GreaterOrEqual(t, expectedSize*int64(num), bytesWritten.(*expvar.Int).Value())

        txn := db.NewTransactionAt(2, false)
        item, err := txn.Get(key)
        require.NoError(t, err)
        require.Equal(t, uint64(1), item.Version())

        err = item.Value(func(val []byte) error {
            totalReads := expvar.Get("badger_v4_read_num_vlog")
            bytesRead := expvar.Get("badger_v4_read_bytes_vlog")
            require.Equal(t, int64(1), totalReads.(*expvar.Int).Value())
            require.GreaterOrEqual(t, expectedSize, bytesRead.(*expvar.Int).Value())
            return nil
        })

        require.NoError(t, err)
    })
}

func TestReadMetrics(t *testing.T) {
    opt := getTestOptions("")
    opt.managedTxns = true
    opt.CompactL0OnClose = true
    runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
        clearAllMetrics()
        num := 10
        val := make([]byte, 1<<15)
        keys := [][]byte{}
        writer := db.NewManagedWriteBatch()
        for i := 0; i < num; i++ {
            keyB := key("byte", 1)
            keys = append(keys, []byte(keyB))

            _, err := rand.Read(val)
            require.NoError(t, err)

            require.NoError(t, writer.SetEntryAt(NewEntry([]byte(keyB), val), 1))
        }
        writer.Flush()

        txn := db.NewTransactionAt(2, false)
        item, err := txn.Get(keys[0])
        require.NoError(t, err)
        require.Equal(t, uint64(1), item.Version())

        totalGets := expvar.Get("badger_v4_get_num_user")
        require.Equal(t, int64(1), totalGets.(*expvar.Int).Value())

        totalMemtableReads := expvar.Get("badger_v4_get_num_memtable")
        require.Equal(t, int64(1), totalMemtableReads.(*expvar.Int).Value())

        totalLSMGets := expvar.Get("badger_v4_get_num_lsm")
        require.Nil(t, totalLSMGets.(*expvar.Map).Get("l6"))

        // Force compaction
        db.Close()

        db, err = OpenManaged(opt)
        require.NoError(t, err)

        txn = db.NewTransactionAt(2, false)
        item, err = txn.Get(keys[0])
        require.NoError(t, err)
        require.Equal(t, uint64(1), item.Version())

        _, err = txn.Get([]byte(key("abdbyte", 1000))) // key should be far enough that the bloom filter doesn't hit
        require.Error(t, err)

        totalLSMGets = expvar.Get("badger_v4_get_num_lsm")
        require.Equal(t, int64(0x1), totalLSMGets.(*expvar.Map).Get("l6").(*expvar.Int).Value())

        totalBloom := expvar.Get("badger_v4_hit_num_lsm_bloom_filter")
        require.Equal(t, int64(0x1), totalBloom.(*expvar.Map).Get("l6").(*expvar.Int).Value())
        require.Equal(t, int64(0x1), totalBloom.(*expvar.Map).Get("DoesNotHave_HIT").(*expvar.Int).Value())
        require.Equal(t, int64(0x2), totalBloom.(*expvar.Map).Get("DoesNotHave_ALL").(*expvar.Int).Value())

        bytesLSM := expvar.Get("badger_v4_read_bytes_lsm")
        require.Equal(t, int64(len(val)), bytesLSM.(*expvar.Int).Value())

        getWithResult := expvar.Get("badger_v4_get_with_result_num_user")
        require.Equal(t, int64(2), getWithResult.(*expvar.Int).Value())

        iterOpts := DefaultIteratorOptions
        iter := txn.NewKeyIterator(keys[0], iterOpts)
        iter.Seek(keys[0])

        rangeQueries := expvar.Get("badger_v4_iterator_num_user")
        require.Equal(t, int64(1), rangeQueries.(*expvar.Int).Value())
    })
}
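All of these are standard expvar variables, so no extra wiring is needed to expose them: importing expvar mounts a /debug/vars handler on http.DefaultServeMux. A minimal way to scrape the badger_v4_* counters from a running process:

package main

import (
    _ "expvar" // side effect: registers /debug/vars on http.DefaultServeMux
    "log"
    "net/http"
)

func main() {
    // Open the Badger DB elsewhere in this process; its badger_v4_* expvars
    // then show up as JSON at http://localhost:8080/debug/vars.
    log.Fatal(http.ListenAndServe(":8080", nil))
}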
6 changes: 4 additions & 2 deletions value.go
@@ -893,8 +893,8 @@ func (vlog *valueLog) write(reqs []*request) error {
         bytesWritten += buf.Len()
         // No need to flush anything, we write to file directly via mmap.
     }
-    y.NumWritesAdd(vlog.opt.MetricsEnabled, int64(written))
-    y.NumBytesWrittenAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
+    y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
+    y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
 
     vlog.numEntriesWritten += uint32(written)
     vlog.db.threshold.update(valueSizes)
@@ -994,6 +994,8 @@ func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error)
     }
 
     buf, err := lf.read(vp)
+    y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
+    y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
     return buf, lf, err
 }
 