Skip to content

Commit

Permalink
*: unite stats cache's GET (#46939)
Browse files Browse the repository at this point in the history
ref #46905
  • Loading branch information
hawkingrei authored Sep 13, 2023
1 parent a7e3176 commit 2c4e8dc
Show file tree
Hide file tree
Showing 18 changed files with 52 additions and 116 deletions.
8 changes: 1 addition & 7 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -164,12 +163,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
sessionVars.StmtCtx.AppendWarning(err)
}

if sessionVars.InRestrictedSQL {
return statsHandle.Update(infoSchema)
}

return statsHandle.Update(infoSchema, cache.WithTableStatsByQuery())
return statsHandle.Update(infoSchema)
}

// filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
Expand Down
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ go_library(
"//sessiontxn",
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle/cache",
"//table",
"//table/tables",
"//table/temptable",
Expand Down
9 changes: 4 additions & 5 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
Expand Down Expand Up @@ -4723,10 +4722,10 @@ func getStatsTable(ctx sessionctx.Context, tblInfo *model.TableInfo, pid int64)
}

if pid == tblInfo.ID || ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
statsTbl = statsHandle.GetTableStats(tblInfo, cache.WithTableStatsByQuery())
statsTbl = statsHandle.GetTableStats(tblInfo)
} else {
usePartitionStats = true
statsTbl = statsHandle.GetPartitionStats(tblInfo, pid, cache.WithTableStatsByQuery())
statsTbl = statsHandle.GetPartitionStats(tblInfo, pid)
}

allowPseudoTblTriggerLoading := false
Expand Down Expand Up @@ -4791,9 +4790,9 @@ func getLatestVersionFromStatsTable(ctx sessionctx.Context, tblInfo *model.Table

var statsTbl *statistics.Table
if pid == tblInfo.ID || ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
statsTbl = statsHandle.GetTableStats(tblInfo, cache.WithTableStatsByQuery())
statsTbl = statsHandle.GetTableStats(tblInfo)
} else {
statsTbl = statsHandle.GetPartitionStats(tblInfo, pid, cache.WithTableStatsByQuery())
statsTbl = statsHandle.GetPartitionStats(tblInfo, pid)
}

