From 322104f15f3087ab5eeace496f96000af44e8186 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 14 Sep 2023 21:40:41 +0800 Subject: [PATCH] planner: refactor to make `tableStatsDelta` thread-safe (#46977) ref pingcap/tidb#46905 --- statistics/handle/handle.go | 21 +++---- statistics/handle/update.go | 82 ++++++++++++++++++--------- statistics/handle/update_list_test.go | 2 +- 3 files changed, 64 insertions(+), 41 deletions(-) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index e0cf52d2e0483..2c8df37316797 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -92,14 +92,11 @@ type Handle struct { // written only after acquiring the lock. statsCache *cache.StatsCachePointer - // globalMap contains all the delta map from collectors when we dump them to KV. - globalMap struct { - data tableDeltaMap - sync.Mutex - } + // tableDelta contains all the delta map from collectors when we dump them to KV. + tableDelta *tableDelta - // colMap contains all the column stats usage information from collectors when we dump them to KV. - colMap *statsUsage + // statsUsage contains all the column stats usage information from collectors when we dump them to KV. + statsUsage *statsUsage // StatsLoad is used to load stats concurrently StatsLoad StatsLoad @@ -182,10 +179,8 @@ func (h *Handle) Clear() { h.mu.ctx.GetSessionVars().EnableChunkRPC = false h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0) h.listHead.ClearForTest() - h.globalMap.Lock() - h.globalMap.data = make(tableDeltaMap) - h.globalMap.Unlock() - h.colMap.reset() + h.tableDelta.reset() + h.statsUsage.reset() h.mu.Unlock() } @@ -215,8 +210,8 @@ func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool s return nil, err } handle.statsCache = statsCache - handle.globalMap.data = make(tableDeltaMap) - handle.colMap = newStatsUsage() + handle.tableDelta = newTableDelta() + handle.statsUsage = newStatsUsage() handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) diff --git a/statistics/handle/update.go b/statistics/handle/update.go index eb0e1456b5364..fb6145ef847b5 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -42,9 +42,40 @@ import ( "go.uber.org/zap" ) -type tableDeltaMap map[int64]variable.TableDelta +// tableDelta is used to collect tables' change information. +// All methods of it are thread-safe. +type tableDelta struct { + delta map[int64]variable.TableDelta // map[tableID]delta + lock sync.Mutex +} + +func newTableDelta() *tableDelta { + return &tableDelta{ + delta: make(map[int64]variable.TableDelta), + } +} -func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[int64]int64) { +func (m *tableDelta) reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.delta = make(map[int64]variable.TableDelta) +} + +func (m *tableDelta) getDeltaAndReset() map[int64]variable.TableDelta { + m.lock.Lock() + defer m.lock.Unlock() + ret := m.delta + m.delta = make(map[int64]variable.TableDelta) + return ret +} + +func (m *tableDelta) update(id int64, delta int64, count int64, colSize *map[int64]int64) { + m.lock.Lock() + defer m.lock.Unlock() + updateTableDeltaMap(m.delta, id, delta, count, colSize) +} + +func updateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { item := m[id] item.Delta += delta item.Count += count @@ -59,13 +90,19 @@ func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[i m[id] = item } -func (m tableDeltaMap) merge(deltaMap tableDeltaMap) { +func (m *tableDelta) merge(deltaMap map[int64]variable.TableDelta) { + if len(deltaMap) == 0 { + return + } + m.lock.Lock() + defer m.lock.Unlock() for id, item := range deltaMap { - m.update(id, item.Delta, item.Count, &item.ColSize) + updateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) } } // statsUsage maps (tableID, columnID) to the last time when the column stats are used(needed). +// All methods of it are thread-safe. type statsUsage struct { usage map[model.TableItemID]time.Time lock sync.RWMutex @@ -104,15 +141,14 @@ func (m *statsUsage) merge(other map[model.TableItemID]time.Time) { } } -func merge(s *SessionStatsCollector, deltaMap tableDeltaMap, colMap *statsUsage) { - deltaMap.merge(s.mapper) - s.mapper = make(tableDeltaMap) +func merge(s *SessionStatsCollector, deltaMap *tableDelta, colMap *statsUsage) { + deltaMap.merge(s.mapper.getDeltaAndReset()) colMap.merge(s.statsUsage.getUsageAndReset()) } // SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it. type SessionStatsCollector struct { - mapper tableDeltaMap + mapper *tableDelta statsUsage *statsUsage next *SessionStatsCollector sync.Mutex @@ -124,7 +160,7 @@ type SessionStatsCollector struct { // NewSessionStatsCollector initializes a new SessionStatsCollector. func NewSessionStatsCollector() *SessionStatsCollector { return &SessionStatsCollector{ - mapper: make(tableDeltaMap), + mapper: newTableDelta(), statsUsage: newStatsUsage(), } } @@ -147,7 +183,7 @@ func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSi func (s *SessionStatsCollector) ClearForTest() { s.Lock() defer s.Unlock() - s.mapper = make(tableDeltaMap) + s.mapper = newTableDelta() s.statsUsage = newStatsUsage() s.next = nil s.deleted = false @@ -165,7 +201,7 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { h.listHead.Lock() defer h.listHead.Unlock() newCollector := &SessionStatsCollector{ - mapper: make(tableDeltaMap), + mapper: newTableDelta(), next: h.listHead.next, statsUsage: newStatsUsage(), } @@ -376,7 +412,7 @@ const ( // sweepList will loop over the list, merge each session's local stats into handle // and remove closed session's collector. func (h *Handle) sweepList() { - deltaMap := make(tableDeltaMap) + deltaMap := newTableDelta() colMap := newStatsUsage() prev := h.listHead prev.Lock() @@ -395,25 +431,17 @@ func (h *Handle) sweepList() { } } prev.Unlock() - h.globalMap.Lock() - h.globalMap.data.merge(deltaMap) - h.globalMap.Unlock() - h.colMap.merge(colMap.getUsageAndReset()) + h.tableDelta.merge(deltaMap.getDeltaAndReset()) + h.statsUsage.merge(colMap.getUsageAndReset()) } // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { h.sweepList() - h.globalMap.Lock() - deltaMap := h.globalMap.data - h.globalMap.data = make(tableDeltaMap) - h.globalMap.Unlock() + deltaMap := h.tableDelta.getDeltaAndReset() defer func() { - h.globalMap.Lock() - deltaMap.merge(h.globalMap.data) - h.globalMap.data = deltaMap - h.globalMap.Unlock() + h.tableDelta.merge(deltaMap) }() // TODO: pass in do.InfoSchema() to DumpStatsDeltaToKV. is := func() infoschema.InfoSchema { @@ -431,7 +459,7 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { return errors.Trace(err) } if updated { - deltaMap.update(id, -item.Delta, -item.Count, nil) + updateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) } if err = h.dumpTableStatColSizeToKV(id, item); err != nil { delete(deltaMap, id) @@ -551,9 +579,9 @@ func (h *Handle) DumpColStatsUsageToKV() error { return nil } h.sweepList() - colMap := h.colMap.getUsageAndReset() + colMap := h.statsUsage.getUsageAndReset() defer func() { - h.colMap.merge(colMap) + h.statsUsage.merge(colMap) }() type pair struct { lastUsedAt string diff --git a/statistics/handle/update_list_test.go b/statistics/handle/update_list_test.go index 1ee9ce256d672..dea9519672ad8 100644 --- a/statistics/handle/update_list_test.go +++ b/statistics/handle/update_list_test.go @@ -22,7 +22,7 @@ import ( func TestInsertAndDelete(t *testing.T) { h := Handle{ - listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)}, + listHead: &SessionStatsCollector{mapper: newTableDelta()}, } var items []*SessionStatsCollector for i := 0; i < 5; i++ {