Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52830
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hawkingrei authored and ti-chi-bot committed May 16, 2024
1 parent 1f29133 commit 008a67e
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pkg/statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,15 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
} else {
task = lastTask
}
<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
return h.handleOneItemTask(sctx, task)
=======
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
err := s.handleOneItemTask(task)
return nil, err
>>>>>>> 8629068c0f0 (*: avoid concurrently using the session in the syncload (#52830)):pkg/statistics/handle/syncload/stats_syncload.go
})
timeout := time.Until(task.ToTimeout)
select {
Expand All @@ -242,13 +249,32 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
}
}

<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go
func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
=======
func isVaildForRetry(task *statstypes.NeededItemTask) bool {
task.Retry++
return task.Retry <= RetryCount
}

func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err error) {
se, err := s.statsHandle.SPool().Get()
if err != nil {
return err
}
sctx := se.(sessionctx.Context)
sctx.GetSessionVars().StmtCtx.Priority = mysql.HighPriority
>>>>>>> 8629068c0f0 (*: avoid concurrently using the session in the syncload (#52830)):pkg/statistics/handle/syncload/stats_syncload.go
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
if err == nil { // only recycle when no error
sctx.GetSessionVars().StmtCtx.Priority = mysql.NoPriority
s.statsHandle.SPool().Put(se)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
item := result.Item
Expand Down
93 changes: 93 additions & 0 deletions pkg/statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,96 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}
<<<<<<< HEAD:pkg/statistics/handle/handle_hist_test.go
=======

func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("set @@session.tidb_analyze_version=2")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()

h := dom.StatsHandle()

neededColumns := make([]model.StatsLoadItem, 1)
neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true}
timeout := time.Nanosecond * mathutil.MaxInt

// clear statsCache
h.Clear()
require.NoError(t, dom.StatsHandle().Update(is))

// no stats at beginning
stat := h.GetTableStats(tableInfo)
c, ok := stat.Columns[tableInfo.Columns[2].ID]
require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

stmtCtx1 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
stmtCtx2 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)"))
var (
task1 *types.NeededItemTask
err1 error
)

for i := 0; i < syncload.RetryCount; i++ {
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
}
result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
}
task1.Retry = 0
for i := 0; i < syncload.RetryCount*5; i++ {
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
task1.Retry = 0
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
}
>>>>>>> 8629068c0f0 (*: avoid concurrently using the session in the syncload (#52830)):pkg/statistics/handle/syncload/stats_syncload_test.go

0 comments on commit 008a67e

Please sign in to comment.