Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <[email protected]>
  • Loading branch information
hawkingrei committed Apr 18, 2024
1 parent 4920264 commit 3b57e4c
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -224,30 +223,30 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
} else {
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
result, err := s.handleOneItemTask(sctx, task)
intest.Assert(result != nil)
return result, err
err := s.handleOneItemTask(sctx, task)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
slr := result.Val.(*stmtctx.StatsLoadResult)
if result.Err == nil {
case sr := <-resultChan:
if sr.Err == nil {
// result.Val is nil means the task doesn't update. so we don't need to send the result to the channel.
task.ResultCh <- *slr
task.ResultCh <- result
return nil, nil
}
if !updateNeededItemTaskRetryCountAndCheck(task) {
// result.Val is nil means the task doesn't update. so we don't need to send the result to the channel.
slr.Error = result.Err
task.ResultCh <- *slr
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, result.Err
return task, sr.Err
case <-time.After(timeout):
if !updateNeededItemTaskRetryCountAndCheck(task) {
task.ResultCh <- stmtctx.StatsLoadResult{Item: task.Item.TableItemID, Error: errors.New("stats loading timeout")}
result.Error = errors.New("stats loading timeout")
task.ResultCh <- result
return nil, nil
}
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
Expand All @@ -260,25 +259,24 @@ func updateNeededItemTaskRetryCountAndCheck(task *statstypes.NeededItemTask) boo
return task.Retry <= 3
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
item := result.Item
item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
if !ok {
return result, nil
return nil
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return result, nil
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
Expand All @@ -288,7 +286,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return result, nil
return nil
}
if col != nil {
wrapper.colInfo = col.Info
Expand All @@ -305,15 +303,14 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
}
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
return result, nil
return nil
}
}
t := time.Now()
needUpdate := false
wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
if err != nil {
result.Error = err
return result, err
return err
}
if item.IsIndex {
if wrapper.idxInfo != nil {
Expand All @@ -328,7 +325,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
if needUpdate {
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
}
return result, nil
return nil
}

// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
Expand Down

0 comments on commit 3b57e4c

Please sign in to comment.