From 97c774124e4ddc0af2d259d57ab9596bcb062cce Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Tue, 10 Oct 2023 18:46:55 +0800 Subject: [PATCH] planner: encapsulate session stats collection logic into a single dedicated structure (#47512) ref pingcap/tidb#46905 --- session/BUILD.bazel | 1 - session/session.go | 7 +- statistics/handle/BUILD.bazel | 4 +- statistics/handle/handle.go | 18 +- statistics/handle/update.go | 107 +------ statistics/handle/usage/BUILD.bazel | 14 +- statistics/handle/usage/index_usage.go | 21 +- .../handle/usage/session_stats_collect.go | 275 ++++++++++++++++++ .../session_stats_collect_test.go} | 21 +- statistics/handle/usage/stats_usage.go | 66 ----- statistics/handle/usage/table_delta.go | 86 ------ 11 files changed, 320 insertions(+), 300 deletions(-) create mode 100644 statistics/handle/usage/session_stats_collect.go rename statistics/handle/{update_list_test.go => usage/session_stats_collect_test.go} (70%) delete mode 100644 statistics/handle/usage/stats_usage.go delete mode 100644 statistics/handle/usage/table_delta.go diff --git a/session/BUILD.bazel b/session/BUILD.bazel index b2c24ecada999..81832113248e8 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -60,7 +60,6 @@ go_library( "//sessiontxn", "//sessiontxn/isolation", "//sessiontxn/staleread", - "//statistics/handle", "//statistics/handle/usage", "//store/driver/error", "//store/driver/txn", diff --git a/session/session.go b/session/session.go index a954926cee4b2..f699b225feea0 100644 --- a/session/session.go +++ b/session/session.go @@ -76,7 +76,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/statistics/handle/usage" storeerr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/driver/txn" @@ -228,7 +227,7 @@ type session struct { sessionVars *variable.SessionVars sessionManager util.SessionManager - statsCollector *handle.SessionStatsCollector + statsCollector *usage.SessionStatsItem // ddlOwnerManager is used in `select tidb_is_ddl_owner()` statement; ddlOwnerManager owner.Manager // lockedTables use to record the table locks hold by the session. @@ -3073,7 +3072,7 @@ func CreateSessionWithOpt(store kv.Storage, opt *Opt) (Session, error) { // Add stats collector, and it will be freed by background stats worker // which periodically updates stats using the collected data. if do.StatsHandle() != nil && do.StatsUpdating() { - s.statsCollector = do.StatsHandle().NewSessionStatsCollector() + s.statsCollector = do.StatsHandle().NewSessionStatsItem() if GetIndexUsageSyncLease() > 0 { s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector() } @@ -3625,7 +3624,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { func attachStatsCollector(s *session, dom *domain.Domain) *session { if dom.StatsHandle() != nil && dom.StatsUpdating() { if s.statsCollector == nil { - s.statsCollector = dom.StatsHandle().NewSessionStatsCollector() + s.statsCollector = dom.StatsHandle().NewSessionStatsItem() } if s.idxUsageCollector == nil && GetIndexUsageSyncLease() > 0 { s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector() diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel index 6c4553274583b..d8e873392dec1 100644 --- a/statistics/handle/BUILD.bazel +++ b/statistics/handle/BUILD.bazel @@ -61,12 +61,11 @@ go_test( "gc_test.go", "handle_hist_test.go", "main_test.go", - "update_list_test.go", ], embed = [":handle"], flaky = True, race = "on", - shard_count = 28, + shard_count = 27, deps = [ "//config", "//domain", @@ -79,7 +78,6 @@ go_test( "//statistics/handle/globalstats", "//statistics/handle/internal", "//statistics/handle/storage", - "//statistics/handle/usage", "//testkit", "//testkit/testsetup", "//types", diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 2c4f79b2c84fa..bf998a7af92b6 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -79,19 +79,13 @@ type Handle struct { // idxUsageListHead contains all the index usage collectors required by session. idxUsageListHead *usage.SessionIndexUsageCollector - // listHead contains all the stats collector required by session. - listHead *SessionStatsCollector + // SessionStatsList contains all the stats collector required by session. + *usage.SessionStatsList // It can be read by multiple readers at the same time without acquiring lock, but it can be // written only after acquiring the lock. statsCache *cache.StatsCachePointer - // tableDelta contains all the delta map from collectors when we dump them to KV. - tableDelta *usage.TableDelta - - // statsUsage contains all the column stats usage information from collectors when we dump them to KV. - statsUsage *usage.StatsUsage - // StatsLoad is used to load stats concurrently StatsLoad StatsLoad @@ -117,9 +111,7 @@ func (h *Handle) Clear() { for len(h.ddlEventCh) > 0 { <-h.ddlEventCh } - h.listHead.ClearForTest() - h.tableDelta.Reset() - h.statsUsage.Reset() + h.ResetSessionStatsList() } // NewHandle creates a Handle for update stats. @@ -129,7 +121,7 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti handle := &Handle{ gpool: gp.New(math.MaxInt16, time.Minute), ddlEventCh: make(chan *ddlUtil.Event, 1000), - listHead: NewSessionStatsCollector(), + SessionStatsList: usage.NewSessionStatsList(), idxUsageListHead: usage.NewSessionIndexUsageCollector(nil), pool: pool, sysProcTracker: tracker, @@ -147,8 +139,6 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti return nil, err } handle.statsCache = statsCache - handle.tableDelta = usage.NewTableDelta() - handle.statsUsage = usage.NewStatsUsage() handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 1d37812ea10ba..64ffda5809b1d 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -18,7 +18,6 @@ import ( "cmp" "slices" "strings" - "sync" "time" "github.com/pingcap/errors" @@ -35,74 +34,6 @@ import ( "github.com/pingcap/tidb/util/sqlexec" ) -func merge(s *SessionStatsCollector, deltaMap *usage.TableDelta, colMap *usage.StatsUsage) { - deltaMap.Merge(s.mapper.GetDeltaAndReset()) - colMap.Merge(s.statsUsage.GetUsageAndReset()) -} - -// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it. -type SessionStatsCollector struct { - mapper *usage.TableDelta - statsUsage *usage.StatsUsage - next *SessionStatsCollector - sync.Mutex - - // deleted is set to true when a session is closed. Every time we sweep the list, we will remove the useless collector. - deleted bool -} - -// NewSessionStatsCollector initializes a new SessionStatsCollector. -func NewSessionStatsCollector() *SessionStatsCollector { - return &SessionStatsCollector{ - mapper: usage.NewTableDelta(), - statsUsage: usage.NewStatsUsage(), - } -} - -// Delete only sets the deleted flag true, it will be deleted from list when DumpStatsDeltaToKV is called. -func (s *SessionStatsCollector) Delete() { - s.Lock() - defer s.Unlock() - s.deleted = true -} - -// Update will updates the delta and count for one table id. -func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { - s.Lock() - defer s.Unlock() - s.mapper.Update(id, delta, count, colSize) -} - -// ClearForTest clears the mapper for test. -func (s *SessionStatsCollector) ClearForTest() { - s.Lock() - defer s.Unlock() - s.mapper = usage.NewTableDelta() - s.statsUsage = usage.NewStatsUsage() - s.next = nil - s.deleted = false -} - -// UpdateColStatsUsage updates the last time when the column stats are used(needed). -func (s *SessionStatsCollector) UpdateColStatsUsage(colMap map[model.TableItemID]time.Time) { - s.Lock() - defer s.Unlock() - s.statsUsage.Merge(colMap) -} - -// NewSessionStatsCollector allocates a stats collector for a session. -func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { - h.listHead.Lock() - defer h.listHead.Unlock() - newCollector := &SessionStatsCollector{ - mapper: usage.NewTableDelta(), - next: h.listHead.next, - statsUsage: usage.NewStatsUsage(), - } - h.listHead.next = newCollector - return newCollector -} - // NewSessionIndexUsageCollector will add a new SessionIndexUsageCollector into linked list headed by idxUsageListHead. // idxUsageListHead always points to an empty SessionIndexUsageCollector as a sentinel node. So we let idxUsageListHead.next // points to new item. It's helpful to sweepIdxUsageList. @@ -180,39 +111,13 @@ const ( DumpDelta dumpMode = false ) -// sweepList will loop over the list, merge each session's local stats into handle -// and remove closed session's collector. -func (h *Handle) sweepList() { - deltaMap := usage.NewTableDelta() - colMap := usage.NewStatsUsage() - prev := h.listHead - prev.Lock() - for curr := prev.next; curr != nil; curr = curr.next { - curr.Lock() - // Merge the session stats into deltaMap respectively. - merge(curr, deltaMap, colMap) - if curr.deleted { - prev.next = curr.next - // Since the session is already closed, we can safely unlock it here. - curr.Unlock() - } else { - // Unlock the previous lock, so we only holds at most two session's lock at the same time. - prev.Unlock() - prev = curr - } - } - prev.Unlock() - h.tableDelta.Merge(deltaMap.GetDeltaAndReset()) - h.statsUsage.Merge(colMap.GetUsageAndReset()) -} - // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { - h.sweepList() - deltaMap := h.tableDelta.GetDeltaAndReset() + h.SweepSessionStatsList() + deltaMap := h.SessionTableDelta().GetDeltaAndReset() defer func() { - h.tableDelta.Merge(deltaMap) + h.SessionTableDelta().Merge(deltaMap) }() return h.callWithSCtx(func(sctx sessionctx.Context) error { @@ -339,10 +244,10 @@ func (h *Handle) DumpColStatsUsageToKV() error { if !variable.EnableColumnTracking.Load() { return nil } - h.sweepList() - colMap := h.statsUsage.GetUsageAndReset() + h.SweepSessionStatsList() + colMap := h.SessionStatsUsage().GetUsageAndReset() defer func() { - h.statsUsage.Merge(colMap) + h.SessionStatsUsage().Merge(colMap) }() type pair struct { lastUsedAt string diff --git a/statistics/handle/usage/BUILD.bazel b/statistics/handle/usage/BUILD.bazel index 339e6beecc187..4743b33155e52 100644 --- a/statistics/handle/usage/BUILD.bazel +++ b/statistics/handle/usage/BUILD.bazel @@ -1,12 +1,11 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "usage", srcs = [ "index_usage.go", "predicate_column.go", - "stats_usage.go", - "table_delta.go", + "session_stats_collect.go", ], importpath = "github.com/pingcap/tidb/statistics/handle/usage", visibility = ["//visibility:public"], @@ -24,3 +23,12 @@ go_library( "@org_uber_go_zap//:zap", ], ) + +go_test( + name = "usage_test", + timeout = "short", + srcs = ["session_stats_collect_test.go"], + embed = [":usage"], + flaky = True, + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/statistics/handle/usage/index_usage.go b/statistics/handle/usage/index_usage.go index 6d83a430455d1..505b220845e06 100644 --- a/statistics/handle/usage/index_usage.go +++ b/statistics/handle/usage/index_usage.go @@ -39,10 +39,11 @@ type GlobalIndexID struct { IndexID int64 } -type indexUsageMap map[GlobalIndexID]IndexUsageInformation +type indexUsage map[GlobalIndexID]IndexUsageInformation // SessionIndexUsageCollector is a list item that holds the index usage mapper. If you want to write or read mapper, you must lock it. // TODO: use a third-party thread-safe list implementation instead of maintaining the list manually. +// TODO: merge this list into SessionStatsList. /* [session1] [session2] [sessionN] | | | @@ -59,7 +60,7 @@ type indexUsageMap map[GlobalIndexID]IndexUsageInformation [storage] */ type SessionIndexUsageCollector struct { - mapper indexUsageMap + mapper indexUsage next *SessionIndexUsageCollector sync.Mutex @@ -70,19 +71,19 @@ type SessionIndexUsageCollector struct { // If listHead is not nil, add this element to the list. func NewSessionIndexUsageCollector(listHead *SessionIndexUsageCollector) *SessionIndexUsageCollector { if listHead == nil { - return &SessionIndexUsageCollector{mapper: make(indexUsageMap)} + return &SessionIndexUsageCollector{mapper: make(indexUsage)} } listHead.Lock() defer listHead.Unlock() newCollector := &SessionIndexUsageCollector{ - mapper: make(indexUsageMap), + mapper: make(indexUsage), next: listHead.next, } listHead.next = newCollector return newCollector } -func (m indexUsageMap) updateByKey(id GlobalIndexID, value *IndexUsageInformation) { +func (m indexUsage) updateByKey(id GlobalIndexID, value *IndexUsageInformation) { item := m[id] item.QueryCount += value.QueryCount item.RowsSelected += value.RowsSelected @@ -92,12 +93,12 @@ func (m indexUsageMap) updateByKey(id GlobalIndexID, value *IndexUsageInformatio m[id] = item } -func (m indexUsageMap) update(tableID int64, indexID int64, value *IndexUsageInformation) { +func (m indexUsage) update(tableID int64, indexID int64, value *IndexUsageInformation) { id := GlobalIndexID{TableID: tableID, IndexID: indexID} m.updateByKey(id, value) } -func (m indexUsageMap) merge(destMap indexUsageMap) { +func (m indexUsage) merge(destMap indexUsage) { for id := range destMap { item := destMap[id] m.updateByKey(id, &item) @@ -122,10 +123,10 @@ func (s *SessionIndexUsageCollector) Delete() { // sweepIdxUsageList will loop over the list, merge each session's local index usage information into handle // and remove closed session's collector. // For convenience, we keep idxUsageListHead always points to sentinel node. So that we don't need to consider corner case. -func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsageMap { +func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsage { prev := listHead prev.Lock() - mapper := make(indexUsageMap) + mapper := make(indexUsage) for curr := prev.next; curr != nil; curr = curr.next { curr.Lock() mapper.merge(curr.mapper) @@ -134,7 +135,7 @@ func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsageMap { curr.Unlock() } else { prev.Unlock() - curr.mapper = make(indexUsageMap) + curr.mapper = make(indexUsage) prev = curr } } diff --git a/statistics/handle/usage/session_stats_collect.go b/statistics/handle/usage/session_stats_collect.go new file mode 100644 index 0000000000000..e13bb3862ae2f --- /dev/null +++ b/statistics/handle/usage/session_stats_collect.go @@ -0,0 +1,275 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usage + +import ( + "sync" + "time" + + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx/variable" +) + +func merge(s *SessionStatsItem, deltaMap *TableDelta, colMap *StatsUsage) { + deltaMap.Merge(s.mapper.GetDeltaAndReset()) + colMap.Merge(s.statsUsage.GetUsageAndReset()) +} + +// SessionStatsItem is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it. +type SessionStatsItem struct { + mapper *TableDelta + statsUsage *StatsUsage + next *SessionStatsItem + sync.Mutex + + // deleted is set to true when a session is closed. Every time we sweep the list, we will remove the useless collector. + deleted bool +} + +// Delete only sets the deleted flag true, it will be deleted from list when DumpStatsDeltaToKV is called. +func (s *SessionStatsItem) Delete() { + s.Lock() + defer s.Unlock() + s.deleted = true +} + +// Update will updates the delta and count for one table id. +func (s *SessionStatsItem) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { + s.Lock() + defer s.Unlock() + s.mapper.Update(id, delta, count, colSize) +} + +// ClearForTest clears the mapper for test. +func (s *SessionStatsItem) ClearForTest() { + s.Lock() + defer s.Unlock() + s.mapper = NewTableDelta() + s.statsUsage = NewStatsUsage() + s.next = nil + s.deleted = false +} + +// UpdateColStatsUsage updates the last time when the column stats are used(needed). +func (s *SessionStatsItem) UpdateColStatsUsage(colMap map[model.TableItemID]time.Time) { + s.Lock() + defer s.Unlock() + s.statsUsage.Merge(colMap) +} + +// SessionStatsList is a list of SessionStatsItem, which is used to collect stats usage and table delta information from sessions. +// TODO: merge SessionIndexUsage into this list. +/* + [session1] [session2] [sessionN] + | | | + update into update into update into + | | | + v v v +[StatsList.Head] --> [session1.StatsItem] --> [session2.StatsItem] --> ... --> [sessionN.StatsItem] + | | | + +-------------------------+---------------------------------+ + | + collect and dump into storage periodically + | + v + [storage] +*/ +type SessionStatsList struct { + // tableDelta contains all the delta map from collectors when we dump them to KV. + tableDelta *TableDelta + + // statsUsage contains all the column stats usage information from collectors when we dump them to KV. + statsUsage *StatsUsage + + // listHead contains all the stats collector required by session. + listHead *SessionStatsItem +} + +// NewSessionStatsList initializes a new SessionStatsList. +func NewSessionStatsList() *SessionStatsList { + return &SessionStatsList{ + tableDelta: NewTableDelta(), + statsUsage: NewStatsUsage(), + listHead: &SessionStatsItem{ + mapper: NewTableDelta(), + statsUsage: NewStatsUsage(), + }, + } +} + +// NewSessionStatsItem allocates a stats collector for a session. +func (sl *SessionStatsList) NewSessionStatsItem() *SessionStatsItem { + sl.listHead.Lock() + defer sl.listHead.Unlock() + newCollector := &SessionStatsItem{ + mapper: NewTableDelta(), + next: sl.listHead.next, + statsUsage: NewStatsUsage(), + } + sl.listHead.next = newCollector + return newCollector +} + +// SweepSessionStatsList will loop over the list, merge each session's local stats into handle +// and remove closed session's collector. +func (sl *SessionStatsList) SweepSessionStatsList() { + deltaMap := NewTableDelta() + colMap := NewStatsUsage() + prev := sl.listHead + prev.Lock() + for curr := prev.next; curr != nil; curr = curr.next { + curr.Lock() + // Merge the session stats into deltaMap respectively. + merge(curr, deltaMap, colMap) + if curr.deleted { + prev.next = curr.next + // Since the session is already closed, we can safely unlock it here. + curr.Unlock() + } else { + // Unlock the previous lock, so we only holds at most two session's lock at the same time. + prev.Unlock() + prev = curr + } + } + prev.Unlock() + sl.tableDelta.Merge(deltaMap.GetDeltaAndReset()) + sl.statsUsage.Merge(colMap.GetUsageAndReset()) +} + +// SessionTableDelta returns the current *TableDelta. +func (sl *SessionStatsList) SessionTableDelta() *TableDelta { + return sl.tableDelta +} + +// SessionStatsUsage returns the current *StatsUsage. +func (sl *SessionStatsList) SessionStatsUsage() *StatsUsage { + return sl.statsUsage +} + +// ResetSessionStatsList resets this list. +func (sl *SessionStatsList) ResetSessionStatsList() { + sl.listHead.ClearForTest() + sl.tableDelta.Reset() + sl.statsUsage.Reset() +} + +// TableDelta is used to collect tables' change information. +// All methods of it are thread-safe. +type TableDelta struct { + delta map[int64]variable.TableDelta // map[tableID]delta + lock sync.Mutex +} + +// NewTableDelta creates a new TableDelta. +func NewTableDelta() *TableDelta { + return &TableDelta{ + delta: make(map[int64]variable.TableDelta), + } +} + +// Reset resets the TableDelta. +func (m *TableDelta) Reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.delta = make(map[int64]variable.TableDelta) +} + +// GetDeltaAndReset gets the delta and resets the TableDelta. +func (m *TableDelta) GetDeltaAndReset() map[int64]variable.TableDelta { + m.lock.Lock() + defer m.lock.Unlock() + ret := m.delta + m.delta = make(map[int64]variable.TableDelta) + return ret +} + +// Update updates the delta of the table. +func (m *TableDelta) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { + m.lock.Lock() + defer m.lock.Unlock() + UpdateTableDeltaMap(m.delta, id, delta, count, colSize) +} + +// Merge merges the deltaMap into the TableDelta. +func (m *TableDelta) Merge(deltaMap map[int64]variable.TableDelta) { + if len(deltaMap) == 0 { + return + } + m.lock.Lock() + defer m.lock.Unlock() + for id, item := range deltaMap { + UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) + } +} + +// UpdateTableDeltaMap updates the delta of the table. +func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { + item := m[id] + item.Delta += delta + item.Count += count + if item.ColSize == nil { + item.ColSize = make(map[int64]int64) + } + if colSize != nil { + for key, val := range *colSize { + item.ColSize[key] += val + } + } + m[id] = item +} + +// StatsUsage maps (tableID, columnID) to the last time when the column stats are used(needed). +// All methods of it are thread-safe. +type StatsUsage struct { + usage map[model.TableItemID]time.Time + lock sync.RWMutex +} + +// NewStatsUsage creates a new StatsUsage. +func NewStatsUsage() *StatsUsage { + return &StatsUsage{ + usage: make(map[model.TableItemID]time.Time), + } +} + +// Reset resets the StatsUsage. +func (m *StatsUsage) Reset() { + m.lock.Lock() + defer m.lock.Unlock() + m.usage = make(map[model.TableItemID]time.Time) +} + +// GetUsageAndReset gets the usage and resets the StatsUsage. +func (m *StatsUsage) GetUsageAndReset() map[model.TableItemID]time.Time { + m.lock.Lock() + defer m.lock.Unlock() + ret := m.usage + m.usage = make(map[model.TableItemID]time.Time) + return ret +} + +// Merge merges the usageMap into the StatsUsage. +func (m *StatsUsage) Merge(other map[model.TableItemID]time.Time) { + if len(other) == 0 { + return + } + m.lock.Lock() + defer m.lock.Unlock() + for id, t := range other { + if mt, ok := m.usage[id]; !ok || mt.Before(t) { + m.usage[id] = t + } + } +} diff --git a/statistics/handle/update_list_test.go b/statistics/handle/usage/session_stats_collect_test.go similarity index 70% rename from statistics/handle/update_list_test.go rename to statistics/handle/usage/session_stats_collect_test.go index c4f4431e57bf4..321599c889782 100644 --- a/statistics/handle/update_list_test.go +++ b/statistics/handle/usage/session_stats_collect_test.go @@ -1,4 +1,4 @@ -// Copyright 2017 PingCAP, Inc. +// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,35 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -package handle +package usage import ( "testing" - "github.com/pingcap/tidb/statistics/handle/usage" "github.com/stretchr/testify/require" ) func TestInsertAndDelete(t *testing.T) { - h := Handle{ - listHead: &SessionStatsCollector{mapper: usage.NewTableDelta()}, - } - var items []*SessionStatsCollector + sl := NewSessionStatsList() + var items []*SessionStatsItem for i := 0; i < 5; i++ { - items = append(items, h.NewSessionStatsCollector()) + items = append(items, sl.NewSessionStatsItem()) } items[0].Delete() // delete tail items[2].Delete() // delete middle items[4].Delete() // delete head - h.sweepList() + sl.SweepSessionStatsList() - require.Equal(t, items[3], h.listHead.next) + require.Equal(t, items[3], sl.listHead.next) require.Equal(t, items[1], items[3].next) require.Nil(t, items[1].next) // delete rest items[1].Delete() items[3].Delete() - h.sweepList() - require.Nil(t, h.listHead.next) + sl.SweepSessionStatsList() + require.Nil(t, sl.listHead.next) } diff --git a/statistics/handle/usage/stats_usage.go b/statistics/handle/usage/stats_usage.go deleted file mode 100644 index 2f0c14a50687f..0000000000000 --- a/statistics/handle/usage/stats_usage.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package usage - -import ( - "sync" - "time" - - "github.com/pingcap/tidb/parser/model" -) - -// StatsUsage maps (tableID, columnID) to the last time when the column stats are used(needed). -// All methods of it are thread-safe. -type StatsUsage struct { - usage map[model.TableItemID]time.Time - lock sync.RWMutex -} - -// NewStatsUsage creates a new StatsUsage. -func NewStatsUsage() *StatsUsage { - return &StatsUsage{ - usage: make(map[model.TableItemID]time.Time), - } -} - -// Reset resets the StatsUsage. -func (m *StatsUsage) Reset() { - m.lock.Lock() - defer m.lock.Unlock() - m.usage = make(map[model.TableItemID]time.Time) -} - -// GetUsageAndReset gets the usage and resets the StatsUsage. -func (m *StatsUsage) GetUsageAndReset() map[model.TableItemID]time.Time { - m.lock.Lock() - defer m.lock.Unlock() - ret := m.usage - m.usage = make(map[model.TableItemID]time.Time) - return ret -} - -// Merge merges the usageMap into the StatsUsage. -func (m *StatsUsage) Merge(other map[model.TableItemID]time.Time) { - if len(other) == 0 { - return - } - m.lock.Lock() - defer m.lock.Unlock() - for id, t := range other { - if mt, ok := m.usage[id]; !ok || mt.Before(t) { - m.usage[id] = t - } - } -} diff --git a/statistics/handle/usage/table_delta.go b/statistics/handle/usage/table_delta.go deleted file mode 100644 index 4b46c1f859b1b..0000000000000 --- a/statistics/handle/usage/table_delta.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package usage - -import ( - "sync" - - "github.com/pingcap/tidb/sessionctx/variable" -) - -// TableDelta is used to collect tables' change information. -// All methods of it are thread-safe. -type TableDelta struct { - delta map[int64]variable.TableDelta // map[tableID]delta - lock sync.Mutex -} - -// NewTableDelta creates a new TableDelta. -func NewTableDelta() *TableDelta { - return &TableDelta{ - delta: make(map[int64]variable.TableDelta), - } -} - -// Reset resets the TableDelta. -func (m *TableDelta) Reset() { - m.lock.Lock() - defer m.lock.Unlock() - m.delta = make(map[int64]variable.TableDelta) -} - -// GetDeltaAndReset gets the delta and resets the TableDelta. -func (m *TableDelta) GetDeltaAndReset() map[int64]variable.TableDelta { - m.lock.Lock() - defer m.lock.Unlock() - ret := m.delta - m.delta = make(map[int64]variable.TableDelta) - return ret -} - -// Update updates the delta of the table. -func (m *TableDelta) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { - m.lock.Lock() - defer m.lock.Unlock() - UpdateTableDeltaMap(m.delta, id, delta, count, colSize) -} - -// Merge merges the deltaMap into the TableDelta. -func (m *TableDelta) Merge(deltaMap map[int64]variable.TableDelta) { - if len(deltaMap) == 0 { - return - } - m.lock.Lock() - defer m.lock.Unlock() - for id, item := range deltaMap { - UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) - } -} - -// UpdateTableDeltaMap updates the delta of the table. -func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { - item := m[id] - item.Delta += delta - item.Count += count - if item.ColSize == nil { - item.ColSize = make(map[int64]int64) - } - if colSize != nil { - for key, val := range *colSize { - item.ColSize[key] += val - } - } - m[id] = item -}