Skip to content

Commit

Permalink
planner: capsulate stats cache behind an interface (#47588)
Browse files Browse the repository at this point in the history
ref #46905
  • Loading branch information
qw4990 authored Oct 12, 2023
1 parent 8dbfdfb commit 9353c68
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 192 deletions.
4 changes: 4 additions & 0 deletions domain/domain_sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (do *Domain) initDomainSysVars() {

// setStatsCacheCapacity sets statsCache cap
func (do *Domain) setStatsCacheCapacity(c int64) {
statsHandle := do.StatsHandle()
if statsHandle == nil { // from test
return
}
do.StatsHandle().SetStatsCacheCapacity(c)
}

Expand Down
30 changes: 15 additions & 15 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"go.uber.org/zap"
)

func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
// The table is read-only. Please do not modify it.
Expand All @@ -63,15 +63,15 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *cache.Stat
}
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (*cache.StatsCache, error) {
func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return nil, errors.Trace(err)
}
defer terror.Call(rc.Close)
tables, err := cache.NewStatsCache()
tables, err := cache.NewStatsCacheImpl()
if err != nil {
return nil, err
}
Expand All @@ -90,7 +90,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (*cache.StatsCache, err
return tables, nil
}

func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tblID := row.GetInt64(0)
table, ok := cache.Get(tblID)
Expand Down Expand Up @@ -161,7 +161,7 @@ func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache *
}
}

func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tblID, statsVer := row.GetInt64(0), row.GetInt64(8)
table, ok := cache.Get(tblID)
Expand Down Expand Up @@ -231,7 +231,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *cach
}
}

func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache *cache.StatsCache) error {
func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache util.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := util.Exec(h.initStatsCtx, sql)
Expand All @@ -254,7 +254,7 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache *cache.
return nil
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *cache.StatsCache) error {
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := util.Exec(h.initStatsCtx, sql)
Expand All @@ -277,7 +277,7 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *cache.Stat
return nil
}

func (*Handle) initStatsTopN4Chunk(cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
affectedIndexes := make(map[*statistics.Index]struct{})
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.Get(row.GetInt64(0))
Expand All @@ -303,7 +303,7 @@ func (*Handle) initStatsTopN4Chunk(cache *cache.StatsCache, iter *chunk.Iterator
}
}

func (h *Handle) initStatsTopN(cache *cache.StatsCache) error {
func (h *Handle) initStatsTopN(cache util.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := util.Exec(h.initStatsCtx, sql)
Expand All @@ -326,7 +326,7 @@ func (h *Handle) initStatsTopN(cache *cache.StatsCache) error {
return nil
}

func (*Handle) initStatsFMSketch4Chunk(cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
func (*Handle) initStatsFMSketch4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.Get(row.GetInt64(0))
if !ok {
Expand All @@ -353,7 +353,7 @@ func (*Handle) initStatsFMSketch4Chunk(cache *cache.StatsCache, iter *chunk.Iter
}
}

func (h *Handle) initStatsFMSketch(cache *cache.StatsCache) error {
func (h *Handle) initStatsFMSketch(cache util.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, value from mysql.stats_fm_sketch"
rc, err := util.Exec(h.initStatsCtx, sql)
Expand All @@ -376,7 +376,7 @@ func (h *Handle) initStatsFMSketch(cache *cache.StatsCache) error {
return nil
}

func (*Handle) initStatsBuckets4Chunk(cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
table, ok := cache.Get(tableID)
Expand Down Expand Up @@ -426,7 +426,7 @@ func (*Handle) initStatsBuckets4Chunk(cache *cache.StatsCache, iter *chunk.Itera
}
}

func (h *Handle) initStatsBuckets(cache *cache.StatsCache) error {
func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
Expand Down Expand Up @@ -486,7 +486,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
if err != nil {
return errors.Trace(err)
}
h.initStatsCache(cache)
h.Replace(cache)
return nil
}

Expand Down Expand Up @@ -539,7 +539,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
}
}
}
h.initStatsCache(cache)
h.Replace(cache)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions statistics/handle/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_test(
"//config",
"//statistics",
"//statistics/handle/cache/internal/testutil",
"//statistics/handle/util",
"//util/benchdaily",
],
)
29 changes: 15 additions & 14 deletions statistics/handle/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache/internal/testutil"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/benchdaily"
)

func benchCopyAndUpdate(b *testing.B, c *StatsCachePointer) {
func benchCopyAndUpdate(b *testing.B, c util.StatsCache) {
var wg sync.WaitGroup
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -34,14 +35,14 @@ func benchCopyAndUpdate(b *testing.B, c *StatsCachePointer) {
defer wg.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache(c.Load(), []*statistics.Table{t1}, nil)
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
}()
}
wg.Wait()
b.StopTimer()
}

func benchPutGet(b *testing.B, c *StatsCachePointer) {
func benchPutGet(b *testing.B, c util.StatsCache) {
var wg sync.WaitGroup
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -50,29 +51,29 @@ func benchPutGet(b *testing.B, c *StatsCachePointer) {
defer wg.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache(c.Load(), []*statistics.Table{t1}, nil)
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
}(i)
}
for i := 0; i < b.N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.Load().Get(int64(i))
c.Get(int64(i))
}(i)
}
wg.Wait()
b.StopTimer()
}

