statistics: improve code to avoid data race (#46309)
ref #46158
hawkingrei authored Aug 23, 2023
1 parent 2aa82f5 commit aadbb7e
Showing 3 changed files with 94 additions and 22 deletions.
2 changes: 1 addition & 1 deletion statistics/handle/cache/internal/lfu/BUILD.bazel
@@ -28,7 +28,7 @@ go_test(
     embed = [":lfu"],
     flaky = True,
     race = "on",
-    shard_count = 7,
+    shard_count = 8,
     deps = [
         "//statistics",
         "//statistics/handle/cache/internal/testutil",
48 changes: 27 additions & 21 deletions statistics/handle/cache/internal/lfu/lfu_cache.go
@@ -81,16 +81,9 @@ func (s *LFU) Get(tid int64, _ bool) (*statistics.Table, bool) {
 // Put implements statsCacheInner
 func (s *LFU) Put(tblID int64, tbl *statistics.Table) bool {
 	cost := tbl.MemoryUsage().TotalTrackingMemUsage()
-	// Here we need to insert resultKeySet first and then write to LFU,
-	// in order to prevent data race. If the LFU cost is already full,
-	// a rejection may occur, triggering the onEvict event.
-	// Both inserting into resultKeySet and evicting will modify the memory cost,
-	// so we need to stagger these two actions.
 	s.resultKeySet.AddKeyValue(tblID, tbl)
-	s.cost.Add(cost)
-	ok := s.cache.Set(tblID, tbl, cost)
-	metrics.CostGauge.Set(float64(s.cost.Load()))
-	return ok
+	s.addCost(cost)
+	return s.cache.Set(tblID, tbl, cost)
 }
 
 // Del implements statsCacheInner
@@ -117,48 +110,56 @@ func (s *LFU) Values() []*statistics.Table {
 
 // DropEvicted drop stats for table column/index
 func DropEvicted(item statistics.TableCacheItem) {
-	if !item.IsStatsInitialized() || item.GetEvictedStatus() == statistics.AllEvicted {
+	if !item.IsStatsInitialized() ||
+		item.GetEvictedStatus() == statistics.AllEvicted {
 		return
 	}
 	item.DropUnnecessaryData()
 }
 
 func (s *LFU) onReject(item *ristretto.Item) {
 	metrics.RejectCounter.Add(1.0)
-	s.onEvict(item)
+	s.dropMemory(item)
 }
 
 func (s *LFU) onEvict(item *ristretto.Item) {
+	s.dropMemory(item)
+	metrics.EvictCounter.Inc()
+}
+
+func (s *LFU) dropMemory(item *ristretto.Item) {
 	if item.Value == nil {
-		// Sometimes the same key may be passed to the "onEvict/onExit" function twice,
-		// and in the second invocation, the value is empty, so it should not be processed.
+		// Sometimes the same key may be passed to the "onEvict/onExit"
+		// function twice, and in the second invocation, the value is empty,
+		// so it should not be processed.
 		return
 	}
-	// We do not need to calculate the cost during onEvict, because the onexit function
-	// is also called when the evict event occurs.
-	metrics.EvictCounter.Inc()
-	table := item.Value.(*statistics.Table)
+	// We do not need to calculate the cost during onEvict,
+	// because the onexit function is also called when the evict event occurs.
+	// TODO(hawkingrei): not copy the useless part.
+	table := item.Value.(*statistics.Table).Copy()
 	before := table.MemoryUsage().TotalTrackingMemUsage()
 	for _, column := range table.Columns {
 		DropEvicted(column)
 	}
 	for _, indix := range table.Indices {
 		DropEvicted(indix)
 	}
+	s.resultKeySet.AddKeyValue(int64(item.Key), table)
 	after := table.MemoryUsage().TotalTrackingMemUsage()
 	// why add before again? because the cost will be subtracted in onExit.
 	// in fact, it is -(before - after) + after = after + after - before
-	s.cost.Add(2*after - before)
+	s.addCost(2*after - before)
 }
 
-func (s *LFU) onExit(val interface{}) {
+func (s *LFU) onExit(val any) {
 	if val == nil {
 		// Sometimes the same key may be passed to the "onEvict/onExit" function twice,
 		// and in the second invocation, the value is empty, so it should not be processed.
 		return
 	}
-	s.cost.Add(-1 * val.(*statistics.Table).MemoryUsage().TotalTrackingMemUsage())
-	metrics.CostGauge.Set(float64(s.cost.Load()))
+	s.addCost(
+		-1 * val.(*statistics.Table).MemoryUsage().TotalTrackingMemUsage())
 }
 
 // Len implements statsCacheInner
@@ -201,3 +202,8 @@ func (s *LFU) Clear() {
 	s.cache.Clear()
 	s.resultKeySet.Clear()
 }
+
+func (s *LFU) addCost(v int64) {
+	newv := s.cost.Add(v)
+	metrics.CostGauge.Set(float64(newv))
+}
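Part of the change funnels every cost update through the new addCost helper, which feeds metrics.CostGauge from the value returned by the atomic Add rather than from a separate Load, so each caller publishes exactly the total its own update produced. The snippet below is a minimal, self-contained sketch of that pattern, not the TiDB handle code; the gauge type is a stand-in for the Prometheus gauge behind metrics.CostGauge.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gauge is a stand-in for metrics.CostGauge.
type gauge struct{ v atomic.Int64 }

func (g *gauge) Set(v int64) { g.v.Store(v) }

// tracker mirrors the shape of the LFU cost bookkeeping after this change:
// the gauge is updated from the value returned by Add, never from a separate
// Load that could observe another goroutine's interleaved update.
type tracker struct {
	cost      atomic.Int64
	costGauge gauge
}

func (t *tracker) addCost(v int64) {
	newv := t.cost.Add(v) // atomic read-modify-write; returns the new total
	t.costGauge.Set(newv) // publish exactly the total this call produced
}

func main() {
	var t tracker
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				t.addCost(1)
				t.addCost(-1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("final cost:", t.cost.Load()) // always 0
}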
66 changes: 66 additions & 0 deletions statistics/handle/cache/internal/lfu/lfu_cache_test.go
@@ -15,6 +15,7 @@
 package lfu
 
 import (
+	"math/rand"
 	"sync"
 	"testing"
 	"time"
@@ -170,6 +171,71 @@ func TestLFUCachePutGetWithManyConcurrency2(t *testing.T) {
 	require.Equal(t, 1000, len(lfu.Values()))
 }
 
+func TestLFUCachePutGetWithManyConcurrencyAndSmallConcurrency(t *testing.T) {
+	// to test DATA RACE
+
+	capacity := int64(100)
+	lfu, err := NewLFU(capacity)
+	require.NoError(t, err)
+	var wg sync.WaitGroup
+	wg.Add(10)
+	for i := 0; i < 5; i++ {
+		go func() {
+			defer wg.Done()
+			for c := 0; c < 1000; c++ {
+				for n := 0; n < 50; n++ {
+					t1 := testutil.NewMockStatisticsTable(1, 1, true, true, true)
+					lfu.Put(int64(n), t1)
+				}
+			}
+		}()
+	}
+	time.Sleep(1 * time.Second)
+	for i := 0; i < 5; i++ {
+		go func() {
+			defer wg.Done()
+			for c := 0; c < 1000; c++ {
+				for n := 0; n < 50; n++ {
+					tbl, ok := lfu.Get(int64(n), true)
+					require.True(t, ok)
+					checkTable(t, tbl)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	lfu.wait()
+	v, ok := lfu.Get(rand.Int63n(50), false)
+	require.True(t, ok)
+	for _, c := range v.Columns {
+		require.Equal(t, c.GetEvictedStatus(), statistics.AllEvicted)
+	}
+	for _, i := range v.Indices {
+		require.Equal(t, i.GetEvictedStatus(), statistics.AllEvicted)
+	}
+}
+
+func checkTable(t *testing.T, tbl *statistics.Table) {
+	for _, column := range tbl.Columns {
+		if column.GetEvictedStatus() == statistics.AllEvicted {
+			require.Nil(t, column.TopN)
+			require.Equal(t, 0, cap(column.Histogram.Buckets))
+		} else {
+			require.NotNil(t, column.TopN)
+			require.Greater(t, cap(column.Histogram.Buckets), 0)
+		}
+	}
+	for _, idx := range tbl.Indices {
+		if idx.GetEvictedStatus() == statistics.AllEvicted {
+			require.Nil(t, idx.TopN)
+			require.Equal(t, 0, cap(idx.Histogram.Buckets))
+		} else {
+			require.NotNil(t, idx.TopN)
+			require.Greater(t, cap(idx.Histogram.Buckets), 0)
+		}
+	}
+}
+
 func TestLFUReject(t *testing.T) {
 	capacity := int64(100000000000)
 	lfu, err := NewLFU(capacity)

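A key part of the change is that dropMemory now works on a copy: the evicted *statistics.Table is duplicated via Copy() before its column and index data are dropped, which avoids mutating a value that goroutines holding the same pointer from Get may still be reading. The new concurrency test above is meant to trip the race detector if that invariant breaks; the bazel target already runs with race = "on", and the same check can be run locally with go test -race on the lfu package. Below is a minimal, self-contained sketch of the copy-before-shrink idea under assumed types; entry and store are illustrative stand-ins, not TiDB code.

package main

import (
	"fmt"
	"sync"
)

// entry stands in for statistics.Table: detailed data that eviction wants to
// drop, plus a small part that must stay readable afterwards.
type entry struct {
	summary string
	buckets []int
}

type store struct {
	mu   sync.RWMutex
	data map[int64]*entry
}

func (s *store) get(k int64) *entry {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.data[k]
}

func (s *store) put(k int64, e *entry) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[k] = e
}

// onEvict drops the detailed data for key k. It shrinks a copy and republishes
// it, playing the role of (*statistics.Table).Copy() in the commit: a reader
// that already fetched the old pointer keeps a value that is never mutated,
// so the race detector stays quiet; shrinking the shared value in place would
// race with those readers.
func (s *store) onEvict(k int64) {
	old := s.get(k)
	if old == nil {
		return
	}
	slim := *old       // copy instead of mutating the shared value
	slim.buckets = nil // drop detail on the copy only
	s.put(k, &slim)
}

func main() {
	s := &store{data: map[int64]*entry{1: {summary: "t1", buckets: []int{1, 2, 3}}}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = len(s.get(1).buckets)
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			s.onEvict(1)
		}
	}()
	wg.Wait()
	fmt.Println("summary still readable:", s.get(1).summary)
}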