diff --git a/pkg/statistics/handle/handle_hist.go b/pkg/statistics/handle/handle_hist.go index a335c89bc8f89a..639628e6fffead 100644 --- a/pkg/statistics/handle/handle_hist.go +++ b/pkg/statistics/handle/handle_hist.go @@ -222,8 +222,15 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask } else { task = lastTask } +<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) { return h.handleOneItemTask(sctx, task) +======= + result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID} + resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) { + err := s.handleOneItemTask(task) + return nil, err +>>>>>>> 8629068c0f0 (*: avoid concurrently using the session in the syncload (#52830)):pkg/statistics/handle/syncload/stats_syncload.go }) timeout := time.Until(task.ToTimeout) select { @@ -242,13 +249,32 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask } } +<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (result *stmtctx.StatsLoadResult, err error) { +======= +func isVaildForRetry(task *statstypes.NeededItemTask) bool { + task.Retry++ + return task.Retry <= RetryCount +} + +func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err error) { + se, err := s.statsHandle.SPool().Get() + if err != nil { + return err + } + sctx := se.(sessionctx.Context) + sctx.GetSessionVars().StmtCtx.Priority = mysql.HighPriority +>>>>>>> 8629068c0f0 (*: avoid concurrently using the session in the syncload (#52830)):pkg/statistics/handle/syncload/stats_syncload.go defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack")) err = errors.Errorf("stats loading panicked: %v", r) } + if err == nil { // only recycle when no error + sctx.GetSessionVars().StmtCtx.Priority = mysql.NoPriority + s.statsHandle.SPool().Put(se) + } }() result = &stmtctx.StatsLoadResult{Item: task.TableItemID} item := result.Item diff --git a/pkg/statistics/handle/handle_hist_test.go b/pkg/statistics/handle/handle_hist_test.go index d210f225e15a86..08f3e718674fe4 100644 --- a/pkg/statistics/handle/handle_hist_test.go +++ b/pkg/statistics/handle/handle_hist_test.go @@ -229,3 +229,96 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Greater(t, hg.Len()+topn.Num(), 0) } } +<<<<<<< HEAD:pkg/statistics/handle/handle_hist_test.go +======= + +func TestRetry(t *testing.T) { + originConfig := config.GetGlobalConfig() + newConfig := config.NewConfig() + newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + config.StoreGlobalConfig(newConfig) + defer config.StoreGlobalConfig(originConfig) + store, dom := testkit.CreateMockStoreAndDomain(t) + + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") + + oriLease := dom.StatsHandle().Lease() + dom.StatsHandle().SetLease(1) + defer func() { + dom.StatsHandle().SetLease(oriLease) + }() + testKit.MustExec("analyze table t") + + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + + h := dom.StatsHandle() + + neededColumns := make([]model.StatsLoadItem, 1) + neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true} + timeout := time.Nanosecond * mathutil.MaxInt + + // clear statsCache + h.Clear() + require.NoError(t, dom.StatsHandle().Update(is)) + + // no stats at beginning + stat := h.GetTableStats(tableInfo) + c, ok := stat.Columns[tableInfo.Columns[2].ID] + require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0)) + + stmtCtx1 := stmtctx.NewStmtCtx() + h.SendLoadRequests(stmtCtx1, neededColumns, timeout) + stmtCtx2 := stmtctx.NewStmtCtx() + h.SendLoadRequests(stmtCtx2, neededColumns, timeout) + + exitCh := make(chan struct{}) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)")) + var ( + task1 *types.NeededItemTask + err1 error + ) + + for i := 0; i < syncload.RetryCount; i++ { + task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.Error(t, err1) + require.NotNil(t, task1) + select { + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get nothing") + t.FailNow() + default: + } + } + result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.NoError(t, err1) + require.Nil(t, result) + select { + case <-task1.ResultCh: + default: + t.Logf("task1.ResultCh should get nothing") + t.FailNow() + } + task1.Retry = 0 + for i := 0; i < syncload.RetryCount*5; i++ { + task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.Error(t, err1) + require.NotNil(t, task1) + select { + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get nothing") + t.FailNow() + default: + } + task1.Retry = 0 + } + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) +} +>>>>>>> 8629068c0f0 (*: avoid concurrently using the session in the syncload (#52830)):pkg/statistics/handle/syncload/stats_syncload_test.go