Skip to content

Commit

Permalink
statistics: refactor write merge-global-stats (#53549)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored May 28, 2024
1 parent 4bf624b commit 4a51974
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 61 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/globalstats",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/store/driver/backoff",
"//pkg/store/driver/txn",
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
globalStatsMap := make(map[globalStatsKey]statstypes.GlobalStatsInfo)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
Expand Down Expand Up @@ -160,7 +161,7 @@ TASKLOOP:
})
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
if needGlobalStats {
err = e.handleGlobalStats(ctx, globalStatsMap)
err = e.handleGlobalStats(globalStatsMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -769,11 +770,11 @@ func handleGlobalStats(needGlobalStats bool, globalStatsMap globalStatsMap, resu
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
globalStatsMap[globalStatsID] = statstypes.GlobalStatsInfo{IsIndex: result.IsIndex, HistIDs: histIDs, StatsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
globalStatsMap[globalStatsID] = statstypes.GlobalStatsInfo{IsIndex: result.IsIndex, HistIDs: []int64{hg.ID}, StatsVersion: results.StatsVer}
}
}
}
Expand Down
53 changes: 5 additions & 48 deletions pkg/executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
package executor

import (
"context"
"fmt"

"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -33,20 +30,12 @@ type globalStatsKey struct {
indexID int64
}

type globalStatsInfo struct {
isIndex int
// When the `isIndex == 0`, histIDs will be the column IDs.
// Otherwise, histIDs will only contain the index ID.
histIDs []int64
statsVersion int
}

// globalStatsMap is a map used to store which partition tables and the corresponding indexes need global-level stats.
// The meaning of key in map is the structure that used to store the tableID and indexID.
// The meaning of value in map is some additional information needed to build global-level stats.
type globalStatsMap map[globalStatsKey]globalStatsInfo
type globalStatsMap map[globalStatsKey]statstypes.GlobalStatsInfo

func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap globalStatsMap) error {
func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
globalStatsTableIDs := make(map[int64]struct{}, len(globalStatsMap))
for globalStatsID := range globalStatsMap {
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
Expand Down Expand Up @@ -75,47 +64,15 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
globalOpts = v2Options.FilledOpts
}
}
globalStatsI, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
e.Ctx(),
globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
&info,
globalStatsID.tableID,
info.isIndex == 1,
info.histIDs,
)
if err != nil {
logutil.BgLogger().Warn("merge global stats failed",
zap.String("info", job.JobInfo), zap.Error(err), zap.Int64("tableID", tableID))
if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) {
// When we find some partition-level stats are missing, we need to report warning.
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err)
}
return err
}
globalStats := globalStatsI.(*globalstats.GlobalStats)
// Dump global-level stats to kv.
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
if hg == nil {
// All partitions have no stats so global stats are not created.
continue
}
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID,
globalStats.Count,
globalStats.ModifyCount,
info.isIndex,
hg,
cms,
topN,
info.statsVersion,
1,
true,
util.StatsMetaHistorySourceAnalyze,
)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo),
zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID))
}
}
return err
}()
Expand Down
47 changes: 43 additions & 4 deletions pkg/statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
Expand All @@ -48,11 +50,18 @@ func NewStatsGlobal(statsHandler statstypes.StatsHandle) statstypes.StatsGlobal
// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
func (sg *statsGlobalImpl) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
info *statstypes.GlobalStatsInfo,
physicalID int64,
isIndex bool,
histIDs []int64,
) (globalStats any, err error) {
return MergePartitionStats2GlobalStatsByTableID(sc, sg.statsHandler, opts, is, physicalID, isIndex, histIDs)
) (err error) {
globalStats, err := MergePartitionStats2GlobalStatsByTableID(sc, sg.statsHandler, opts, is, physicalID, info.IsIndex == 1, info.HistIDs)
if err != nil {
if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) {
// When we find some partition-level stats are missing, we need to report warning.
sc.GetSessionVars().StmtCtx.AppendWarning(err)
}
return err
}
return WriteGlobalStatsToStorage(sg.statsHandler, globalStats, info, physicalID)
}

// GlobalStats is used to store the statistics contained in the global-level stats
Expand Down Expand Up @@ -346,3 +355,33 @@ func blockingMergePartitionStats2GlobalStats(
}
return
}

// WriteGlobalStatsToStorage is to write global stats to storage
func WriteGlobalStatsToStorage(statsHandle statstypes.StatsHandle, globalStats *GlobalStats, info *statstypes.GlobalStatsInfo, gid int64) (err error) {
// Dump global-level stats to kv.
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
if hg == nil {
// All partitions have no stats so global stats are not created.
continue
}
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(gid,
globalStats.Count,
globalStats.ModifyCount,
info.IsIndex,
hg,
cms,
topN,
info.StatsVersion,
1,
true,
util.StatsMetaHistorySourceAnalyze,
)
if err != nil {
statslogutil.StatsLogger().Error("save global-level stats to storage failed",
zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", gid))
}
}
return err
}
16 changes: 12 additions & 4 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,23 @@ type StatsSyncLoad interface {
HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error)
}

// GlobalStatsInfo represents the contextual information pertaining to global statistics.
type GlobalStatsInfo struct {
HistIDs []int64
// When the `isIndex == 0`, HistIDs will be the column IDs.
// Otherwise, HistIDs will only contain the index ID.
IsIndex int
StatsVersion int
}

// StatsGlobal is used to manage partition table global stats.
type StatsGlobal interface {
// MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID.
MergePartitionStats2GlobalStatsByTableID(sctx sessionctx.Context,
MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
info *GlobalStatsInfo,
physicalID int64,
isIndex bool,
histIDs []int64,
) (globalStats any, err error)
) (err error)
}

// DDL is used to handle ddl events.
Expand Down

0 comments on commit 4a51974

Please sign in to comment.