Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: add upper bound of retry for sync load #52658

Merged
merged 13 commits into from
Apr 22, 2024
4 changes: 3 additions & 1 deletion pkg/statistics/handle/syncload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ go_test(
srcs = ["stats_syncload_test.go"],
flaky = True,
race = "on",
shard_count = 4,
shard_count = 5,
deps = [
":syncload",
"//pkg/config",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/statistics/handle/types",
"//pkg/testkit",
"//pkg/util/mathutil",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
68 changes: 42 additions & 26 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
"go.uber.org/zap"
)

// RetryCount is the max retry count for a sync load task.
const RetryCount = 3

type statsSyncLoad struct {
statsHandle statstypes.StatsHandle
StatsLoad statstypes.StatsLoad
Expand Down Expand Up @@ -204,6 +207,9 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{
}

// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
// - If the task is handled successfully, return nil, nil.
// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep.
// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep.
func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
Expand All @@ -223,46 +229,59 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
return s.handleOneItemTask(sctx, task)
err := s.handleOneItemTask(sctx, task)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
return task, result.Err
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
if !isVaildForRetry(task) {
result.Error = errors.New("stats loading timeout")
task.ResultCh <- result
return nil, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to return an error here? Or does this mean we want to ignore the task immediately?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

want to ignore the task immediately. If the task is essential, it will be inserted into the channel. we should keep a high QPS for HandleOneTask

}
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
}
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
func isVaildForRetry(task *statstypes.NeededItemTask) bool {
task.Retry++
return task.Retry <= RetryCount
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the clean code. the origin's result *stmtctx.StatsLoadResult is unnecessary.

defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return result, nil
return nil
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return result, nil
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
Expand All @@ -272,7 +291,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return result, nil
return nil
}
if col != nil {
wrapper.colInfo = col.Info
Expand All @@ -288,18 +307,15 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0),
IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
}
if s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
return result, nil
}
return nil, nil
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
return nil
}
}
t := time.Now()
needUpdate := false
wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
if err != nil {
result.Error = err
return result, err
return err
}
if item.IsIndex {
if wrapper.idxInfo != nil {
Expand All @@ -311,10 +327,10 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
}
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
return result, nil
if needUpdate {
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
}
return nil, nil
return nil
}

// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
Expand Down Expand Up @@ -492,14 +508,14 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return true
return false
}
if !item.IsIndex && colHist != nil {
c, ok := tbl.Columns[item.ID]
// - If the stats is fully loaded,
// - If the stats is meta-loaded and we also just need the meta.
if ok && (c.IsFullLoad() || !fullLoaded) {
return true
return false
}
tbl = tbl.Copy()
tbl.Columns[item.ID] = colHist
Expand Down
91 changes: 91 additions & 0 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics/handle/syncload"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -206,6 +208,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-stmtCtx1.StatsLoad.ResultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
case <-stmtCtx2.StatsLoad.ResultCh:
t.Logf("stmtCtx2.ResultCh should not get anything")
t.FailNow()
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get anything")
t.FailNow()
default:
}
Comment on lines +211 to +222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what the purpose of this change is. It seems I can still pass this test without your patch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, there were no checks performed here, but now they have been added.


require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
Expand All @@ -229,3 +243,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("set @@session.tidb_analyze_version=2")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()

h := dom.StatsHandle()

neededColumns := make([]model.StatsLoadItem, 1)
neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true}
timeout := time.Nanosecond * mathutil.MaxInt

// clear statsCache
h.Clear()
require.NoError(t, dom.StatsHandle().Update(is))

// no stats at beginning
stat := h.GetTableStats(tableInfo)
c, ok := stat.Columns[tableInfo.Columns[2].ID]
require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

stmtCtx1 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
stmtCtx2 := stmtctx.NewStmtCtx()
h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)"))
var (
task1 *types.NeededItemTask
err1 error
)

for i := 0; i < syncload.RetryCount; i++ {
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
}
result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
}
1 change: 1 addition & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ type NeededItemTask struct {
ToTimeout time.Time
ResultCh chan stmtctx.StatsLoadResult
Item model.StatsLoadItem
Retry int
}

// StatsLoad is used to load stats concurrently
Expand Down