From d3e3ce41921aca0d0847978713f90bc8fb30f14c Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 18 Oct 2023 19:22:08 +0800 Subject: [PATCH 01/12] fixup --- pkg/statistics/handle/handle_hist.go | 478 ----------------- .../handle/syncload/stats_syncload.go | 496 ++++++++++++++++++ 2 files changed, 496 insertions(+), 478 deletions(-) create mode 100644 pkg/statistics/handle/syncload/stats_syncload.go diff --git a/pkg/statistics/handle/handle_hist.go b/pkg/statistics/handle/handle_hist.go index 924fda56698b0..abe36f6ff344d 100644 --- a/pkg/statistics/handle/handle_hist.go +++ b/pkg/statistics/handle/handle_hist.go @@ -13,481 +13,3 @@ // limitations under the License. package handle - -import ( - "fmt" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/storage" - utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/logutil" - "go.uber.org/zap" -) - -type statsWrapper struct { - col *statistics.Column - idx *statistics.Index -} - -// StatsLoad is used to load stats concurrently -type StatsLoad struct { - NeededItemsCh chan *NeededItemTask - TimeoutItemsCh chan *NeededItemTask - WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult - SubCtxs []sessionctx.Context - sync.Mutex -} - -// NeededItemTask represents one needed column/indices with expire time. 
-type NeededItemTask struct { - ToTimeout time.Time - ResultCh chan stmtctx.StatsLoadResult - TableItemID model.TableItemID -} - -// SendLoadRequests send neededColumns requests -func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error { - remainedItems := h.removeHistLoadedColumns(neededHistItems) - - failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) { - if sc.OptimizeTracer != nil { - count := val.(int) - if len(remainedItems) != count { - panic("remained items count wrong") - } - } - }) - - if len(remainedItems) <= 0 { - return nil - } - sc.StatsLoad.Timeout = timeout - sc.StatsLoad.NeededItems = remainedItems - sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) - tasks := make([]*NeededItemTask, 0) - for _, item := range remainedItems { - task := &NeededItemTask{ - TableItemID: item, - ToTimeout: time.Now().Local().Add(timeout), - ResultCh: sc.StatsLoad.ResultCh, - } - tasks = append(tasks, task) - } - timer := time.NewTimer(timeout) - defer timer.Stop() - for _, task := range tasks { - select { - case h.StatsLoad.NeededItemsCh <- task: - continue - case <-timer.C: - return errors.New("sync load stats channel is full and timeout sending task to channel") - } - } - sc.StatsLoad.LoadStartTime = time.Now() - return nil -} - -// SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout -func (*Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { - if len(sc.StatsLoad.NeededItems) <= 0 { - return nil - } - var errorMsgs []string - defer func() { - if len(errorMsgs) > 0 { - logutil.BgLogger().Warn("SyncWaitStatsLoad meets error", - zap.Strings("errors", errorMsgs)) - } - sc.StatsLoad.NeededItems = nil - }() - resultCheckMap := map[model.TableItemID]struct{}{} - for _, col := range sc.StatsLoad.NeededItems { - resultCheckMap[col] = struct{}{} - } - metrics.SyncLoadCounter.Inc() - timer := time.NewTimer(sc.StatsLoad.Timeout) - defer timer.Stop() - for { - select { - case result, ok := <-sc.StatsLoad.ResultCh: - if !ok { - return errors.New("sync load stats channel closed unexpectedly") - } - if result.HasError() { - errorMsgs = append(errorMsgs, result.ErrorMsg()) - } - delete(resultCheckMap, result.Item) - if len(resultCheckMap) == 0 { - metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) - return nil - } - case <-timer.C: - metrics.SyncLoadTimeoutCounter.Inc() - return errors.New("sync load stats timeout") - } - } -} - -// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache. 
-func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []model.TableItemID { - remainedItems := make([]model.TableItemID, 0, len(neededItems)) - for _, item := range neededItems { - tbl, ok := h.Get(item.TableID) - if !ok { - continue - } - if item.IsIndex { - remainedItems = append(remainedItems, item) - continue - } - colHist, ok := tbl.Columns[item.ID] - if ok && colHist.IsStatsInitialized() && !colHist.IsFullLoad() { - remainedItems = append(remainedItems, item) - } - } - return remainedItems -} - -// AppendNeededItem appends needed columns/indices to ch, it is only used for test -func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case h.StatsLoad.NeededItemsCh <- task: - case <-timer.C: - return errors.New("Channel is full and timeout writing to channel") - } - return nil -} - -var errExit = errors.New("Stop loading since domain is closed") - -// SubLoadWorker loads hist data for each column -func (h *Handle) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { - defer func() { - exitWg.Done() - logutil.BgLogger().Info("SubLoadWorker exited.") - }() - // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry - var lastTask *NeededItemTask - for { - task, err := h.HandleOneTask(sctx, lastTask, exit) - lastTask = task - if err != nil { - switch err { - case errExit: - return - default: - time.Sleep(h.Lease() / 10) - continue - } - } - } -} - -// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. -func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) { - defer func() { - // recover for each task, worker keeps working - if r := recover(); r != nil { - logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack")) - err = errors.Errorf("stats loading panicked: %v", r) - } - }() - if lastTask == nil { - task, err = h.drainColTask(exit) - if err != nil { - if err != errExit { - logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) - } - return task, err - } - } else { - task = lastTask - } - return h.handleOneItemTask(sctx, task) -} - -func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (*NeededItemTask, error) { - result := stmtctx.StatsLoadResult{Item: task.TableItemID} - item := result.Item - tbl, ok := h.Get(item.TableID) - if !ok { - h.writeToResultChan(task.ResultCh, result) - return nil, nil - } - var err error - wrapper := &statsWrapper{} - if item.IsIndex { - index, ok := tbl.Indices[item.ID] - if !ok || index.IsFullLoad() { - h.writeToResultChan(task.ResultCh, result) - return nil, nil - } - wrapper.idx = index - } else { - col, ok := tbl.Columns[item.ID] - if !ok || col.IsFullLoad() { - h.writeToResultChan(task.ResultCh, result) - return nil, nil - } - wrapper.col = col - } - // to avoid duplicated handling in concurrent scenario - working := h.setWorking(result.Item, task.ResultCh) - if !working { - h.writeToResultChan(task.ResultCh, result) - return nil, nil - } - t := time.Now() - needUpdate := false - wrapper, err = h.readStatsForOneItem(sctx, item, wrapper) - if err != nil { - result.Error = err - return task, err - } - if item.IsIndex { - if wrapper.idx != nil { - needUpdate = true - } - } else { - if wrapper.col != 
nil { - needUpdate = true - } - } - metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) - if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) { - h.writeToResultChan(task.ResultCh, result) - } - h.finishWorking(result) - return nil, nil -} - -// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously -func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) { - failpoint.Inject("mockReadStatsForOnePanic", nil) - failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(nil, errors.New("gofail ReadStatsForOne error")) - } - }) - c := w.col - index := w.idx - loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch - var hg *statistics.Histogram - var err error - isIndexFlag := int64(0) - if item.IsIndex { - isIndexFlag = 1 - } - if item.IsIndex { - hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), index.Histogram.NDV, int(isIndexFlag), index.LastUpdateVersion, index.NullCount, index.TotColSize, index.Correlation) - if err != nil { - return nil, errors.Trace(err) - } - } else { - hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &c.Info.FieldType, c.Histogram.NDV, int(isIndexFlag), c.LastUpdateVersion, c.NullCount, c.TotColSize, c.Correlation) - if err != nil { - return nil, errors.Trace(err) - } - } - var cms *statistics.CMSketch - var topN *statistics.TopN - cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID) - if err != nil { - return nil, errors.Trace(err) - } - var fms *statistics.FMSketch - if loadFMSketch { - fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID) - if err != nil { - return nil, errors.Trace(err) - } - } - rows, _, err := utilstats.ExecRows(sctx, "select stats_ver from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", item.TableID, item.ID, int(isIndexFlag)) - if err != nil { - return nil, errors.Trace(err) - } - if len(rows) == 0 { - logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", item.TableID), - zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex)) - return nil, errors.Trace(fmt.Errorf("fail to get stats version for this histogram, table_id:%v, hist_id:%v, is_index:%v", item.TableID, item.ID, item.IsIndex)) - } - statsVer := rows[0].GetInt64(0) - if item.IsIndex { - idxHist := &statistics.Index{ - Histogram: *hg, - CMSketch: cms, - TopN: topN, - FMSketch: fms, - Info: index.Info, - StatsVer: statsVer, - Flag: index.Flag, - PhysicalID: index.PhysicalID, - } - if statsVer != statistics.Version0 { - idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() - } - index.LastAnalyzePos.Copy(&idxHist.LastAnalyzePos) - w.idx = idxHist - } else { - colHist := &statistics.Column{ - PhysicalID: item.TableID, - Histogram: *hg, - Info: c.Info, - CMSketch: cms, - TopN: topN, - FMSketch: fms, - IsHandle: c.IsHandle, - StatsVer: statsVer, - } - if colHist.StatsAvailable() { - colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() - } - w.col = colHist - } - return w, nil -} - -// drainColTask will hang until a column task can return, and either task or error will be returned. 
-func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) { - // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh - for { - select { - case <-exit: - return nil, errExit - case task, ok := <-h.StatsLoad.NeededItemsCh: - if !ok { - return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") - } - // if the task has already timeout, no sql is sync-waiting for it, - // so do not handle it just now, put it to another channel with lower priority - if time.Now().After(task.ToTimeout) { - h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) - continue - } - return task, nil - case task, ok := <-h.StatsLoad.TimeoutItemsCh: - select { - case <-exit: - return nil, errExit - case task0, ok0 := <-h.StatsLoad.NeededItemsCh: - if !ok0 { - return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") - } - // send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh - h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) - return task0, nil - default: - if !ok { - return nil, errors.New("drainColTask: cannot read from TimeoutColumnsCh, maybe the chan is closed") - } - // NeededColumnsCh is empty now, handle task from TimeoutColumnsCh - return task, nil - } - } - } -} - -// writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task. -func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemTask) { - select { - case taskCh <- task: - default: - } -} - -// writeToChanWithTimeout writes a task to a channel and blocks until timeout. -func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededItemTask, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case taskCh <- task: - case <-timer.C: - return errors.New("Channel is full and timeout writing to channel") - } - return nil -} - -// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact. -func (*Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { - defer func() { - if r := recover(); r != nil { - logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack")) - } - }() - select { - case resultCh <- rs: - default: - } -} - -// updateCachedItem updates the column/index hist to global statsCache. -func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index) (updated bool) { - h.StatsLoad.Lock() - defer h.StatsLoad.Unlock() - // Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions - // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. 
- tbl, ok := h.Get(item.TableID) - if !ok { - return true - } - if !item.IsIndex && colHist != nil { - c, ok := tbl.Columns[item.ID] - if !ok || c.IsFullLoad() { - return true - } - tbl = tbl.Copy() - tbl.Columns[c.ID] = colHist - } else if item.IsIndex && idxHist != nil { - index, ok := tbl.Indices[item.ID] - if !ok || index.IsFullLoad() { - return true - } - tbl = tbl.Copy() - tbl.Indices[item.ID] = idxHist - } - h.UpdateStatsCache([]*statistics.Table{tbl}, nil) - return true -} - -func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool { - h.StatsLoad.Lock() - defer h.StatsLoad.Unlock() - chList, ok := h.StatsLoad.WorkingColMap[item] - if ok { - if chList[0] == resultCh { - return true // just return for duplicate setWorking - } - h.StatsLoad.WorkingColMap[item] = append(chList, resultCh) - return false - } - chList = []chan stmtctx.StatsLoadResult{} - chList = append(chList, resultCh) - h.StatsLoad.WorkingColMap[item] = chList - return true -} - -func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) { - h.StatsLoad.Lock() - defer h.StatsLoad.Unlock() - if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok { - list := chList[1:] - for _, ch := range list { - h.writeToResultChan(ch, result) - } - } - delete(h.StatsLoad.WorkingColMap, result.Item) -} diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go new file mode 100644 index 0000000000000..0ca5fcda33ae0 --- /dev/null +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -0,0 +1,496 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncload + +import ( + "fmt" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +type statsSyncLoad struct { + statsHandle utilstats.StatsHandle + StatsLoad StatsLoad +} + +type statsWrapper struct { + col *statistics.Column + idx *statistics.Index +} + +// StatsLoad is used to load stats concurrently +type StatsLoad struct { + NeededItemsCh chan *NeededItemTask + TimeoutItemsCh chan *NeededItemTask + WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult + SubCtxs []sessionctx.Context + sync.Mutex +} + +// NeededItemTask represents one needed column/indices with expire time. 
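+//
+// A minimal construction sketch, mirroring what SendLoadRequests builds below
+// (the IDs and timeout are assumed placeholder values, not from this patch):
+//
+//	task := &NeededItemTask{
+//		TableItemID: model.TableItemID{TableID: 10, ID: 2, IsIndex: false},
+//		ToTimeout:   time.Now().Local().Add(100 * time.Millisecond),
+//		ResultCh:    make(chan stmtctx.StatsLoadResult, 1),
+//	}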
+type NeededItemTask struct { + ToTimeout time.Time + ResultCh chan stmtctx.StatsLoadResult + TableItemID model.TableItemID +} + +// SendLoadRequests send neededColumns requests +func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error { + remainedItems := s.removeHistLoadedColumns(neededHistItems) + + failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) { + if sc.OptimizeTracer != nil { + count := val.(int) + if len(remainedItems) != count { + panic("remained items count wrong") + } + } + }) + + if len(remainedItems) <= 0 { + return nil + } + sc.StatsLoad.Timeout = timeout + sc.StatsLoad.NeededItems = remainedItems + sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) + tasks := make([]*NeededItemTask, 0) + for _, item := range remainedItems { + task := &NeededItemTask{ + TableItemID: item, + ToTimeout: time.Now().Local().Add(timeout), + ResultCh: sc.StatsLoad.ResultCh, + } + tasks = append(tasks, task) + } + timer := time.NewTimer(timeout) + defer timer.Stop() + for _, task := range tasks { + select { + case s.StatsLoad.NeededItemsCh <- task: + continue + case <-timer.C: + return errors.New("sync load stats channel is full and timeout sending task to channel") + } + } + sc.StatsLoad.LoadStartTime = time.Now() + return nil +} + +// SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout +func (s *statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { + if len(sc.StatsLoad.NeededItems) <= 0 { + return nil + } + var errorMsgs []string + defer func() { + if len(errorMsgs) > 0 { + logutil.BgLogger().Warn("SyncWaitStatsLoad meets error", + zap.Strings("errors", errorMsgs)) + } + sc.StatsLoad.NeededItems = nil + }() + resultCheckMap := map[model.TableItemID]struct{}{} + for _, col := range sc.StatsLoad.NeededItems { + resultCheckMap[col] = struct{}{} + } + metrics.SyncLoadCounter.Inc() + timer := time.NewTimer(sc.StatsLoad.Timeout) + defer timer.Stop() + for { + select { + case result, ok := <-sc.StatsLoad.ResultCh: + if !ok { + return errors.New("sync load stats channel closed unexpectedly") + } + if result.HasError() { + errorMsgs = append(errorMsgs, result.ErrorMsg()) + } + delete(resultCheckMap, result.Item) + if len(resultCheckMap) == 0 { + metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) + return nil + } + case <-timer.C: + metrics.SyncLoadTimeoutCounter.Inc() + return errors.New("sync load stats timeout") + } + } +} + +// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache. 
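+// A column item is kept only when its stats are initialized but not yet fully
+// loaded; index items are always kept here and re-checked against IsFullLoad()
+// later in handleOneItemTask.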
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.TableItemID) []model.TableItemID { + remainedItems := make([]model.TableItemID, 0, len(neededItems)) + for _, item := range neededItems { + tbl, ok := s.statsHandle.Get(item.TableID) + if !ok { + continue + } + if item.IsIndex { + remainedItems = append(remainedItems, item) + continue + } + colHist, ok := tbl.Columns[item.ID] + if ok && colHist.IsStatsInitialized() && !colHist.IsFullLoad() { + remainedItems = append(remainedItems, item) + } + } + return remainedItems +} + +// AppendNeededItem appends needed columns/indices to ch, it is only used for test +func (s *statsSyncLoad) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case s.StatsLoad.NeededItemsCh <- task: + case <-timer.C: + return errors.New("Channel is full and timeout writing to channel") + } + return nil +} + +var errExit = errors.New("Stop loading since domain is closed") + +// SubLoadWorker loads hist data for each column +func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { + defer func() { + exitWg.Done() + logutil.BgLogger().Info("SubLoadWorker exited.") + }() + // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry + var lastTask *NeededItemTask + for { + task, err := s.HandleOneTask(sctx, lastTask, exit) + lastTask = task + if err != nil { + switch err { + case errExit: + return + default: + time.Sleep(s.statsHandle.Lease() / 10) + continue + } + } + } +} + +// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. +func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) { + defer func() { + // recover for each task, worker keeps working + if r := recover(); r != nil { + logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack")) + err = errors.Errorf("stats loading panicked: %v", r) + } + }() + if lastTask == nil { + task, err = s.drainColTask(exit) + if err != nil { + if err != errExit { + logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) + } + return task, err + } + } else { + task = lastTask + } + return s.handleOneItemTask(sctx, task) +} + +func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (*NeededItemTask, error) { + result := stmtctx.StatsLoadResult{Item: task.TableItemID} + item := result.Item + tbl, ok := s.statsHandle.Get(item.TableID) + if !ok { + h.writeToResultChan(task.ResultCh, result) + return nil, nil + } + var err error + wrapper := &statsWrapper{} + if item.IsIndex { + index, ok := tbl.Indices[item.ID] + if !ok || index.IsFullLoad() { + h.writeToResultChan(task.ResultCh, result) + return nil, nil + } + wrapper.idx = index + } else { + col, ok := tbl.Columns[item.ID] + if !ok || col.IsFullLoad() { + h.writeToResultChan(task.ResultCh, result) + return nil, nil + } + wrapper.col = col + } + // to avoid duplicated handling in concurrent scenario + working := h.setWorking(result.Item, task.ResultCh) + if !working { + h.writeToResultChan(task.ResultCh, result) + return nil, nil + } + t := time.Now() + needUpdate := false + wrapper, err = h.readStatsForOneItem(sctx, item, wrapper) + if err != nil { + result.Error = err + return task, err + } + if item.IsIndex { + if 
wrapper.idx != nil { + needUpdate = true + } + } else { + if wrapper.col != nil { + needUpdate = true + } + } + metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) + if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) { + h.writeToResultChan(task.ResultCh, result) + } + h.finishWorking(result) + return nil, nil +} + +// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously +func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) { + failpoint.Inject("mockReadStatsForOnePanic", nil) + failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, errors.New("gofail ReadStatsForOne error")) + } + }) + c := w.col + index := w.idx + loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch + var hg *statistics.Histogram + var err error + isIndexFlag := int64(0) + if item.IsIndex { + isIndexFlag = 1 + } + if item.IsIndex { + hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), index.Histogram.NDV, int(isIndexFlag), index.LastUpdateVersion, index.NullCount, index.TotColSize, index.Correlation) + if err != nil { + return nil, errors.Trace(err) + } + } else { + hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &c.Info.FieldType, c.Histogram.NDV, int(isIndexFlag), c.LastUpdateVersion, c.NullCount, c.TotColSize, c.Correlation) + if err != nil { + return nil, errors.Trace(err) + } + } + var cms *statistics.CMSketch + var topN *statistics.TopN + cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID) + if err != nil { + return nil, errors.Trace(err) + } + var fms *statistics.FMSketch + if loadFMSketch { + fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID) + if err != nil { + return nil, errors.Trace(err) + } + } + rows, _, err := utilstats.ExecRows(sctx, "select stats_ver from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", item.TableID, item.ID, int(isIndexFlag)) + if err != nil { + return nil, errors.Trace(err) + } + if len(rows) == 0 { + logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", item.TableID), + zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex)) + return nil, errors.Trace(fmt.Errorf("fail to get stats version for this histogram, table_id:%v, hist_id:%v, is_index:%v", item.TableID, item.ID, item.IsIndex)) + } + statsVer := rows[0].GetInt64(0) + if item.IsIndex { + idxHist := &statistics.Index{ + Histogram: *hg, + CMSketch: cms, + TopN: topN, + FMSketch: fms, + Info: index.Info, + StatsVer: statsVer, + Flag: index.Flag, + PhysicalID: index.PhysicalID, + } + if statsVer != statistics.Version0 { + idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + } + index.LastAnalyzePos.Copy(&idxHist.LastAnalyzePos) + w.idx = idxHist + } else { + colHist := &statistics.Column{ + PhysicalID: item.TableID, + Histogram: *hg, + Info: c.Info, + CMSketch: cms, + TopN: topN, + FMSketch: fms, + IsHandle: c.IsHandle, + StatsVer: statsVer, + } + if colHist.StatsAvailable() { + colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + } + w.col = colHist + } + return w, nil +} + +// drainColTask will hang until a column task can return, and either task or error will be returned. 
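+//
+// Priority sketch: NeededItemsCh is always preferred; a task whose ToTimeout
+// has already passed is demoted to TimeoutItemsCh, and TimeoutItemsCh is only
+// drained once NeededItemsCh is empty.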
+func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*NeededItemTask, error) { + // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh + for { + select { + case <-exit: + return nil, errExit + case task, ok := <-h.StatsLoad.NeededItemsCh: + if !ok { + return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") + } + // if the task has already timeout, no sql is sync-waiting for it, + // so do not handle it just now, put it to another channel with lower priority + if time.Now().After(task.ToTimeout) { + h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) + continue + } + return task, nil + case task, ok := <-h.StatsLoad.TimeoutItemsCh: + select { + case <-exit: + return nil, errExit + case task0, ok0 := <-h.StatsLoad.NeededItemsCh: + if !ok0 { + return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") + } + // send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh + h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) + return task0, nil + default: + if !ok { + return nil, errors.New("drainColTask: cannot read from TimeoutColumnsCh, maybe the chan is closed") + } + // NeededColumnsCh is empty now, handle task from TimeoutColumnsCh + return task, nil + } + } + } +} + +// writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task. +func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemTask) { + select { + case taskCh <- task: + default: + } +} + +// writeToChanWithTimeout writes a task to a channel and blocks until timeout. +func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededItemTask, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case taskCh <- task: + case <-timer.C: + return errors.New("Channel is full and timeout writing to channel") + } + return nil +} + +// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact. +func (s *statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack")) + } + }() + select { + case resultCh <- rs: + default: + } +} + +// updateCachedItem updates the column/index hist to global statsCache. +func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index) (updated bool) { + s.StatsLoad.Lock() + defer s.StatsLoad.Unlock() + // Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions + // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. 
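+	// Note: the table is copied (copy-on-write) before the histogram is swapped
+	// in, so concurrent readers of the stats cache never see a half-updated Table.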
+ tbl, ok := s.statsHandle.Get(item.TableID) + if !ok { + return true + } + if !item.IsIndex && colHist != nil { + c, ok := tbl.Columns[item.ID] + if !ok || c.IsFullLoad() { + return true + } + tbl = tbl.Copy() + tbl.Columns[c.ID] = colHist + } else if item.IsIndex && idxHist != nil { + index, ok := tbl.Indices[item.ID] + if !ok || index.IsFullLoad() { + return true + } + tbl = tbl.Copy() + tbl.Indices[item.ID] = idxHist + } + h.UpdateStatsCache([]*statistics.Table{tbl}, nil) + return true +} + +func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool { + h.StatsLoad.Lock() + defer h.StatsLoad.Unlock() + chList, ok := h.StatsLoad.WorkingColMap[item] + if ok { + if chList[0] == resultCh { + return true // just return for duplicate setWorking + } + h.StatsLoad.WorkingColMap[item] = append(chList, resultCh) + return false + } + chList = []chan stmtctx.StatsLoadResult{} + chList = append(chList, resultCh) + h.StatsLoad.WorkingColMap[item] = chList + return true +} + +func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) { + h.StatsLoad.Lock() + defer h.StatsLoad.Unlock() + if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok { + list := chList[1:] + for _, ch := range list { + h.writeToResultChan(ch, result) + } + } + delete(h.StatsLoad.WorkingColMap, result.Item) +} From 484e4b71fb068fde0d094f4aaae6ca1d9bac805f Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 18 Oct 2023 20:15:24 +0800 Subject: [PATCH 02/12] fixup --- pkg/domain/domain.go | 2 +- pkg/statistics/handle/BUILD.bazel | 4 +- pkg/statistics/handle/handle.go | 15 +-- pkg/statistics/handle/syncload/BUILD.bazel | 25 +++++ .../handle/syncload/stats_syncload.go | 103 +++++++++--------- pkg/statistics/handle/util/BUILD.bazel | 2 + pkg/statistics/handle/util/interfaces.go | 40 +++++++ 7 files changed, 126 insertions(+), 65 deletions(-) create mode 100644 pkg/statistics/handle/syncload/BUILD.bazel diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 46e777b6f941d..a982031b9e871 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2268,7 +2268,7 @@ func quitStatsOwner(do *Domain, mgr owner.Manager) { func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) { statsHandle := do.StatsHandle() for i, ctx := range ctxList { - statsHandle.StatsLoad.SubCtxs[i] = ctx + statsHandle.SubCtxs()[i] = ctx do.wg.Add(1) go statsHandle.SubLoadWorker(ctx, do.exit, do.wg) } diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel index 2650a756dfa6b..ae4d69b751b67 100644 --- a/pkg/statistics/handle/BUILD.bazel +++ b/pkg/statistics/handle/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "//pkg/ddl/util", "//pkg/infoschema", "//pkg/kv", - "//pkg/metrics", "//pkg/parser/ast", "//pkg/parser/model", "//pkg/parser/mysql", @@ -31,14 +30,13 @@ go_library( "//pkg/statistics/handle/history", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/storage", + "//pkg/statistics/handle/syncload", "//pkg/statistics/handle/usage", "//pkg/statistics/handle/util", "//pkg/types", - "//pkg/util", "//pkg/util/chunk", "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_tiancaiamao_gp//:gp", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index 0a48a905b3732..37ce6fe148641 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -15,16 +15,15 @@ package handle import ( 
+ "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "math" "time" - "github.com/pingcap/tidb/pkg/config" ddlUtil "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze" "github.com/pingcap/tidb/pkg/statistics/handle/cache" @@ -73,6 +72,9 @@ type Handle struct { // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. util.StatsAnalyze + // StatsSyncLoad is used to load stats syncly. + util.StatsSyncLoad + // StatsReadWriter is used to read/write stats from/to storage. util.StatsReadWriter @@ -94,9 +96,6 @@ type Handle struct { // StatsCache ... util.StatsCache - // StatsLoad is used to load stats concurrently - StatsLoad StatsLoad - lease atomic2.Duration } @@ -111,7 +110,6 @@ func (h *Handle) Clear() { // NewHandle creates a Handle for update stats. func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool util.SessionPool, tracker sessionctx.SysProcTracker, autoAnalyzeProcIDGetter func() uint64) (*Handle, error) { - cfg := config.GetGlobalConfig() handle := &Handle{ gpool: gp.New(math.MaxInt16, time.Minute), ddlEventCh: make(chan *ddlUtil.Event, 1000), @@ -135,10 +133,7 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti handle.StatsHistory = history.NewStatsHistory(handle) handle.StatsUsage = usage.NewStatsUsageImpl(handle) handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(handle) - handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) - handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) - handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) - handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{} + handle.StatsSyncLoad = syncload.NewStatsSyncLoad(handle) return handle, nil } diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel new file mode 100644 index 0000000000000..c3781148bc1e7 --- /dev/null +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -0,0 +1,25 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "syncload", + srcs = ["stats_syncload.go"], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload", + visibility = ["//visibility:public"], + deps = [ + "//pkg/config", + "//pkg/metrics", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics", + "//pkg/statistics/handle/storage", + "//pkg/statistics/handle/util", + "//pkg/types", + "//pkg/util", + "//pkg/util/logutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@org_uber_go_zap//:zap", + ], +) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 0ca5fcda33ae0..3201c7479692e 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -16,11 +16,11 @@ package syncload import ( "fmt" - "sync" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/metrics" 
"github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/storage" utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -36,7 +37,18 @@ import ( type statsSyncLoad struct { statsHandle utilstats.StatsHandle - StatsLoad StatsLoad + StatsLoad utilstats.StatsLoad +} + +// NewStatsSyncLoad creates a new StatsSyncLoad. +func NewStatsSyncLoad(statsHandle utilstats.StatsHandle) utilstats.StatsSyncLoad { + s := &statsSyncLoad{statsHandle: statsHandle} + cfg := config.GetGlobalConfig() + s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) + s.StatsLoad.NeededItemsCh = make(chan *utilstats.NeededItemTask, cfg.Performance.StatsLoadQueueSize) + s.StatsLoad.TimeoutItemsCh = make(chan *utilstats.NeededItemTask, cfg.Performance.StatsLoadQueueSize) + s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{} + return s } type statsWrapper struct { @@ -44,20 +56,9 @@ type statsWrapper struct { idx *statistics.Index } -// StatsLoad is used to load stats concurrently -type StatsLoad struct { - NeededItemsCh chan *NeededItemTask - TimeoutItemsCh chan *NeededItemTask - WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult - SubCtxs []sessionctx.Context - sync.Mutex -} - -// NeededItemTask represents one needed column/indices with expire time. -type NeededItemTask struct { - ToTimeout time.Time - ResultCh chan stmtctx.StatsLoadResult - TableItemID model.TableItemID +// SubCtxs returns the sub contexts of the statsSyncLoad. 
+func (s *statsSyncLoad) SubCtxs() []sessionctx.Context { + return s.StatsLoad.SubCtxs } // SendLoadRequests send neededColumns requests @@ -79,9 +80,9 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis sc.StatsLoad.Timeout = timeout sc.StatsLoad.NeededItems = remainedItems sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) - tasks := make([]*NeededItemTask, 0) + tasks := make([]*utilstats.NeededItemTask, 0) for _, item := range remainedItems { - task := &NeededItemTask{ + task := &utilstats.NeededItemTask{ TableItemID: item, ToTimeout: time.Now().Local().Add(timeout), ResultCh: sc.StatsLoad.ResultCh, @@ -164,7 +165,7 @@ func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.TableItemID) } // AppendNeededItem appends needed columns/indices to ch, it is only used for test -func (s *statsSyncLoad) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error { +func (s *statsSyncLoad) AppendNeededItem(task *utilstats.NeededItemTask, timeout time.Duration) error { timer := time.NewTimer(timeout) defer timer.Stop() select { @@ -184,7 +185,7 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{ logutil.BgLogger().Info("SubLoadWorker exited.") }() // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry - var lastTask *NeededItemTask + var lastTask *utilstats.NeededItemTask for { task, err := s.HandleOneTask(sctx, lastTask, exit) lastTask = task @@ -201,7 +202,7 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{ } // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere. -func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) { +func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *utilstats.NeededItemTask, exit chan struct{}) (task *utilstats.NeededItemTask, err error) { defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { @@ -223,12 +224,12 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *NeededI return s.handleOneItemTask(sctx, task) } -func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (*NeededItemTask, error) { +func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *utilstats.NeededItemTask) (*utilstats.NeededItemTask, error) { result := stmtctx.StatsLoadResult{Item: task.TableItemID} item := result.Item tbl, ok := s.statsHandle.Get(item.TableID) if !ok { - h.writeToResultChan(task.ResultCh, result) + s.writeToResultChan(task.ResultCh, result) return nil, nil } var err error @@ -236,27 +237,27 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *NeededI if item.IsIndex { index, ok := tbl.Indices[item.ID] if !ok || index.IsFullLoad() { - h.writeToResultChan(task.ResultCh, result) + s.writeToResultChan(task.ResultCh, result) return nil, nil } wrapper.idx = index } else { col, ok := tbl.Columns[item.ID] if !ok || col.IsFullLoad() { - h.writeToResultChan(task.ResultCh, result) + s.writeToResultChan(task.ResultCh, result) return nil, nil } wrapper.col = col } // to avoid duplicated handling in concurrent scenario - working := h.setWorking(result.Item, task.ResultCh) + working := s.setWorking(result.Item, task.ResultCh) if !working { - h.writeToResultChan(task.ResultCh, result) + 
s.writeToResultChan(task.ResultCh, result) return nil, nil } t := time.Now() needUpdate := false - wrapper, err = h.readStatsForOneItem(sctx, item, wrapper) + wrapper, err = s.readStatsForOneItem(sctx, item, wrapper) if err != nil { result.Error = err return task, err @@ -271,15 +272,15 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *NeededI } } metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) - if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) { - h.writeToResultChan(task.ResultCh, result) + if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx) { + s.writeToResultChan(task.ResultCh, result) } - h.finishWorking(result) + s.finishWorking(result) return nil, nil } // readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously -func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) { +func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) { failpoint.Inject("mockReadStatsForOnePanic", nil) failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) { if val.(bool) { @@ -365,33 +366,33 @@ func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItem } // drainColTask will hang until a column task can return, and either task or error will be returned. -func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*NeededItemTask, error) { +func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*utilstats.NeededItemTask, error) { // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh for { select { case <-exit: return nil, errExit - case task, ok := <-h.StatsLoad.NeededItemsCh: + case task, ok := <-s.StatsLoad.NeededItemsCh: if !ok { return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") } // if the task has already timeout, no sql is sync-waiting for it, // so do not handle it just now, put it to another channel with lower priority if time.Now().After(task.ToTimeout) { - h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) + s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) continue } return task, nil - case task, ok := <-h.StatsLoad.TimeoutItemsCh: + case task, ok := <-s.StatsLoad.TimeoutItemsCh: select { case <-exit: return nil, errExit - case task0, ok0 := <-h.StatsLoad.NeededItemsCh: + case task0, ok0 := <-s.StatsLoad.NeededItemsCh: if !ok0 { return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") } // send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh - h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task) + s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) return task0, nil default: if !ok { @@ -405,7 +406,7 @@ func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*NeededItemTask, error } // writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task. -func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemTask) { +func (s *statsSyncLoad) writeToTimeoutChan(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask) { select { case taskCh <- task: default: @@ -413,7 +414,7 @@ func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemT } // writeToChanWithTimeout writes a task to a channel and blocks until timeout. 
-func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededItemTask, timeout time.Duration) error { +func (s *statsSyncLoad) writeToChanWithTimeout(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask, timeout time.Duration) error { timer := time.NewTimer(timeout) defer timer.Stop() select { @@ -462,35 +463,35 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis tbl = tbl.Copy() tbl.Indices[item.ID] = idxHist } - h.UpdateStatsCache([]*statistics.Table{tbl}, nil) + s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil) return true } func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool { - h.StatsLoad.Lock() - defer h.StatsLoad.Unlock() - chList, ok := h.StatsLoad.WorkingColMap[item] + s.StatsLoad.Lock() + defer s.StatsLoad.Unlock() + chList, ok := s.StatsLoad.WorkingColMap[item] if ok { if chList[0] == resultCh { return true // just return for duplicate setWorking } - h.StatsLoad.WorkingColMap[item] = append(chList, resultCh) + s.StatsLoad.WorkingColMap[item] = append(chList, resultCh) return false } chList = []chan stmtctx.StatsLoadResult{} chList = append(chList, resultCh) - h.StatsLoad.WorkingColMap[item] = chList + s.StatsLoad.WorkingColMap[item] = chList return true } func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) { - h.StatsLoad.Lock() - defer h.StatsLoad.Unlock() - if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok { + s.StatsLoad.Lock() + defer s.StatsLoad.Unlock() + if chList, ok := s.StatsLoad.WorkingColMap[result.Item]; ok { list := chList[1:] for _, ch := range list { - h.writeToResultChan(ch, result) + s.writeToResultChan(ch, result) } } - delete(h.StatsLoad.WorkingColMap, result.Item) + delete(s.StatsLoad.WorkingColMap, result.Item) } diff --git a/pkg/statistics/handle/util/BUILD.bazel b/pkg/statistics/handle/util/BUILD.bazel index 2b3e27d834cdb..292161a37817c 100644 --- a/pkg/statistics/handle/util/BUILD.bazel +++ b/pkg/statistics/handle/util/BUILD.bazel @@ -16,10 +16,12 @@ go_library( "//pkg/parser/model", "//pkg/parser/terror", "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/table", "//pkg/types", + "//pkg/util", "//pkg/util/chunk", "//pkg/util/intest", "//pkg/util/sqlexec", diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go index 48d8c56c9fba4..0ca17cbd0b532 100644 --- a/pkg/statistics/handle/util/interfaces.go +++ b/pkg/statistics/handle/util/interfaces.go @@ -16,6 +16,9 @@ package util import ( "context" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/util" + "sync" "time" "github.com/pingcap/tidb/pkg/infoschema" @@ -293,6 +296,43 @@ type StatsReadWriter interface { SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) } +// NeededItemTask represents one needed column/indices with expire time. +type NeededItemTask struct { + ToTimeout time.Time + ResultCh chan stmtctx.StatsLoadResult + TableItemID model.TableItemID +} + +// StatsLoad is used to load stats concurrently +type StatsLoad struct { + NeededItemsCh chan *NeededItemTask + TimeoutItemsCh chan *NeededItemTask + WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult + SubCtxs []sessionctx.Context + sync.Mutex +} + +// StatsSyncLoad implement the sync-load feature. +type StatsSyncLoad interface { + // SendLoadRequests sends load requests to the channel. 
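+	//
+	// A typical caller pairs it with SyncWaitStatsLoad so the statement blocks
+	// until the stats arrive or the timeout fires (sketch; the handle variable
+	// name is assumed):
+	//
+	//	if err := h.SendLoadRequests(sc, neededItems, timeout); err != nil {
+	//		return err
+	//	}
+	//	return h.SyncWaitStatsLoad(sc)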
+ SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error + + // SyncWaitStatsLoad will wait for the load requests to finish. + SyncWaitStatsLoad(sc *stmtctx.StatementContext) error + + // AppendNeededItem appends a needed item to the channel. + AppendNeededItem(task *NeededItemTask, timeout time.Duration) error + + // SubLoadWorker will start a goroutine to handle the load requests. + SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) + + // HandleOneTask will handle one task. + HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) + + // SubCtxs returns the sub contexts. + SubCtxs() []sessionctx.Context +} + // StatsHandle is used to manage TiDB Statistics. type StatsHandle interface { // GPool returns the goroutine pool. From ca3f9e3ca9d61a9198798f0593d1c1f6758aaba5 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 18 Oct 2023 21:37:52 +0800 Subject: [PATCH 03/12] fixup --- pkg/domain/domain.go | 2 +- pkg/statistics/handle/syncload/stats_syncload.go | 7 ++++--- pkg/statistics/handle/util/interfaces.go | 9 +++++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index a982031b9e871..72e174fb0a77b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2268,7 +2268,7 @@ func quitStatsOwner(do *Domain, mgr owner.Manager) { func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) { statsHandle := do.StatsHandle() for i, ctx := range ctxList { - statsHandle.SubCtxs()[i] = ctx + statsHandle.SetSubCtxs(i, ctx) do.wg.Add(1) go statsHandle.SubLoadWorker(ctx, do.exit, do.wg) } diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 3201c7479692e..698d3f815d141 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -56,9 +56,10 @@ type statsWrapper struct { idx *statistics.Index } -// SubCtxs returns the sub contexts of the statsSyncLoad. -func (s *statsSyncLoad) SubCtxs() []sessionctx.Context { - return s.StatsLoad.SubCtxs +// SetSubCtxs sets the sessionctx which is used to run queries background. +// TODO: use SessionPool instead. +func (s *statsSyncLoad) SetSubCtxs(idx int, sctx sessionctx.Context) { + s.StatsLoad.SubCtxs[idx] = sctx } // SendLoadRequests send neededColumns requests diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go index 0ca17cbd0b532..b16e6ba5722b9 100644 --- a/pkg/statistics/handle/util/interfaces.go +++ b/pkg/statistics/handle/util/interfaces.go @@ -16,16 +16,16 @@ package util import ( "context" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" - "github.com/pingcap/tidb/pkg/util" "sync" "time" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tiancaiamao/gp" ) @@ -329,8 +329,9 @@ type StatsSyncLoad interface { // HandleOneTask will handle one task. HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) - // SubCtxs returns the sub contexts. 
- SubCtxs() []sessionctx.Context + // SetSubCtxs sets the sessionctx which is used to run queries background. + // TODO: use SessionPool instead. + SetSubCtxs(idx int, sctx sessionctx.Context) } // StatsHandle is used to manage TiDB Statistics. From 355037c39b7ad3e0047d38678dd88c6d7480548c Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 18 Oct 2023 21:54:35 +0800 Subject: [PATCH 04/12] fixup --- pkg/statistics/handle/syncload/stats_syncload.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 698d3f815d141..b071cdac5523b 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -105,7 +105,7 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis } // SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout -func (s *statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { +func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error { if len(sc.StatsLoad.NeededItems) <= 0 { return nil } @@ -407,7 +407,7 @@ func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*utilstats.NeededItemT } // writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task. -func (s *statsSyncLoad) writeToTimeoutChan(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask) { +func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask) { select { case taskCh <- task: default: @@ -415,7 +415,7 @@ func (s *statsSyncLoad) writeToTimeoutChan(taskCh chan *utilstats.NeededItemTask } // writeToChanWithTimeout writes a task to a channel and blocks until timeout. -func (s *statsSyncLoad) writeToChanWithTimeout(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask, timeout time.Duration) error { +func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask, timeout time.Duration) error { timer := time.NewTimer(timeout) defer timer.Stop() select { @@ -427,7 +427,7 @@ func (s *statsSyncLoad) writeToChanWithTimeout(taskCh chan *utilstats.NeededItem } // writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact. 
-func (s *statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { +func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack")) From a575d65a317c68519eeb782b51f9edfcc0bcfb89 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 18 Oct 2023 22:21:17 +0800 Subject: [PATCH 05/12] fixup --- pkg/statistics/handle/handle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index 37ce6fe148641..ba315e9cb776f 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -15,7 +15,6 @@ package handle import ( - "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "math" "time" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/history" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" "github.com/pingcap/tidb/pkg/statistics/handle/storage" + "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/util/logutil" From 89f47929a5788982f0feba37a771ded9f9c04356 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Wed, 18 Oct 2023 23:38:52 +0800 Subject: [PATCH 06/12] fixup --- pkg/statistics/handle/handle_hist.go | 15 --------------- .../stats_syncload_test.go} | 10 +--------- 2 files changed, 1 insertion(+), 24 deletions(-) delete mode 100644 pkg/statistics/handle/handle_hist.go rename pkg/statistics/handle/{handle_hist_test.go => syncload/stats_syncload_test.go} (96%) diff --git a/pkg/statistics/handle/handle_hist.go b/pkg/statistics/handle/handle_hist.go deleted file mode 100644 index abe36f6ff344d..0000000000000 --- a/pkg/statistics/handle/handle_hist.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package handle diff --git a/pkg/statistics/handle/handle_hist_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go similarity index 96% rename from pkg/statistics/handle/handle_hist_test.go rename to pkg/statistics/handle/syncload/stats_syncload_test.go index 28d95020510ce..e66007e43a4b5 100644 --- a/pkg/statistics/handle/handle_hist_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
 
-package handle_test
+package syncload_test
 
 import (
 	"testing"
@@ -206,18 +206,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 	task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 	require.Error(t, err1)
 	require.NotNil(t, task1)
-	list, ok := h.StatsLoad.WorkingColMap[neededColumns[0]]
-	require.True(t, ok)
-	require.Len(t, list, 1)
-	require.Equal(t, stmtCtx1.StatsLoad.ResultCh, list[0])
 
 	task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 	require.Nil(t, err2)
 	require.Nil(t, task2)
-	list, ok = h.StatsLoad.WorkingColMap[neededColumns[0]]
-	require.True(t, ok)
-	require.Len(t, list, 2)
-	require.Equal(t, stmtCtx2.StatsLoad.ResultCh, list[1])
 
 	require.NoError(t, failpoint.Disable(fp.failPath))
 	task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)

From bb6f8fc0da5ce5551d8c2eee865107ba804abe6d Mon Sep 17 00:00:00 2001
From: qw4990
Date: Wed, 18 Oct 2023 23:41:14 +0800
Subject: [PATCH 07/12] fixup

---
 pkg/statistics/handle/BUILD.bazel          | 11 +----------
 pkg/statistics/handle/syncload/BUILD.bazel | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel
index 853c0987732c5..229888fe212f1 100644
--- a/pkg/statistics/handle/BUILD.bazel
+++ b/pkg/statistics/handle/BUILD.bazel
@@ -6,7 +6,6 @@ go_library(
         "bootstrap.go",
         "ddl.go",
         "handle.go",
-        "handle_hist.go",
     ],
     importpath = "github.com/pingcap/tidb/pkg/statistics/handle",
     visibility = ["//visibility:public"],
@@ -15,8 +14,6 @@ go_library(
         "//pkg/ddl/util",
         "//pkg/infoschema",
         "//pkg/kv",
-        "//pkg/parser/ast",
-        "//pkg/metrics",
         "//pkg/parser/model",
         "//pkg/parser/mysql",
         "//pkg/parser/terror",
@@ -48,25 +45,19 @@ go_test(
     timeout = "short",
     srcs = [
         "ddl_test.go",
-        "handle_hist_test.go",
         "main_test.go",
     ],
     embed = [":handle"],
     flaky = True,
     race = "on",
-    shard_count = 8,
+    shard_count = 4,
     deps = [
-        "//pkg/config",
         "//pkg/parser/model",
        "//pkg/planner/cardinality",
-        "//pkg/sessionctx",
-        "//pkg/sessionctx/stmtctx",
         "//pkg/testkit",
         "//pkg/testkit/testsetup",
         "//pkg/types",
-        "//pkg/util/mathutil",
         "//pkg/util/mock",
-        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_goleak//:goleak",
     ],
diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
index c3781148bc1e7..b25e45d0e241c 100644
--- a/pkg/statistics/handle/syncload/BUILD.bazel
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "syncload",
@@ -23,3 +23,21 @@ go_library(
         "@org_uber_go_zap//:zap",
     ],
 )
+
+go_test(
+    name = "syncload_test",
+    timeout = "short",
+    srcs = ["stats_syncload_test.go"],
+    flaky = True,
+    shard_count = 4,
+    deps = [
+        "//pkg/config",
+        "//pkg/parser/model",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/testkit",
+        "//pkg/util/mathutil",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_stretchr_testify//require",
+    ],
+)

From b47eb7376912e973867b3f50bb5f81d4534ac20f Mon Sep 17 00:00:00 2001
From: qw4990
Date: Thu, 19 Oct 2023 10:51:53 +0800
Subject: [PATCH 08/12] fixup

---
 pkg/planner/core/casetest/planstats/plan_stats_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/planner/core/casetest/planstats/plan_stats_test.go b/pkg/planner/core/casetest/planstats/plan_stats_test.go
index f29b42d2bfbf4..7be6b9eeb0c6a 100644
--- a/pkg/planner/core/casetest/planstats/plan_stats_test.go
+++ b/pkg/planner/core/casetest/planstats/plan_stats_test.go
@@ -32,7 +32,7 @@ import (
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/pkg/statistics"
-	"github.com/pingcap/tidb/pkg/statistics/handle"
+	utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/testdata"
@@ -268,7 +268,7 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
 	neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
 	resultCh := make(chan stmtctx.StatsLoadResult, 1)
 	timeout := time.Duration(1<<63 - 1)
-	task := &handle.NeededItemTask{
+	task := &utilstats.NeededItemTask{
 		TableItemID: neededColumn,
 		ResultCh:    resultCh,
 		ToTimeout:   time.Now().Local().Add(timeout),

From 1b2ddf04f093a4c3f2a46fe6b338a7e1d7bd98f2 Mon Sep 17 00:00:00 2001
From: qw4990
Date: Thu, 19 Oct 2023 10:52:48 +0800
Subject: [PATCH 09/12] fixup

---
 pkg/planner/core/casetest/planstats/BUILD.bazel | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/planner/core/casetest/planstats/BUILD.bazel b/pkg/planner/core/casetest/planstats/BUILD.bazel
index 094d33c59f3ac..c789c86065f71 100644
--- a/pkg/planner/core/casetest/planstats/BUILD.bazel
+++ b/pkg/planner/core/casetest/planstats/BUILD.bazel
@@ -21,7 +21,7 @@ go_test(
         "//pkg/sessionctx",
         "//pkg/sessionctx/stmtctx",
         "//pkg/statistics",
-        "//pkg/statistics/handle",
+        "//pkg/statistics/handle/util",
         "//pkg/table",
         "//pkg/testkit",
         "//pkg/testkit/testdata",

From 156423a2cc7b33700d08282edd88e4a9e0617be1 Mon Sep 17 00:00:00 2001
From: qw4990
Date: Thu, 19 Oct 2023 11:45:18 +0800
Subject: [PATCH 10/12] fixup

---
 .../handle/syncload/stats_syncload_test.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go
index e66007e43a4b5..356dda7e6dd2d 100644
--- a/pkg/statistics/handle/syncload/stats_syncload_test.go
+++ b/pkg/statistics/handle/syncload/stats_syncload_test.go
@@ -40,14 +40,14 @@ func TestSyncLoadSkipUnAnalyzedItems(t *testing.T) {
 	h.SetLease(1)
 
 	// no item would be loaded
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems", `return(0)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems", `return(0)`))
 	tk.MustQuery("trace plan select * from t where a > 10")
-	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems")
+	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems")
 	tk.MustExec("analyze table t1")
 	// one column would be loaded
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems", `return(1)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems", `return(1)`))
 	tk.MustQuery("trace plan select * from t1 where a > 10")
-	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems")
+	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems")
 }
 
 func TestConcurrentLoadHist(t *testing.T) {
@@ -175,11 +175,11 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 		inTerms  string
 	}{
 		{
-			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOnePanic",
+			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOnePanic",
 			inTerms:  "panic",
 		},
 		{
-			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOneFail",
+			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail",
 			inTerms:  "return(true)",
 		},
 	}

From a35ff678a457c4e63f26928d5fe798d2e781a15f Mon Sep 17 00:00:00 2001
From: qw4990
Date: Thu, 19 Oct 2023 14:25:46 +0800
Subject: [PATCH 11/12] fixup

---
 pkg/statistics/handle/syncload/BUILD.bazel | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
index b25e45d0e241c..17d9da6fd92ed 100644
--- a/pkg/statistics/handle/syncload/BUILD.bazel
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -29,6 +29,7 @@ go_test(
     timeout = "short",
     srcs = ["stats_syncload_test.go"],
     flaky = True,
+    race = "on"
     shard_count = 4,
     deps = [
         "//pkg/config",

From ae8343d149bfbf5f3a1f2e3eba1f985b2f173308 Mon Sep 17 00:00:00 2001
From: qw4990
Date: Thu, 19 Oct 2023 14:28:27 +0800
Subject: [PATCH 12/12] fixup

---
 pkg/statistics/handle/syncload/BUILD.bazel | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
index 17d9da6fd92ed..910011c8f6f00 100644
--- a/pkg/statistics/handle/syncload/BUILD.bazel
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -29,7 +29,7 @@ go_test(
     timeout = "short",
     srcs = ["stats_syncload_test.go"],
     flaky = True,
-    race = "on"
+    race = "on",
     shard_count = 4,
     deps = [
         "//pkg/config",