// 2. Table row count from statistics is zero. Pseudo stats table.
Expand Down
10 changes: 5 additions & 5 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (*cache.StatsCache, err
func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tblID := row.GetInt64(0)
table, ok := cache.GetFromInternal(tblID)
table, ok := cache.Get(tblID)
if !ok {
continue
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache *
func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tblID, statsVer := row.GetInt64(0), row.GetInt64(8)
table, ok := cache.GetFromInternal(tblID)
table, ok := cache.Get(tblID)
table = table.Copy()
if !ok {
continue
Expand Down Expand Up @@ -280,7 +280,7 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *cache.Stat
func (*Handle) initStatsTopN4Chunk(cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
affectedIndexes := make(map[*statistics.Index]struct{})
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.GetFromInternal(row.GetInt64(0))
table, ok := cache.Get(row.GetInt64(0))
if !ok {
continue
}
Expand Down Expand Up @@ -328,7 +328,7 @@ func (h *Handle) initStatsTopN(cache *cache.StatsCache) error {

func (*Handle) initStatsFMSketch4Chunk(cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.GetFromInternal(row.GetInt64(0))
table, ok := cache.Get(row.GetInt64(0))
if !ok {
continue
}
Expand Down Expand Up @@ -379,7 +379,7 @@ func (h *Handle) initStatsFMSketch(cache *cache.StatsCache) error {
func (*Handle) initStatsBuckets4Chunk(cache *cache.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
table, ok := cache.GetFromInternal(tableID)
table, ok := cache.Get(tableID)
if !ok {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func benchPutGet(b *testing.B, c *StatsCachePointer) {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.Load().GetFromUser(int64(i))
c.Load().Get(int64(i))
}(i)
}
wg.Wait()
Expand All @@ -82,7 +82,7 @@ func benchGet(b *testing.B, c *StatsCachePointer) {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.Load().GetFromUser(int64(i))
c.Load().Get(int64(i))
}(i)
}
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/cache/internal/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// 2. remove remove the Version method.
type StatsCacheInner interface {
// Get gets the cache.
Get(tid int64, moveFront bool) (*statistics.Table, bool)
Get(tid int64) (*statistics.Table, bool)
// Put puts a cache.
Put(tid int64, tbl *statistics.Table) bool
// Del deletes a cache.
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/cache/internal/lfu/lfu_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewLFU(totalMemCost int64) (*LFU, error) {
}

// Get implements statsCacheInner
func (s *LFU) Get(tid int64, _ bool) (*statistics.Table, bool) {
func (s *LFU) Get(tid int64) (*statistics.Table, bool) {
result, ok := s.cache.Get(tid)
if !ok {
return s.resultKeySet.Get(tid)
Expand Down
14 changes: 7 additions & 7 deletions statistics/handle/cache/internal/lfu/lfu_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestLFUPutGetDel(t *testing.T) {
lfu.Put(mockTableID, mockTable)
lfu.wait()
lfu.Del(mockTableID)
v, ok := lfu.Get(mockTableID, false)
v, ok := lfu.Get(mockTableID)
require.False(t, ok)
require.Nil(t, v)
lfu.wait()
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestLFUPutTooBig(t *testing.T) {
mockTable := testutil.NewMockStatisticsTable(1, 1, true, false, false)
// put mockTable, the index should be evicted but the table still exists in the list.
lfu.Put(int64(1), mockTable)
_, ok := lfu.Get(int64(1), false)
_, ok := lfu.Get(int64(1))
require.True(t, ok)
lfu.wait()
require.Equal(t, uint64(lfu.Cost()), lfu.metrics().CostAdded()-lfu.metrics().CostEvicted())
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestLFUCachePutGetWithManyConcurrency(t *testing.T) {
}(i)
go func(i int) {
defer wg.Done()
lfu.Get(int64(i), true)
lfu.Get(int64(i))
}(i)
}
wg.Wait()
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestLFUCachePutGetWithManyConcurrency2(t *testing.T) {
go func() {
defer wg.Done()
for n := 0; n < 1000; n++ {
lfu.Get(int64(n), true)
lfu.Get(int64(n))
}
}()
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestLFUCachePutGetWithManyConcurrencyAndSmallConcurrency(t *testing.T) {
defer wg.Done()
for c := 0; c < 1000; c++ {
for n := 0; n < 50; n++ {
tbl, ok := lfu.Get(int64(n), true)
tbl, ok := lfu.Get(int64(n))
require.True(t, ok)
checkTable(t, tbl)
}
Expand All @@ -212,7 +212,7 @@ func TestLFUCachePutGetWithManyConcurrencyAndSmallConcurrency(t *testing.T) {
}
wg.Wait()
lfu.wait()
v, ok := lfu.Get(rand.Int63n(50), false)
v, ok := lfu.Get(rand.Int63n(50))
require.True(t, ok)
for _, c := range v.Columns {
require.Equal(t, c.GetEvictedStatus(), statistics.AllEvicted)
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestLFUReject(t *testing.T) {
time.Sleep(3 * time.Second)
require.Equal(t, int64(0), lfu.Cost())
require.Len(t, lfu.Values(), 2)
v, ok := lfu.Get(2, false)
v, ok := lfu.Get(2)
require.True(t, ok)
for _, c := range v.Columns {
require.Equal(t, statistics.AllEvicted, c.GetEvictedStatus())
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/cache/internal/mapcache/map_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewMapCache() *MapCache {
}

// Get implements StatsCacheInner
func (m *MapCache) Get(k int64, _ bool) (*statistics.Table, bool) {
func (m *MapCache) Get(k int64) (*statistics.Table, bool) {
v, ok := m.tables[k]
return v.value, ok
}
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (s *StatsCachePointer) Replace(newCache *StatsCache) {
}

// UpdateStatsCache updates the cache with the new cache.
func (s *StatsCachePointer) UpdateStatsCache(newCache *StatsCache, tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) {
func (s *StatsCachePointer) UpdateStatsCache(newCache *StatsCache, tables []*statistics.Table, deletedIDs []int64) {
if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
s.Load().Update(tables, deletedIDs, opts...)
s.Load().Update(tables, deletedIDs)
} else {
s.Replace(newCache.CopyAndUpdate(tables, deletedIDs, opts...))
s.Replace(newCache.CopyAndUpdate(tables, deletedIDs))
}
}
47 changes: 5 additions & 42 deletions statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,6 @@ import (
"go.uber.org/zap"
)

// TableStatsOption used to indicate the way to get table stats
type TableStatsOption struct {
byQuery bool
}

// ByQuery indicates whether the stats is got by query
func (t *TableStatsOption) ByQuery() bool {
return t.byQuery
}

// TableStatsOpt used to edit getTableStatsOption
type TableStatsOpt func(*TableStatsOption)

// WithTableStatsByQuery indicates user needed
func WithTableStatsByQuery() TableStatsOpt {
return func(option *TableStatsOption) {
option.byQuery = true
}
}

// NewStatsCache creates a new StatsCacheWrapper.
func NewStatsCache() (*StatsCache, error) {
enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota
Expand Down Expand Up @@ -89,16 +69,12 @@ func (sc *StatsCache) Len() int {
return sc.c.Len()
}

// GetFromUser returns the statistics of the specified Table ID.
// Get returns the statistics of the specified Table ID.
// The returned value should be read-only, if you update it, don't forget to use Put to put it back again, otherwise the memory trace can be inaccurate.
//
// e.g. v := sc.Get(id); /* update the value */ v.Version = 123; sc.Put(id, v);
func (sc *StatsCache) GetFromUser(id int64) (*statistics.Table, bool) {
return sc.getCache(id, true)
}

func (sc *StatsCache) getCache(id int64, moveFront bool) (*statistics.Table, bool) {
result, ok := sc.c.Get(id, moveFront)
func (sc *StatsCache) Get(id int64) (*statistics.Table, bool) {
result, ok := sc.c.Get(id)
if ok {
metrics.HitCounter.Add(1)
} else {
Expand All @@ -107,11 +83,6 @@ func (sc *StatsCache) getCache(id int64, moveFront bool) (*statistics.Table, boo
return result, ok
}

// GetFromInternal returns the statistics of the specified Table ID.
func (sc *StatsCache) GetFromInternal(id int64) (*statistics.Table, bool) {
return sc.getCache(id, false)
}

// Put puts the table statistics to the cache from query.
func (sc *StatsCache) Put(id int64, t *statistics.Table) {
sc.put(id, t)
Expand Down Expand Up @@ -171,11 +142,7 @@ func (sc *StatsCache) Version() uint64 {
}

// CopyAndUpdate copies a new cache and updates the new statistics table cache. It is only used in the COW mode.
func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache {
option := &TableStatsOption{}
for _, opt := range opts {
opt(option)
}
func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64) *StatsCache {
newCache := &StatsCache{c: sc.c.Copy()}
newCache.maxTblStatsVer.Store(sc.maxTblStatsVer.Load())
for _, tbl := range tables {
Expand All @@ -196,11 +163,7 @@ func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int
}

// Update updates the new statistics table cache.
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) {
option := &TableStatsOption{}
for _, opt := range opts {
opt(option)
}
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64) {
for _, tbl := range tables {
id := tbl.PhysicalID
metrics.UpdateCounter.Inc()
Expand Down
Loading

0 comments on commit 2c4e8dc

Please sign in to comment.