From 438c3d0e64bef907895637232c8b4544ea78eee3 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Fri, 20 Oct 2023 14:09:40 +0800 Subject: [PATCH 1/3] fixup --- .../handle/history/history_stats.go | 10 +-------- pkg/statistics/handle/storage/gc.go | 8 +------ pkg/statistics/handle/storage/json.go | 7 ------- pkg/statistics/handle/storage/save.go | 21 ------------------- .../handle/storage/stats_read_writer.go | 12 +++++------ pkg/statistics/handle/storage/update.go | 14 ------------- pkg/statistics/handle/util/util.go | 6 +++--- 7 files changed, 11 insertions(+), 67 deletions(-) diff --git a/pkg/statistics/handle/history/history_stats.go b/pkg/statistics/handle/history/history_stats.go index 943186843a9a1..690a265e05eff 100644 --- a/pkg/statistics/handle/history/history_stats.go +++ b/pkg/statistics/handle/history/history_stats.go @@ -77,7 +77,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin } err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { return RecordHistoricalStatsMeta(sctx, tableID, version, source) - }) + }, util.FlagWrapTxn) if err != nil { // just log the error, hide the error from the outside caller. logutil.BgLogger().Error("record historical stats meta failed", zap.Int64("table-id", tableID), @@ -113,14 +113,6 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u } modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) - _, err = util.Exec(sctx, "begin pessimistic") - if err != nil { - return errors.Trace(err) - } - defer func() { - err = util.FinishTransaction(sctx, err) - }() - const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())" if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil { return errors.Trace(err) diff --git a/pkg/statistics/handle/storage/gc.go b/pkg/statistics/handle/storage/gc.go index de6a788de687e..28320718a3463 100644 --- a/pkg/statistics/handle/storage/gc.go +++ b/pkg/statistics/handle/storage/gc.go @@ -388,16 +388,10 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context, logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID)) } - _, err = util.Exec(sctx, "begin pessimistic") - if err != nil { - return 0, errors.Trace(err) - } defer func() { - err1 := util.FinishTransaction(sctx, err) - if err == nil && err1 == nil { + if err == nil { removeExtendedStatsItem(statsCache, tableID, statsName) } - err = err1 }() version, err := util.GetStartTS(sctx) if err != nil { diff --git a/pkg/statistics/handle/storage/json.go b/pkg/statistics/handle/storage/json.go index 020383efb3521..af90c56d3d657 100644 --- a/pkg/statistics/handle/storage/json.go +++ b/pkg/statistics/handle/storage/json.go @@ -270,13 +270,6 @@ func BlocksToJSONTable(blocks [][]byte) (*util.JSONTable, error) { // TableHistoricalStatsToJSON converts the historical stats of a table to JSONTable. func TableHistoricalStatsToJSON(sctx sessionctx.Context, physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) { - if _, err := util.Exec(sctx, "begin"); err != nil { - return nil, false, err - } - defer func() { - err = util.FinishTransaction(sctx, err) - }() - // get meta version rows, _, err := util.ExecRows(sctx, "select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot) if err != nil { diff --git a/pkg/statistics/handle/storage/save.go b/pkg/statistics/handle/storage/save.go index 8ff6660728028..b56c819ec2d41 100644 --- a/pkg/statistics/handle/storage/save.go +++ b/pkg/statistics/handle/storage/save.go @@ -125,13 +125,6 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, needDumpFMS := results.TableID.IsPartitionTable() tableID := results.TableID.GetStatisticsID() ctx := util.StatsCtx - _, err = util.Exec(sctx, "begin pessimistic") - if err != nil { - return 0, err - } - defer func() { - err = util.FinishTransaction(sctx, err) - }() txn, err := sctx.Txn(true) if err != nil { return 0, err @@ -334,13 +327,6 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, func SaveStatsToStorage(sctx sessionctx.Context, tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool) (statsVer uint64, err error) { - _, err = util.Exec(sctx, "begin pessimistic") - if err != nil { - return 0, errors.Trace(err) - } - defer func() { - err = util.FinishTransaction(sctx, err) - }() version, err := util.GetStartTS(sctx) if err != nil { return 0, errors.Trace(err) @@ -404,13 +390,6 @@ func SaveStatsToStorage(sctx sessionctx.Context, func SaveMetaToStorage( sctx sessionctx.Context, tableID, count, modifyCount int64) (statsVer uint64, err error) { - _, err = util.Exec(sctx, "begin") - if err != nil { - return 0, errors.Trace(err) - } - defer func() { - err = util.FinishTransaction(sctx, err) - }() version, err := util.GetStartTS(sctx) if err != nil { return 0, errors.Trace(err) diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go index 3c968224a6cfd..998dcac551cff 100644 --- a/pkg/statistics/handle/storage/stats_read_writer.go +++ b/pkg/statistics/handle/storage/stats_read_writer.go @@ -196,7 +196,7 @@ func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeRes err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { statsVer, err = SaveTableStatsToStorage(sctx, results, analyzeSnapshot) return err - }) + }, util.FlagWrapTxn) if err == nil && statsVer != 0 { tableID := results.TableID.GetStatisticsID() s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true) @@ -238,7 +238,7 @@ func (s *statsReadWriter) SaveStatsToStorage(tableID int64, count, modifyCount i statsVer, err = SaveStatsToStorage(sctx, tableID, count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime) return err - }) + }, util.FlagWrapTxn) if err == nil && statsVer != 0 { s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false) } @@ -251,7 +251,7 @@ func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, s err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount) return err - }) + }, util.FlagWrapTxn) if err == nil && statsVer != 0 { s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false) } @@ -264,7 +264,7 @@ func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64, err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { statsVer, err = InsertExtendedStats(sctx, s.statsHandler, statsName, colIDs, tp, tableID, ifNotExists) return err - }) + }, util.FlagWrapTxn) if err == nil && statsVer != 0 { s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) } @@ -277,7 +277,7 @@ func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { statsVer, err = MarkExtendedStatsDeleted(sctx, s.statsHandler, statsName, tableID, ifExists) return err - }) + }, util.FlagWrapTxn) if err == nil && statsVer != 0 { s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) } @@ -290,7 +290,7 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { statsVer, err = SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad) return err - }) + }, util.FlagWrapTxn) if err == nil && statsVer != 0 { s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) } diff --git a/pkg/statistics/handle/storage/update.go b/pkg/statistics/handle/storage/update.go index 50ca730eeeb2c..bc304ff596e90 100644 --- a/pkg/statistics/handle/storage/update.go +++ b/pkg/statistics/handle/storage/update.go @@ -111,13 +111,6 @@ func InsertExtendedStats(sctx sessionctx.Context, } strColIDs := string(bytes) - _, err = statsutil.Exec(sctx, "begin pessimistic") - if err != nil { - return 0, errors.Trace(err) - } - defer func() { - err = statsutil.FinishTransaction(sctx, err) - }() // No need to use `exec.ExecuteInternal` since we have acquired the lock. rows, _, err := statsutil.ExecRows(sctx, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed) if err != nil { @@ -170,13 +163,6 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context, return 0, nil } - _, err = statsutil.Exec(sctx, "begin pessimistic") - if err != nil { - return 0, errors.Trace(err) - } - defer func() { - err = statsutil.FinishTransaction(sctx, err) - }() version, err := statsutil.GetStartTS(sctx) if err != nil { return 0, errors.Trace(err) diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go index ef47f4451b2b4..19962af8f3556 100644 --- a/pkg/statistics/handle/util/util.go +++ b/pkg/statistics/handle/util/util.go @@ -66,8 +66,8 @@ type SessionPool interface { Put(pools.Resource) } -// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`. -func FinishTransaction(sctx sessionctx.Context, err error) error { +// finishTransaction will execute `commit` when error is nil, otherwise `rollback`. +func finishTransaction(sctx sessionctx.Context, err error) error { if err == nil { _, _, err = ExecRows(sctx, "commit") } else { @@ -178,7 +178,7 @@ func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (er return err } defer func() { - err = FinishTransaction(sctx, err) + err = finishTransaction(sctx, err) }() err = f(sctx) return From 6feff70dbdf10a585d176cba74cdbc4fe651f840 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Fri, 20 Oct 2023 14:58:20 +0800 Subject: [PATCH 2/3] fixup --- pkg/statistics/handle/util/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go index 19962af8f3556..0299e9a17ea9b 100644 --- a/pkg/statistics/handle/util/util.go +++ b/pkg/statistics/handle/util/util.go @@ -174,7 +174,7 @@ func UpdateSCtxVarsForStats(sctx sessionctx.Context) error { // WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility. func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) { // TODO: check whether this sctx is already in a txn - if _, _, err := ExecRows(sctx, "begin"); err != nil { + if _, _, err := ExecRows(sctx, "BEGIN PESSIMISTIC"); err != nil { return err } defer func() { From aa4c8668c42ac68d80e4c1bbed1447e2c91f2445 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 20 Oct 2023 16:43:26 +0800 Subject: [PATCH 3/3] Update pkg/statistics/handle/util/util.go Co-authored-by: Rustin Liu --- pkg/statistics/handle/util/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go index 0299e9a17ea9b..202791b6a2305 100644 --- a/pkg/statistics/handle/util/util.go +++ b/pkg/statistics/handle/util/util.go @@ -69,7 +69,7 @@ type SessionPool interface { // finishTransaction will execute `commit` when error is nil, otherwise `rollback`. func finishTransaction(sctx sessionctx.Context, err error) error { if err == nil { - _, _, err = ExecRows(sctx, "commit") + _, _, err = ExecRows(sctx, "COMMIT") } else { _, _, err1 := ExecRows(sctx, "rollback") terror.Log(errors.Trace(err1))