From ba41bc95d57f8e58271a4d1139314b82bfb50b73 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 17 Apr 2024 14:02:53 +0800 Subject: [PATCH 01/13] statistics: add upper bound of retry for sync load Signed-off-by: Weizhen Wang --- .../handle/syncload/stats_syncload.go | 15 +++++++ .../handle/syncload/stats_syncload_test.go | 45 +++++++++++++++++++ pkg/statistics/handle/types/interfaces.go | 1 + 3 files changed, 61 insertions(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 3e9db7716182e..147b3b6caa169 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -232,6 +232,10 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty if result.Err == nil { slr := result.Val.(*stmtctx.StatsLoadResult) if slr.Error != nil { + if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { + logutil.BgLogger().Error("stats loading error", zap.Error(slr.Error)) + return nil, nil + } return task, slr.Error } task.ResultCh <- *slr @@ -239,11 +243,22 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } return task, result.Err case <-time.After(timeout): + if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { + return nil, nil + } task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) return task, nil } } +func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) bool { + task.Retry++ + if task.Retry > 3 { + return false + } + return true +} + func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) { defer func() { // recover for each task, worker keeps working diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 01706d668d125..901f6e10fa48e 100644 --- 
a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -229,3 +229,48 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { require.Greater(t, hg.Len()+topn.Num(), 0) } } + +func TestRetry(t *testing.T) { + originConfig := config.GetGlobalConfig() + newConfig := config.NewConfig() + newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + config.StoreGlobalConfig(newConfig) + defer config.StoreGlobalConfig(originConfig) + store, dom := testkit.CreateMockStoreAndDomain(t) + + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") + testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") + + oriLease := dom.StatsHandle().Lease() + dom.StatsHandle().SetLease(1) + defer func() { + dom.StatsHandle().SetLease(oriLease) + }() + testKit.MustExec("analyze table t") + + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + h := dom.StatsHandle() + + neededColumns := make([]model.StatsLoadItem, 1) + neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true} + timeout := time.Nanosecond * mathutil.MaxInt + h.Clear() + require.NoError(t, dom.StatsHandle().Update(is)) + + // no stats at beginning + stat := h.GetTableStats(tableInfo) + c, ok := stat.Columns[tableInfo.Columns[2].ID] + require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0)) + + stmtCtx1 := stmtctx.NewStmtCtx() + h.SendLoadRequests(stmtCtx1, neededColumns, timeout) + stmtCtx2 := stmtctx.NewStmtCtx() + h.SendLoadRequests(stmtCtx2, neededColumns, timeout) +} diff --git a/pkg/statistics/handle/types/interfaces.go 
b/pkg/statistics/handle/types/interfaces.go index d3a4ee726306a..5c1b41d7fbd65 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -370,6 +370,7 @@ type NeededItemTask struct { ToTimeout time.Time ResultCh chan stmtctx.StatsLoadResult Item model.StatsLoadItem + Retry int } // StatsLoad is used to load stats concurrently From 6d906cdcd099e3554bb893895502945aba2705e7 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 17 Apr 2024 14:10:42 +0800 Subject: [PATCH 02/13] statistics: add upper bound of retry for sync load Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 147b3b6caa169..62dc8b2b7bcc6 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -253,10 +253,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) bool { task.Retry++ - if task.Retry > 3 { - return false - } - return true + return task.Retry <= 3 } func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) { From 551c02c5ac7b1ebc50c2de5375daab534907f644 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 17 Apr 2024 14:58:26 +0800 Subject: [PATCH 03/13] update Signed-off-by: Weizhen Wang --- .../handle/syncload/stats_syncload.go | 4 ++++ .../handle/syncload/stats_syncload_test.go | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 62dc8b2b7bcc6..c518982e7f77e 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ 
b/pkg/statistics/handle/syncload/stats_syncload.go @@ -241,6 +241,10 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty task.ResultCh <- *slr return nil, nil } + if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { + logutil.BgLogger().Error("stats loading error", zap.Error(result.Err)) + return nil, nil + } return task, result.Err case <-time.After(timeout): if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 901f6e10fa48e..7f19ff991b218 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/mathutil" "github.com/stretchr/testify/require" @@ -256,11 +257,14 @@ func TestRetry(t *testing.T) { tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) tableInfo := tbl.Meta() + h := dom.StatsHandle() neededColumns := make([]model.StatsLoadItem, 1) neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true} timeout := time.Nanosecond * mathutil.MaxInt + + // clear statsCache h.Clear() require.NoError(t, dom.StatsHandle().Update(is)) @@ -273,4 +277,22 @@ func TestRetry(t *testing.T) { h.SendLoadRequests(stmtCtx1, neededColumns, timeout) stmtCtx2 := stmtctx.NewStmtCtx() h.SendLoadRequests(stmtCtx2, neededColumns, timeout) + + exitCh := make(chan struct{}) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)")) + var ( + task1 
*types.NeededItemTask + err1 error + ) + + for i := 0; i < 3; i++ { + task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.Error(t, err1) + require.NotNil(t, task1) + } + task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + require.NoError(t, err1) + require.Nil(t, task1) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) + } From 72fa39d043ceeaea9618c627bfca1038b545d74c Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 17 Apr 2024 15:03:13 +0800 Subject: [PATCH 04/13] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/BUILD.bazel | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel index 365fadbfeb6ab..89af409d74ec8 100644 --- a/pkg/statistics/handle/syncload/BUILD.bazel +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -30,12 +30,13 @@ go_test( srcs = ["stats_syncload_test.go"], flaky = True, race = "on", - shard_count = 4, + shard_count = 5, deps = [ "//pkg/config", "//pkg/parser/model", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", + "//pkg/statistics/handle/types", "//pkg/testkit", "//pkg/util/mathutil", "@com_github_pingcap_failpoint//:failpoint", From 2f3c90c1dca3481beceb8f42476c092f4f257edf Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 17 Apr 2024 15:08:10 +0800 Subject: [PATCH 05/13] update Signed-off-by: Weizhen Wang --- .../handle/syncload/stats_syncload.go | 59 ++++++++----------- .../handle/syncload/stats_syncload_test.go | 1 - 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index c518982e7f77e..6eb65711284b6 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -223,31 +223,28 @@ func (s 
*statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } else { task = lastTask } + result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID} resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) { - return s.handleOneItemTask(sctx, task) + err := s.handleOneItemTask(sctx, task) + return nil, err }) timeout := time.Until(task.ToTimeout) select { - case result := <-resultChan: - if result.Err == nil { - slr := result.Val.(*stmtctx.StatsLoadResult) - if slr.Error != nil { - if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { - logutil.BgLogger().Error("stats loading error", zap.Error(slr.Error)) - return nil, nil - } - return task, slr.Error - } - task.ResultCh <- *slr + case sr := <-resultChan: + if sr.Err == nil { + task.ResultCh <- result return nil, nil } - if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { - logutil.BgLogger().Error("stats loading error", zap.Error(result.Err)) + if !updateNeededItemTaskRetryCountAndCheck(task) { + result.Error = sr.Err + task.ResultCh <- result return nil, nil } - return task, result.Err + return task, sr.Err case <-time.After(timeout): - if ok := updateNeededItemTaskRetryCountAndCheck(task); !ok { + if !updateNeededItemTaskRetryCountAndCheck(task) { + result.Error = errors.New("stats loading timeout") + task.ResultCh <- result return nil, nil } task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) @@ -260,7 +257,7 @@ func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) boo return task.Retry <= 3 } -func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) { +func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) { defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { @@ -268,17 +265,16 @@ func (s 
*statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty err = errors.Errorf("stats loading panicked: %v", r) } }() - result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID} - item := result.Item + item := task.Item.TableItemID tbl, ok := s.statsHandle.Get(item.TableID) if !ok { - return result, nil + return nil } wrapper := &statsWrapper{} if item.IsIndex { index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID) if !loadNeeded { - return result, nil + return nil } if index != nil { wrapper.idxInfo = index.Info @@ -288,7 +284,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty } else { col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad) if !loadNeeded { - return result, nil + return nil } if col != nil { wrapper.colInfo = col.Info @@ -304,18 +300,15 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0), IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()), } - if s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) { - return result, nil - } - return nil, nil + s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) + return nil } } t := time.Now() needUpdate := false wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad) if err != nil { - result.Error = err - return result, err + return err } if item.IsIndex { if wrapper.idxInfo != nil { @@ -327,10 +320,10 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty } } metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) - if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) { - return result, nil + if needUpdate { + s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) } - return nil, nil + return nil } // readStatsForOneItem 
reads hist for one column/index, TODO load data via kv-get asynchronously @@ -508,14 +501,14 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. tbl, ok := s.statsHandle.Get(item.TableID) if !ok { - return true + return false } if !item.IsIndex && colHist != nil { c, ok := tbl.Columns[item.ID] // - If the stats is fully loaded, // - If the stats is meta-loaded and we also just need the meta. if ok && (c.IsFullLoad() || !fullLoaded) { - return true + return false } tbl = tbl.Copy() tbl.Columns[item.ID] = colHist diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 7f19ff991b218..85cc46f37f553 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -294,5 +294,4 @@ func TestRetry(t *testing.T) { require.NoError(t, err1) require.Nil(t, task1) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) - } From f2d5deff917eef732a590f7ccfb6ed89fd37d61b Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 18 Apr 2024 15:32:27 +0800 Subject: [PATCH 06/13] update Signed-off-by: Weizhen Wang --- .../handle/syncload/stats_syncload_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 85cc46f37f553..140619f2e2de6 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -289,9 +289,21 @@ func TestRetry(t *testing.T) { task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.Error(t, err1) require.NotNil(t, task1) + select { + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get 
nothing") + t.FailNow() + default: + } } task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.NoError(t, err1) require.Nil(t, task1) + select { + case <-task1.ResultCh: + default: + t.Logf("task1.ResultCh should get nothing") + t.FailNow() + } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) } From 8fe07e229f2ed0f7afdcf4624f903776cad2b28c Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 18 Apr 2024 15:58:02 +0800 Subject: [PATCH 07/13] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 140619f2e2de6..9db978fb5c51e 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -296,9 +296,9 @@ func TestRetry(t *testing.T) { default: } } - task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) + result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.NoError(t, err1) - require.Nil(t, task1) + require.Nil(t, result) select { case <-task1.ResultCh: default: From 534ef1d61ce636209e68e5a9a0a251e65849d8c3 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 18 Apr 2024 16:06:14 +0800 Subject: [PATCH 08/13] update Signed-off-by: Weizhen Wang --- .../handle/syncload/stats_syncload_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 9db978fb5c51e..bbac50a8ee473 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -207,6 +207,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { 
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh) require.Error(t, err1) require.NotNil(t, task1) + select { + case <-stmtCtx1.StatsLoad.ResultCh: + t.Logf("stmtCtx1.ResultCh should not get anything") + t.FailNow() + case <-stmtCtx2.StatsLoad.ResultCh: + t.Logf("stmtCtx2.ResultCh should not get anything") + t.FailNow() + case <-task1.ResultCh: + t.Logf("task1.ResultCh should not get anything") + t.FailNow() + default: + } require.NoError(t, failpoint.Disable(fp.failPath)) task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) From ead4a4852980cef6341a25f13fdbca9bb3fd4e4f Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 18 Apr 2024 16:18:44 +0800 Subject: [PATCH 09/13] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 6eb65711284b6..6e55f155915b3 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -204,6 +204,9 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{ } // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. +// - If the task is handled successfully, return nil, nil. +// - If the task times out, return the task and nil. The caller should retry the timed-out task without sleep. +// - If the task fails, return the task and the error. The caller should retry the failed task with sleep.
func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) { defer func() { // recover for each task, worker keeps working @@ -231,6 +234,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty timeout := time.Until(task.ToTimeout) select { case sr := <-resultChan: + // sr.Val is always nil. if sr.Err == nil { task.ResultCh <- result return nil, nil From 6ad0da035c11ee330f57a2f83b48ac7e39108870 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sun, 21 Apr 2024 19:34:40 +0800 Subject: [PATCH 10/13] update0419 Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 6e55f155915b3..29721faf08f09 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -239,14 +239,14 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty task.ResultCh <- result return nil, nil } - if !updateNeededItemTaskRetryCountAndCheck(task) { + if !isVaildForRetry(task) { result.Error = sr.Err task.ResultCh <- result return nil, nil } return task, sr.Err case <-time.After(timeout): - if !updateNeededItemTaskRetryCountAndCheck(task) { + if !isVaildForRetry(task) { result.Error = errors.New("stats loading timeout") task.ResultCh <- result return nil, nil @@ -256,7 +256,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } } -func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) bool { +func isVaildForRetry(task *statstypes.NeededItemTask) bool { task.Retry++ return task.Retry <= 3 } From 30fa2794dee8c9cd2af00966e0331d0c8eadf31e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sun, 21 Apr 2024 22:53:16 +0800 Subject: 
[PATCH 11/13] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index bbac50a8ee473..829ac5d462d55 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -297,6 +297,7 @@ func TestRetry(t *testing.T) { err1 error ) + // only 3 retries, too manh retries will take too much time for i := 0; i < 3; i++ { task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.Error(t, err1) From fee47d880e0f9aaa68d347d2cc4d4bf28a6818e1 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 22 Apr 2024 18:41:29 +0800 Subject: [PATCH 12/13] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/BUILD.bazel | 1 + pkg/statistics/handle/syncload/stats_syncload.go | 5 ++++- pkg/statistics/handle/syncload/stats_syncload_test.go | 3 ++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel index 89af409d74ec8..ed6e310786a2a 100644 --- a/pkg/statistics/handle/syncload/BUILD.bazel +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -32,6 +32,7 @@ go_test( race = "on", shard_count = 5, deps = [ + ":syncload", "//pkg/config", "//pkg/parser/model", "//pkg/sessionctx", diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 29721faf08f09..723a2d77b3084 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -36,6 +36,9 @@ import ( "go.uber.org/zap" ) +// RetryCount is the max retry count for a sync load task. 
+const RetryCount = 3 + type statsSyncLoad struct { statsHandle statstypes.StatsHandle StatsLoad statstypes.StatsLoad @@ -258,7 +261,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty func isVaildForRetry(task *statstypes.NeededItemTask) bool { task.Retry++ - return task.Retry <= 3 + return task.Retry <= RetryCount } func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) { diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 829ac5d462d55..57f4294467414 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/mathutil" @@ -298,7 +299,7 @@ func TestRetry(t *testing.T) { ) // only 3 retries, too manh retries will take too much time - for i := 0; i < 3; i++ { + for i := 0; i < syncload.RetryCount; i++ { task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.Error(t, err1) require.NotNil(t, task1) From e9d8b6fdc1f4b71852600930c555b96561fdedba Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 22 Apr 2024 18:43:25 +0800 Subject: [PATCH 13/13] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 57f4294467414..e20b615b7e79f 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ 
-298,7 +298,6 @@ func TestRetry(t *testing.T) { err1 error ) - // only 3 retries, too manh retries will take too much time for i := 0; i < syncload.RetryCount; i++ { task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh) require.Error(t, err1)