statistics: correct behavior of non-lite InitStats and stats sync load of no stats column (#57803) (#57943)

close #57804
ti-chi-bot authored Dec 3, 2024
1 parent 6087f99 commit 0eb3c0b
Showing 7 changed files with 177 additions and 48 deletions.
70 changes: 56 additions & 14 deletions pkg/statistics/handle/bootstrap.go
@@ -244,15 +244,20 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
Flag: row.GetInt64(10),
StatsVer: statsVer,
}
// primary key column has no stats info, because the primary key's is_index is false, so it cannot load the topn
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
lastAnalyzePos.Copy(&col.LastAnalyzePos)
table.SetCol(hist.ID, col)
table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0)
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version)
// We will also set int primary key's loaded status to evicted.
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
} else if col.NDV > 0 || col.NullCount > 0 {
// If NDV > 0 or NullCount > 0, we also treat the column as having statistics. See the comments of StatsAvailable in column.go.
// So we align its status as evicted too.
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
// Otherwise the column's stats are not initialized.
}
}
if table != nil {
@@ -261,8 +266,19 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
}
}
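
As a quick orientation aid (not part of the commit), the branch added above boils down to the decision below. The helper is a simplified stand-in that returns a label instead of setting col.StatsLoadedStatus, and it omits the LastAnalyzeVersion bookkeeping:

```go
package main

import "fmt"

// colInitState mirrors the branch added to initStatsHistograms4Chunk above:
// analyzed columns (statsVer != 0) and columns that already show an NDV or
// null count are registered as "all evicted"; everything else stays
// uninitialized until a later sync/async load. Simplified stand-in, not TiDB code.
func colInitState(statsVer, ndv, nullCount int64) string {
	const version0 = 0
	switch {
	case statsVer != version0:
		return "all evicted" // analyzed; histogram is loaded on demand later
	case ndv > 0 || nullCount > 0:
		return "all evicted" // treated as having stats (see StatsAvailable in column.go)
	default:
		return "uninitialized" // no stats recorded for this column yet
	}
}

func main() {
	fmt.Println(colInitState(2, 6, 0)) // analyzed column -> all evicted
	fmt.Println(colInitState(0, 3, 0)) // NDV recorded    -> all evicted
	fmt.Println(colInitState(0, 0, 0)) // never analyzed  -> uninitialized
}
```
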

// initStatsHistogramsSQLGen generates the SQL to load all stats_histograms records.
// We need to read all the records since we need to initialize table.ColAndIdxExistenceMap.
func initStatsHistogramsSQLGen(isPaging bool) string {
selectPrefix := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
orderSuffix := " order by table_id"
if !isPaging {
return selectPrefix + orderSuffix
}
return selectPrefix + " where table_id >= %? and table_id < %?" + orderSuffix
}
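
To make the two shapes concrete: the generator returns either a full-table scan or a paged scan over a table_id range, and initStatsTopNSQLGen / initStatsBucketsSQLGen below follow the same pattern (with an extra is_index = 1 filter). The standalone sketch below reproduces the shape with a trimmed column list; it is illustrative only, not the TiDB function:

```go
package main

import "fmt"

// sqlGen mimics the structure of initStatsHistogramsSQLGen above with a
// shortened column list: the paging variant inserts a table_id range
// predicate before the shared "order by table_id" suffix.
func sqlGen(isPaging bool) string {
	selectPrefix := "select HIGH_PRIORITY table_id, is_index, hist_id from mysql.stats_histograms"
	orderSuffix := " order by table_id"
	if !isPaging {
		return selectPrefix + orderSuffix
	}
	return selectPrefix + " where table_id >= %? and table_id < %?" + orderSuffix
}

func main() {
	fmt.Println(sqlGen(false)) // full scan, as used by initStatsHistograms
	fmt.Println(sqlGen(true))  // paged scan, as used by initStatsHistogramsByPaging
}
```
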

func (h *Handle) initStatsHistogramsLite(ctx context.Context, cache statstypes.StatsCache) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id"
sql := initStatsHistogramsSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -285,7 +301,7 @@ func (h *Handle) initStatsHistogramsLite(ctx context.Context, cache statstypes.S
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id"
sql := initStatsHistogramsSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -319,10 +335,7 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
}()

sctx := se.(sessionctx.Context)
// Why do we need to add `is_index=1` in the SQL?
// because it is aligned to the `initStatsTopN` function, which only loads the topn of the index too.
// the other will be loaded by sync load.
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %? and is_index=1"
sql := initStatsHistogramsSQLGen(true)
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
@@ -401,8 +414,20 @@ func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
}
}

// initStatsTopNSQLGen generates the SQL to load all stats_top_n records.
// We only need to load the indexes' TopN since we only record the existence of columns in ColAndIdxExistenceMap.
// Column stats are not loaded during the bootstrap process.
func initStatsTopNSQLGen(isPaging bool) string {
selectPrefix := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl) */ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
orderSuffix := " order by table_id"
if !isPaging {
return selectPrefix + orderSuffix
}
return selectPrefix + " and table_id >= %? and table_id < %?" + orderSuffix
}

func (h *Handle) initStatsTopN(cache statstypes.StatsCache, totalMemory uint64) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl)*/ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 order by table_id"
sql := initStatsTopNSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -435,7 +460,7 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %? order by table_id"
sql := initStatsTopNSQLGen(true)
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
@@ -619,6 +644,18 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
}
}

// initStatsBucketsSQLGen generates the SQL to load all stats_buckets records.
// We only need to load the indexes' buckets since we only record the existence of columns in ColAndIdxExistenceMap.
// Column stats are not loaded during the bootstrap process.
func initStatsBucketsSQLGen(isPaging bool) string {
selectPrefix := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where is_index=1"
orderSuffix := " order by table_id"
if !isPaging {
return selectPrefix + orderSuffix
}
return selectPrefix + " and table_id >= %? and table_id < %?" + orderSuffix
}
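
The paged statements above are executed as util.Exec(sctx, sql, task.StartTid, task.EndTid), so the two %? markers line up with the task's table-id range. The toy below only visualizes that positional correspondence; TiDB binds the parameters through its session executor, not by string substitution:

```go
package main

import (
	"fmt"
	"strings"
)

// bindForDisplay substitutes %? markers left to right, purely to show how
// (StartTid, EndTid) map onto the placeholders in the paged SQL.
func bindForDisplay(sql string, args ...int64) string {
	for _, a := range args {
		sql = strings.Replace(sql, "%?", fmt.Sprint(a), 1)
	}
	return sql
}

func main() {
	sql := "select HIGH_PRIORITY table_id, hist_id from mysql.stats_buckets" +
		" where is_index=1 and table_id >= %? and table_id < %? order by table_id"
	fmt.Println(bindForDisplay(sql, 100, 200))
}
```
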

func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint64) error {
if isFullCache(cache, totalMemory) {
return nil
@@ -629,7 +666,7 @@ func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint6
return errors.Trace(err)
}
} else {
sql := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
sql := initStatsBucketsSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -668,7 +705,7 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? order by table_id, is_index, hist_id, bucket_id"
sql := initStatsBucketsSQLGen(true)
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
@@ -719,8 +756,10 @@ func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache, totalM

// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats.
// 1. Basic stats meta data is loaded (count, modify count, etc.).
// 2. Column/index stats are loaded. (only histogram)
// 2. Column/index stats are marked as existing or not by initializing the table.ColAndIdxExistenceMap, based on data from mysql.stats_histograms.
// 3. TopN, Bucket, FMSketch are not loaded.
// To work with auto analyze's needs, we need to read all the tables' stats meta into memory.
// The sync/async stats load and other processes do not fully initialize the table.ColAndIdxExistenceMap, so we need to do it here.
func (h *Handle) InitStatsLite(ctx context.Context) (err error) {
defer func() {
_, err1 := util.Exec(h.initStatsCtx, "commit")
@@ -750,7 +789,10 @@ func (h *Handle) InitStatsLite(ctx context.Context) (err error) {

// InitStats initiates the stats cache.
// 1. Basic stats meta data is loaded (count, modify count, etc.).
// 2. Column/index stats are loaded. (histogram, topn, buckets, FMSketch)
// 2. Index stats are fully loaded (histogram, topn, buckets).
// 3. Column stats are marked as existing or not by initializing the table.ColAndIdxExistenceMap, based on data from mysql.stats_histograms.
// To work with auto-analyze's needs, we need to read all stats meta info into memory.
// The sync/async stats load and other processes do not fully initialize the table.ColAndIdxExistenceMap, so we need to do it here.
func (h *Handle) InitStats(ctx context.Context, is infoschema.InfoSchema) (err error) {
totalMemory, err := memory.MemTotal()
if err != nil {
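
For orientation, the post-commit contrast between the two bootstrap paths described in the comments above can be summarized as follows. This is a toy summary written for this review, not TiDB code, and the type and field names are made up:

```go
package main

import "fmt"

// loadState is a made-up summary type describing what each bootstrap path
// loads after this commit, per the InitStatsLite/InitStats doc comments.
type loadState struct {
	statsMeta    string // count, modify count, ...
	indexStats   string
	columnStats  string
	existenceMap string // table.ColAndIdxExistenceMap
}

func main() {
	paths := []struct {
		name  string
		state loadState
	}{
		{"InitStatsLite", loadState{
			statsMeta:    "loaded",
			indexStats:   "meta only (no TopN/buckets/FMSketch)",
			columnStats:  "meta only",
			existenceMap: "fully initialized",
		}},
		{"InitStats", loadState{
			statsMeta:    "loaded",
			indexStats:   "fully loaded (histogram, TopN, buckets)",
			columnStats:  "meta only, marked all-evicted",
			existenceMap: "fully initialized",
		}},
	}
	for _, p := range paths {
		fmt.Printf("%s: %+v\n", p.name, p.state)
	}
}
```
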
35 changes: 20 additions & 15 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
@@ -305,6 +305,9 @@ func TestInitStats(t *testing.T) {
require.NoError(t, h.Update(context.Background(), is))
// Index and pk are loaded.
needed := fmt.Sprintf(`Table:%v RealtimeCount:6
column:1 ndv:6 totColSize:0
column:2 ndv:6 totColSize:6
column:3 ndv:6 totColSize:6
index:1 ndv:6
num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0
num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0
@@ -363,7 +366,7 @@ func TestInitStatsVer2(t *testing.T) {
}()
config.GetGlobalConfig().Performance.LiteInitStats = false
config.GetGlobalConfig().Performance.ConcurrentlyInitStats = false
initStatsVer2(t, false)
initStatsVer2(t)
}

func TestInitStatsVer2Concurrency(t *testing.T) {
@@ -375,18 +378,21 @@ func TestInitStatsVer2Concurrency(t *testing.T) {
}()
config.GetGlobalConfig().Performance.LiteInitStats = false
config.GetGlobalConfig().Performance.ConcurrentlyInitStats = true
initStatsVer2(t, true)
initStatsVer2(t)
}

func initStatsVer2(t *testing.T, isConcurrency bool) {
func initStatsVer2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_analyze_version=2")
tk.MustExec("create table t(a int, b int, c int, index idx(a), index idxab(a, b))")
tk.MustExec("create table t(a int, b int, c int, d int, index idx(a), index idxab(a, b))")
dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())
analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "c")
tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (4, 4, 4), (4, 4, 4)")
tk.MustExec("insert into t values(1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3), (4, 4, 4, 4), (4, 4, 4, 4), (4, 4, 4, 4)")
tk.MustExec("analyze table t with 2 topn, 3 buckets")
tk.MustExec("alter table t add column e int default 1")
dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())
h := dom.StatsHandle()
is := dom.InfoSchema()
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
@@ -398,16 +404,15 @@ func initStatsVer2(t *testing.T, isConcurrency bool) {
h.Clear()
require.NoError(t, h.InitStats(context.Background(), is))
table0 := h.GetTableStats(tbl.Meta())
if isConcurrency {
require.Equal(t, uint8(0x3), table0.GetIdx(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(2).LastAnalyzePos.GetBytes()[0])
} else {
require.Equal(t, uint8(0x33), table0.GetCol(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x33), table0.GetCol(2).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x33), table0.GetCol(3).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(2).LastAnalyzePos.GetBytes()[0])
}
require.Equal(t, 5, table0.ColNum())
require.True(t, table0.GetCol(1).IsAllEvicted())
require.True(t, table0.GetCol(2).IsAllEvicted())
require.True(t, table0.GetCol(3).IsAllEvicted())
require.True(t, !table0.GetCol(4).IsStatsInitialized())
require.True(t, table0.GetCol(5).IsStatsInitialized())
require.Equal(t, 2, table0.IdxNum())
require.Equal(t, uint8(0x3), table0.GetIdx(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(2).LastAnalyzePos.GetBytes()[0])
h.Clear()
require.NoError(t, h.InitStats(context.Background(), is))
table1 := h.GetTableStats(tbl.Meta())
3 changes: 1 addition & 2 deletions pkg/statistics/handle/syncload/BUILD.bazel
@@ -17,7 +17,6 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/table",
"//pkg/types",
"//pkg/util",
"//pkg/util/intest",
@@ -35,7 +34,7 @@ go_test(
srcs = ["stats_syncload_test.go"],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 7,
deps = [
":syncload",
"//pkg/config",
25 changes: 13 additions & 12 deletions pkg/statistics/handle/syncload/stats_syncload.go
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -305,30 +304,31 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
}
}()
item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
statsTbl, ok := s.statsHandle.Get(item.TableID)

if !ok {
return nil
}
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
tblInfo, ok := s.statsHandle.TableInfoByID(is, item.TableID)
tbl, ok := s.statsHandle.TableInfoByID(is, item.TableID)
if !ok {
return nil
}
isPkIsHandle := tblInfo.Meta().PKIsHandle
tblInfo := tbl.Meta()
isPkIsHandle := tblInfo.PKIsHandle
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
index, loadNeeded := statsTbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
} else {
wrapper.idxInfo = tblInfo.Meta().FindIndexByID(item.ID)
wrapper.idxInfo = tblInfo.FindIndexByID(item.ID)
}
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
col, loadNeeded, analyzed := statsTbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return nil
}
@@ -337,7 +337,7 @@ } else {
} else {
// Now, we cannot init the column info in the ColAndIdxExistenceMap when lite-init-stats is disabled,
// so we have to get the column info from the domain.
wrapper.colInfo = tblInfo.Meta().GetColumnByID(item.ID)
wrapper.colInfo = tblInfo.GetColumnByID(item.ID)
}
// If this column is not analyzed yet and we don't have it in memory,
// we create a fake one for the pseudo estimation.
@@ -393,7 +393,8 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
}
if hg == nil {
logutil.BgLogger().Warn("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID),
zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex))
zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex),
)
return nil, errGetHistMeta
}
if item.IsIndex {
@@ -543,7 +544,7 @@ func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, r
}

// updateCachedItem updates the column/index hist to global statsCache.
func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
func (s *statsSyncLoad) updateCachedItem(tblInfo *model.TableInfo, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
s.StatsLoad.Lock()
defer s.StatsLoad.Unlock()
// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
@@ -555,13 +556,13 @@ func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableIt
if !tbl.ColAndIdxExistenceMap.Checked() {
tbl = tbl.Copy()
for _, col := range tbl.HistColl.GetColSlice() {
if tblInfo.Meta().FindColumnByID(col.ID) == nil {
if tblInfo.FindColumnByID(col.ID) == nil {
tbl.HistColl.DelCol(col.ID)
tbl.ColAndIdxExistenceMap.DeleteColAnalyzed(col.ID)
}
}
for _, idx := range tbl.HistColl.GetIdxSlice() {
if tblInfo.Meta().FindIndexByID(idx.ID) == nil {
if tblInfo.FindIndexByID(idx.ID) == nil {
tbl.HistColl.DelIdx(idx.ID)
tbl.ColAndIdxExistenceMap.DeleteIdxAnalyzed(idx.ID)
}