func benchGet(b *testing.B, c *StatsCachePointer) {
func benchGet(b *testing.B, c util.StatsCache) {
var w sync.WaitGroup
for i := 0; i < b.N; i++ {
w.Add(1)
go func(i int) {
defer w.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache(c.Load(), []*statistics.Table{t1}, nil)
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
}(i)
}
w.Wait()
Expand All @@ -82,7 +83,7 @@ func benchGet(b *testing.B, c *StatsCachePointer) {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.Load().Get(int64(i))
c.Get(int64(i))
}(i)
}
wg.Wait()
Expand All @@ -95,7 +96,7 @@ func BenchmarkStatsCacheLFUCopyAndUpdate(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
cache, err := NewStatsCachePointer()
cache, err := NewStatsCacheImpl()
if err != nil {
b.Fail()
}
Expand All @@ -108,7 +109,7 @@ func BenchmarkStatsCacheMapCacheCopyAndUpdate(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = false
})
cache, err := NewStatsCachePointer()
cache, err := NewStatsCacheImpl()
if err != nil {
b.Fail()
}
Expand All @@ -121,7 +122,7 @@ func BenchmarkLFUCachePutGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
cache, err := NewStatsCachePointer()
cache, err := NewStatsCacheImpl()
if err != nil {
b.Fail()
}
Expand All @@ -134,7 +135,7 @@ func BenchmarkMapCachePutGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = false
})
cache, err := NewStatsCachePointer()
cache, err := NewStatsCacheImpl()
if err != nil {
b.Fail()
}
Expand All @@ -147,7 +148,7 @@ func BenchmarkLFUCacheGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
cache, err := NewStatsCachePointer()
cache, err := NewStatsCacheImpl()
if err != nil {
b.Fail()
}
Expand All @@ -160,7 +161,7 @@ func BenchmarkMapCacheGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = false
})
cache, err := NewStatsCachePointer()
cache, err := NewStatsCacheImpl()
if err != nil {
b.Fail()
}
Expand Down
82 changes: 69 additions & 13 deletions statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,35 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache/internal/metrics"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// StatsCachePointer is used to cache the stats of a table.
type StatsCachePointer struct {
// StatsCacheImpl implements util.StatsCache.
type StatsCacheImpl struct {
atomic.Pointer[StatsCache]
}

// NewStatsCachePointer creates a new StatsCache.
func NewStatsCachePointer() (*StatsCachePointer, error) {
// NewStatsCacheImpl creates a new StatsCache.
func NewStatsCacheImpl() (util.StatsCache, error) {
newCache, err := NewStatsCache()
if err != nil {
return nil, err
}
result := StatsCachePointer{}
result := &StatsCacheImpl{}
result.Store(newCache)
return &result, nil
return result, nil
}

// Load loads the cached stats from the cache.
func (s *StatsCachePointer) Load() *StatsCache {
return s.Pointer.Load()
// Replace replaces this cache.
func (s *StatsCacheImpl) Replace(cache util.StatsCache) {
x := cache.(*StatsCacheImpl)
s.replace(x.Load())
}

// Replace replaces the cache with the new cache.
func (s *StatsCachePointer) Replace(newCache *StatsCache) {
// replace replaces the cache with the new cache.
func (s *StatsCacheImpl) replace(newCache *StatsCache) {
old := s.Swap(newCache)
if old != nil {
old.Close()
Expand All @@ -53,10 +57,62 @@ func (s *StatsCachePointer) Replace(newCache *StatsCache) {
}

// UpdateStatsCache updates the cache with the new cache.
func (s *StatsCachePointer) UpdateStatsCache(newCache *StatsCache, tables []*statistics.Table, deletedIDs []int64) {
func (s *StatsCacheImpl) UpdateStatsCache(tables []*statistics.Table, deletedIDs []int64) {
if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
s.Load().Update(tables, deletedIDs)
} else {
s.Replace(newCache.CopyAndUpdate(tables, deletedIDs))
newCache := s.Load().CopyAndUpdate(tables, deletedIDs)
s.replace(newCache)
}
}

// Close closes this cache.
func (s *StatsCacheImpl) Close() {
s.Load().Close()
}

// Clear clears this cache.
func (s *StatsCacheImpl) Clear() {
cache, err := NewStatsCache()
if err != nil {
logutil.BgLogger().Warn("create stats cache failed", zap.Error(err))
return
}
s.replace(cache)
}

// MemConsumed returns its memory usage.
func (s *StatsCacheImpl) MemConsumed() (size int64) {
return s.Load().Cost()
}

// Get returns the specified table's stats.
func (s *StatsCacheImpl) Get(tableID int64) (*statistics.Table, bool) {
return s.Load().Get(tableID)
}

// Put puts this table stats into the cache.
func (s *StatsCacheImpl) Put(id int64, t *statistics.Table) {
s.Load().put(id, t)
}

// MaxTableStatsVersion returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
func (s *StatsCacheImpl) MaxTableStatsVersion() uint64 {
return s.Load().Version()
}

// Values returns all values in this cache.
func (s *StatsCacheImpl) Values() []*statistics.Table {
return s.Load().Values()
}

// Len returns the length of this cache.
func (s *StatsCacheImpl) Len() int {
return s.Load().Len()
}

// SetStatsCacheCapacity sets the cache's capacity.
func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) {
s.Load().SetCapacity(c)
}
Loading

0 comments on commit 9353c68

Please sign in to comment.