statistics: add upper bound of retry for sync load #52658

Merged on Apr 22, 2024 (13 commits)
3 changes: 2 additions & 1 deletion pkg/statistics/handle/syncload/BUILD.bazel
@@ -30,12 +30,13 @@ go_test(
srcs = ["stats_syncload_test.go"],
flaky = True,
race = "on",
shard_count = 4,
shard_count = 5,
deps = [
"//pkg/config",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/statistics/handle/types",
"//pkg/testkit",
"//pkg/util/mathutil",
"@com_github_pingcap_failpoint//:failpoint",
61 changes: 35 additions & 26 deletions pkg/statistics/handle/syncload/stats_syncload.go
@@ -223,46 +223,58 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
return s.handleOneItemTask(sctx, task)
err := s.handleOneItemTask(sctx, task)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
case sr := <-resultChan:
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
if !updateNeededItemTaskRetryCountAndCheck(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, result.Err
return task, sr.Err
case <-time.After(timeout):
if !updateNeededItemTaskRetryCountAndCheck(task) {
result.Error = errors.New("stats loading timeout")
task.ResultCh <- result
return nil, nil
Member commented:

Do we need to return an error here? Or does this mean we want to ignore the task immediately?

Member Author replied:

We want to ignore the task immediately. If the task is essential, it will be inserted into the channel. We should keep a high QPS for HandleOneTask.

}
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
}
}
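
The control flow of the new `HandleOneTask` (wait for either a result or a timeout, and give up after a bounded number of retries) can be illustrated with a minimal, standard-library-only sketch. This is not the TiDB implementation: the `task` type, `canRetry`, `handleOnce`, and `runFailing` are hypothetical stand-ins, and the singleflight deduplication is replaced by a plain goroutine. Because Go's `time.Time.Add` returns a new value instead of modifying its receiver, the sketch assigns the extended deadline back to the task.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// task is a simplified, hypothetical stand-in for statstypes.NeededItemTask.
type task struct {
	deadline time.Time
	retry    int
	resultCh chan error // buffered; receives the final outcome exactly once
}

// canRetry increments the counter and reports whether another attempt is
// allowed. The bound of 3 mirrors the value hard-coded in the diff.
func canRetry(t *task) bool {
	t.retry++
	return t.retry <= 3
}

// handleOnce runs load in the background and waits for it until the deadline.
// It returns the task when the caller should try it again, or nil once the
// task is finished (successfully or after exhausting its retries).
func handleOnce(t *task, load func() error, extend time.Duration) (*task, error) {
	done := make(chan error, 1)
	go func() { done <- load() }()

	select {
	case err := <-done:
		if err == nil {
			t.resultCh <- nil
			return nil, nil
		}
		if !canRetry(t) {
			// Retries exhausted: report the error to the waiter and drop the task.
			t.resultCh <- err
			return nil, nil
		}
		return t, err
	case <-time.After(time.Until(t.deadline)):
		if !canRetry(t) {
			t.resultCh <- errors.New("stats loading timeout")
			return nil, nil
		}
		// time.Time.Add returns a new Time, so the result must be assigned.
		t.deadline = t.deadline.Add(extend)
		return t, nil
	}
}

// runFailing drives a task whose load always fails and reports how many
// attempts were made and which error the waiter finally received.
func runFailing() (attempts int, final error) {
	t := &task{deadline: time.Now().Add(time.Second), resultCh: make(chan error, 1)}
	failing := func() error { return errors.New("mock read failure") }
	for cur := t; cur != nil; {
		attempts++
		cur, _ = handleOnce(cur, failing, time.Second)
	}
	return attempts, <-t.resultCh
}

func main() {
	attempts, err := runFailing()
	fmt.Println(attempts, err)
}
```

With a load function that always fails, the first three calls hand the task back for a retry and the fourth sends the error to the waiter and drops the task, which is the same shape that `TestRetry` in this PR asserts.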

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) bool {
Member commented:

Suggested change
func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) bool {
func isVaildForRetry(task *statstypes.NeededItemTask) bool {

Or any more straightforward name.

Member Author replied:

Done

task.Retry++
return task.Retry <= 3
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
Member Author commented:

This is a cleanup: the original `result *stmtctx.StatsLoadResult` return value is unnecessary.

defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return result, nil
return nil
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return result, nil
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
@@ -272,7 +284,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return result, nil
return nil
}
if col != nil {
wrapper.colInfo = col.Info
@@ -288,18 +300,15 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0),
IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
}
if s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
return result, nil
}
return nil, nil
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
return nil
}
}
t := time.Now()
needUpdate := false
wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
if err != nil {
result.Error = err
return result, err
return err
}
if item.IsIndex {
if wrapper.idxInfo != nil {
@@ -311,10 +320,10 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
}
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
return result, nil
if needUpdate {
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
}
return nil, nil
return nil
}
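
The simplified signature of `handleOneItemTask` relies on a named error return that the deferred `recover` can overwrite. A minimal sketch of that pattern, assuming a hypothetical helper `safeRun` that is not part of the PR:

```go
package main

import "fmt"

// safeRun is a hypothetical helper: it runs work and converts a panic into
// an ordinary error by assigning to the named return value err.
func safeRun(work func() error) (err error) {
	defer func() {
		// The deferred function runs after work panics or returns, so it can
		// still set err before safeRun hands control back to its caller.
		if r := recover(); r != nil {
			err = fmt.Errorf("stats loading panicked: %v", r)
		}
	}()
	return work()
}

func main() {
	fmt.Println(safeRun(func() error { return nil }))
	fmt.Println(safeRun(func() error { panic("boom") }))
}
```

Since the only information the caller needs is whether the load failed, a single `error` return is enough, which appears to be the motivation for dropping the `*stmtctx.StatsLoadResult` return value here.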

// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
@@ -492,14 +501,14 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return true
return false
}
if !item.IsIndex && colHist != nil {
c, ok := tbl.Columns[item.ID]
// - If the stats is fully loaded,
// - If the stats is meta-loaded and we also just need the meta.
if ok && (c.IsFullLoad() || !fullLoaded) {
return true
return false
}
tbl = tbl.Copy()
tbl.Columns[item.ID] = colHist
66 changes: 66 additions & 0 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/stretchr/testify/require"
@@ -229,3 +230,68 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("set @@session.tidb_analyze_version=2")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()

h := dom.StatsHandle()

neededColumns := make([]model.StatsLoadItem, 1)
neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true}
timeout := time.Nanosecond * mathutil.MaxInt

// clear statsCache
h.Clear()
require.NoError(t, dom.StatsHandle().Update(is))

// no stats at beginning
stat := h.GetTableStats(tableInfo)
c, ok := stat.Columns[tableInfo.Columns[2].ID]
require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

stmtCtx1 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
stmtCtx2 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)"))
var (
task1 *types.NeededItemTask
err1 error
)

for i := 0; i < 3; i++ {
Member commented:

Maybe we should define a const for it. Then we can use it for both code and test.

hawkingrei (Member Author) replied on Apr 21, 2024:

I added some comments here.

Whether to retry 3 times depends on the duration of a retry. If a retry takes only several microseconds, it can retry more times.

Member Author replied:

Sorry, I misunderstood. You are right.

task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
}
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err1)
require.Nil(t, task1)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
}
1 change: 1 addition & 0 deletions pkg/statistics/handle/types/interfaces.go
@@ -370,6 +370,7 @@ type NeededItemTask struct {
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
Item model.StatsLoadItem
Retry int
}

// StatsLoad is used to load stats concurrently