Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: unify the way to handle txn in statshandle (#47859) #47889

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions pkg/statistics/handle/history/history_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin
}
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
return RecordHistoricalStatsMeta(sctx, tableID, version, source)
})
}, util.FlagWrapTxn)
if err != nil { // just log the error, hide the error from the outside caller.
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
Expand Down Expand Up @@ -113,14 +113,6 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(sctx, err)
}()

const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil {
return errors.Trace(err)
Expand Down
8 changes: 1 addition & 7 deletions pkg/statistics/handle/storage/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,10 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID))
}

_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
err1 := util.FinishTransaction(sctx, err)
if err == nil && err1 == nil {
if err == nil {
removeExtendedStatsItem(statsCache, tableID, statsName)
}
err = err1
}()
version, err := util.GetStartTS(sctx)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/statistics/handle/storage/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,6 @@ func BlocksToJSONTable(blocks [][]byte) (*util.JSONTable, error) {

// TableHistoricalStatsToJSON converts the historical stats of a table to JSONTable.
func TableHistoricalStatsToJSON(sctx sessionctx.Context, physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) {
if _, err := util.Exec(sctx, "begin"); err != nil {
return nil, false, err
}
defer func() {
err = util.FinishTransaction(sctx, err)
}()

// get meta version
rows, _, err := util.ExecRows(sctx, "select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
Expand Down
21 changes: 0 additions & 21 deletions pkg/statistics/handle/storage/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
needDumpFMS := results.TableID.IsPartitionTable()
tableID := results.TableID.GetStatisticsID()
ctx := util.StatsCtx
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return 0, err
}
defer func() {
err = util.FinishTransaction(sctx, err)
}()
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
Expand Down Expand Up @@ -334,13 +327,6 @@ func SaveTableStatsToStorage(sctx sessionctx.Context,
func SaveStatsToStorage(sctx sessionctx.Context,
tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram,
cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool) (statsVer uint64, err error) {
_, err = util.Exec(sctx, "begin pessimistic")
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(sctx, err)
}()
version, err := util.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -404,13 +390,6 @@ func SaveStatsToStorage(sctx sessionctx.Context,
func SaveMetaToStorage(
sctx sessionctx.Context,
tableID, count, modifyCount int64) (statsVer uint64, err error) {
_, err = util.Exec(sctx, "begin")
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
err = util.FinishTransaction(sctx, err)
}()
version, err := util.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/statistics/handle/storage/stats_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeRes
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
statsVer, err = SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
return err
})
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
tableID := results.TableID.GetStatisticsID()
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true)
Expand Down Expand Up @@ -238,7 +238,7 @@ func (s *statsReadWriter) SaveStatsToStorage(tableID int64, count, modifyCount i
statsVer, err = SaveStatsToStorage(sctx, tableID,
count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime)
return err
})
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
}
Expand All @@ -251,7 +251,7 @@ func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, s
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount)
return err
})
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
}
Expand All @@ -264,7 +264,7 @@ func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64,
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
statsVer, err = InsertExtendedStats(sctx, s.statsHandler, statsName, colIDs, tp, tableID, ifNotExists)
return err
})
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
}
Expand All @@ -277,7 +277,7 @@ func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
statsVer, err = MarkExtendedStatsDeleted(sctx, s.statsHandler, statsName, tableID, ifExists)
return err
})
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
}
Expand All @@ -290,7 +290,7 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st
err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
statsVer, err = SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad)
return err
})
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/statistics/handle/storage/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,6 @@ func InsertExtendedStats(sctx sessionctx.Context,
}
strColIDs := string(bytes)

_, err = statsutil.Exec(sctx, "begin pessimistic")
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(sctx, err)
}()
// No need to use `exec.ExecuteInternal` since we have acquired the lock.
rows, _, err := statsutil.ExecRows(sctx, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
if err != nil {
Expand Down Expand Up @@ -170,13 +163,6 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context,
return 0, nil
}

_, err = statsutil.Exec(sctx, "begin pessimistic")
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
err = statsutil.FinishTransaction(sctx, err)
}()
version, err := statsutil.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/statistics/handle/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ type SessionPool interface {
Put(pools.Resource)
}

// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func FinishTransaction(sctx sessionctx.Context, err error) error {
// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(sctx sessionctx.Context, err error) error {
if err == nil {
_, _, err = ExecRows(sctx, "commit")
_, _, err = ExecRows(sctx, "COMMIT")
} else {
_, _, err1 := ExecRows(sctx, "rollback")
terror.Log(errors.Trace(err1))
Expand Down Expand Up @@ -174,11 +174,11 @@ func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
// WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility.
func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) {
// TODO: check whether this sctx is already in a txn
if _, _, err := ExecRows(sctx, "begin"); err != nil {
if _, _, err := ExecRows(sctx, "BEGIN PESSIMISTIC"); err != nil {
return err
}
defer func() {
err = FinishTransaction(sctx, err)
err = finishTransaction(sctx, err)
}()
err = f(sctx)
return
Expand Down