Skip to content

Commit

Permalink
planner: move more methods from StatsHandle to its sub-packages (#47739
Browse files Browse the repository at this point in the history
…) (#47751)

ref #46905
  • Loading branch information
ti-chi-bot authored Oct 20, 2023
1 parent dfc40e3 commit 690ffed
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 244 deletions.
5 changes: 3 additions & 2 deletions pkg/executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -61,7 +61,8 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- errors.Trace(exeerrors.ErrQueryInterrupted)
return
}
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
statsHandle := domain.GetDomain(worker.sctx).StatsHandle()
err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
Expand Down
1 change: 0 additions & 1 deletion pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ go_library(
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tiancaiamao_gp//:gp",
Expand Down
169 changes: 9 additions & 160 deletions pkg/statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,11 @@
package handle

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)

// HandleDDLEvent begins to process a ddl task.
Expand All @@ -40,7 +31,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
for _, id := range ids {
if err := h.insertTableStats2KV(t.TableInfo, id); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, id); err != nil {
return err
}
}
Expand All @@ -50,7 +41,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
for _, id := range ids {
if err := h.resetTableStats2KVForDrop(id); err != nil {
if err := h.ResetTableStats2KVForDrop(id); err != nil {
return err
}
}
Expand All @@ -60,13 +51,13 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
for _, id := range ids {
if err := h.insertColStats2KV(id, t.ColumnInfos); err != nil {
if err := h.InsertColStats2KV(id, t.ColumnInfos); err != nil {
return err
}
}
case model.ActionAddTablePartition, model.ActionTruncateTablePartition:
for _, def := range t.PartInfo.Definitions {
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
}
Expand All @@ -81,14 +72,14 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
}
}
for _, def := range t.PartInfo.Definitions {
if err := h.resetTableStats2KVForDrop(def.ID); err != nil {
if err := h.ResetTableStats2KVForDrop(def.ID); err != nil {
return err
}
}
case model.ActionReorganizePartition:
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
// Do not update global stats, since the data have not changed!
Expand All @@ -97,7 +88,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
// Add partitioning
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
}
Expand All @@ -107,21 +98,13 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
// Note that t.TableInfo is the current (new) table info
// and t.PartInfo.NewTableID is actually the old table ID!
// (see onReorganizePartition)
return h.changeGlobalStatsID(t.PartInfo.NewTableID, t.TableInfo.ID)
return h.ChangeGlobalStatsID(t.PartInfo.NewTableID, t.TableInfo.ID)
case model.ActionFlashbackCluster:
return h.updateStatsVersion()
return h.UpdateStatsVersion()
}
return nil
}

// updateStatsVersion will set statistics version to the newest TS,
// then tidb-server will reload automatic.
func (h *Handle) updateStatsVersion() error {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
return storage.UpdateStatsVersion(sctx)
}, statsutil.FlagWrapTxn)
}

// updateGlobalStats will trigger the merge of global-stats when we drop table partition
func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
// We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode.
Expand All @@ -130,18 +113,6 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
})
}

func (h *Handle) changeGlobalStatsID(from, to int64) (err error) {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
_, err = statsutil.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
if err != nil {
return err
}
}
return nil
}, statsutil.FlagWrapTxn)
}

func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
Expand All @@ -165,125 +136,3 @@ func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, er
func (h *Handle) DDLEventCh() chan *util.Event {
return h.ddlEventCh
}

// insertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
// new columns and indices which belong to this table.
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(physicalID, statsVer, statsutil.StatsMetaHistorySourceSchemaChange)
}
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
startTS, err := statsutil.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
statsVer = startTS
for _, col := range info.Columns {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
}
}
for _, idx := range info.Indices {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
return err
}
}
return nil
}, statsutil.FlagWrapTxn)
}

// resetTableStats2KV resets the count to 0.
func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(physicalID, statsVer, statsutil.StatsMetaHistorySourceSchemaChange)
}
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
startTS, err := statsutil.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
if _, err := statsutil.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
return err
}
return nil
}, statsutil.FlagWrapTxn)
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(physicalID, statsVer, statsutil.StatsMetaHistorySourceSchemaChange)
}
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
startTS, err := statsutil.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}

// First of all, we update the version.
_, err = statsutil.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
if err != nil {
return err
}
statsVer = startTS
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs sqlexec.RecordSet
rs, err = statsutil.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return err
}
defer terror.Call(rs.Close)
req := rs.NewChunk(nil)
err = rs.Next(context.Background(), req)
if err != nil {
return err
}
count := req.GetRow(0).GetInt64(0)
for _, colInfo := range colInfos {
value := types.NewDatum(colInfo.GetOriginDefaultValue())
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, &colInfo.FieldType)
if err != nil {
return err
}
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
return err
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
return err
}
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return err
}
// There must be only one bucket for this new column and the value is the default value.
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
return err
}
}
}
}
return nil
}, statsutil.FlagWrapTxn)
}
23 changes: 0 additions & 23 deletions pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,29 +220,6 @@ func (h *Handle) FlushStats() {
}
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
return SaveTableStatsToStorage(sctx, results, analyzeSnapshot, source)
})
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) error {
statsVer, err := storage.SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
if err == nil && statsVer != 0 {
tableID := results.TableID.GetStatisticsID()
if err1 := history.RecordHistoricalStatsMeta(sctx, tableID, statsVer, source); err1 != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.String("source", source),
zap.Error(err1))
}
}
return err
}

// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (es *statistics.ExtendedStatsColl, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
Expand Down
16 changes: 9 additions & 7 deletions pkg/statistics/handle/history/history_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,18 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
}

// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history table.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string) {
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) {
if version == 0 {
return
}
tbl, ok := sh.statsHandle.Get(tableID)
if !ok {
return
}
if !tbl.IsInitialized() {
return
if !enforce {
tbl, ok := sh.statsHandle.Get(tableID)
if !ok {
return
}
if !tbl.IsInitialized() {
return
}
}
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
return RecordHistoricalStatsMeta(sctx, tableID, version, source)
Expand Down
Loading

0 comments on commit 690ffed

Please sign in to comment.