Skip to content

Commit

Permalink
planner: encapsulate session stats collection logic into a single ded…
Browse files Browse the repository at this point in the history
…icated structure (#47512)

ref #46905
  • Loading branch information
qw4990 authored Oct 10, 2023
1 parent 6102427 commit 97c7741
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 300 deletions.
1 change: 0 additions & 1 deletion session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ go_library(
"//sessiontxn",
"//sessiontxn/isolation",
"//sessiontxn/staleread",
"//statistics/handle",
"//statistics/handle/usage",
"//store/driver/error",
"//store/driver/txn",
Expand Down
7 changes: 3 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/statistics/handle/usage"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/driver/txn"
Expand Down Expand Up @@ -228,7 +227,7 @@ type session struct {
sessionVars *variable.SessionVars
sessionManager util.SessionManager

statsCollector *handle.SessionStatsCollector
statsCollector *usage.SessionStatsItem
// ddlOwnerManager is used in `select tidb_is_ddl_owner()` statement;
ddlOwnerManager owner.Manager
// lockedTables use to record the table locks hold by the session.
Expand Down Expand Up @@ -3073,7 +3072,7 @@ func CreateSessionWithOpt(store kv.Storage, opt *Opt) (Session, error) {
// Add stats collector, and it will be freed by background stats worker
// which periodically updates stats using the collected data.
if do.StatsHandle() != nil && do.StatsUpdating() {
s.statsCollector = do.StatsHandle().NewSessionStatsCollector()
s.statsCollector = do.StatsHandle().NewSessionStatsItem()
if GetIndexUsageSyncLease() > 0 {
s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector()
}
Expand Down Expand Up @@ -3625,7 +3624,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
func attachStatsCollector(s *session, dom *domain.Domain) *session {
if dom.StatsHandle() != nil && dom.StatsUpdating() {
if s.statsCollector == nil {
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
s.statsCollector = dom.StatsHandle().NewSessionStatsItem()
}
if s.idxUsageCollector == nil && GetIndexUsageSyncLease() > 0 {
s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
Expand Down
4 changes: 1 addition & 3 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,11 @@ go_test(
"gc_test.go",
"handle_hist_test.go",
"main_test.go",
"update_list_test.go",
],
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 28,
shard_count = 27,
deps = [
"//config",
"//domain",
Expand All @@ -79,7 +78,6 @@ go_test(
"//statistics/handle/globalstats",
"//statistics/handle/internal",
"//statistics/handle/storage",
"//statistics/handle/usage",
"//testkit",
"//testkit/testsetup",
"//types",
Expand Down
18 changes: 4 additions & 14 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,13 @@ type Handle struct {
// idxUsageListHead contains all the index usage collectors required by session.
idxUsageListHead *usage.SessionIndexUsageCollector

// listHead contains all the stats collector required by session.
listHead *SessionStatsCollector
// SessionStatsList contains all the stats collector required by session.
*usage.SessionStatsList

// It can be read by multiple readers at the same time without acquiring lock, but it can be
// written only after acquiring the lock.
statsCache *cache.StatsCachePointer

// tableDelta contains all the delta map from collectors when we dump them to KV.
tableDelta *usage.TableDelta

// statsUsage contains all the column stats usage information from collectors when we dump them to KV.
statsUsage *usage.StatsUsage

// StatsLoad is used to load stats concurrently
StatsLoad StatsLoad

Expand All @@ -117,9 +111,7 @@ func (h *Handle) Clear() {
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
h.listHead.ClearForTest()
h.tableDelta.Reset()
h.statsUsage.Reset()
h.ResetSessionStatsList()
}

// NewHandle creates a Handle for update stats.
Expand All @@ -129,7 +121,7 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
handle := &Handle{
gpool: gp.New(math.MaxInt16, time.Minute),
ddlEventCh: make(chan *ddlUtil.Event, 1000),
listHead: NewSessionStatsCollector(),
SessionStatsList: usage.NewSessionStatsList(),
idxUsageListHead: usage.NewSessionIndexUsageCollector(nil),
pool: pool,
sysProcTracker: tracker,
Expand All @@ -147,8 +139,6 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
return nil, err
}
handle.statsCache = statsCache
handle.tableDelta = usage.NewTableDelta()
handle.statsUsage = usage.NewStatsUsage()
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
Expand Down
107 changes: 6 additions & 101 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"cmp"
"slices"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
Expand All @@ -35,74 +34,6 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
)

func merge(s *SessionStatsCollector, deltaMap *usage.TableDelta, colMap *usage.StatsUsage) {
deltaMap.Merge(s.mapper.GetDeltaAndReset())
colMap.Merge(s.statsUsage.GetUsageAndReset())
}

// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it.
type SessionStatsCollector struct {
mapper *usage.TableDelta
statsUsage *usage.StatsUsage
next *SessionStatsCollector
sync.Mutex

// deleted is set to true when a session is closed. Every time we sweep the list, we will remove the useless collector.
deleted bool
}

// NewSessionStatsCollector initializes a new SessionStatsCollector.
func NewSessionStatsCollector() *SessionStatsCollector {
return &SessionStatsCollector{
mapper: usage.NewTableDelta(),
statsUsage: usage.NewStatsUsage(),
}
}

// Delete only sets the deleted flag true, it will be deleted from list when DumpStatsDeltaToKV is called.
func (s *SessionStatsCollector) Delete() {
s.Lock()
defer s.Unlock()
s.deleted = true
}

// Update will updates the delta and count for one table id.
func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSize *map[int64]int64) {
s.Lock()
defer s.Unlock()
s.mapper.Update(id, delta, count, colSize)
}

// ClearForTest clears the mapper for test.
func (s *SessionStatsCollector) ClearForTest() {
s.Lock()
defer s.Unlock()
s.mapper = usage.NewTableDelta()
s.statsUsage = usage.NewStatsUsage()
s.next = nil
s.deleted = false
}

// UpdateColStatsUsage updates the last time when the column stats are used(needed).
func (s *SessionStatsCollector) UpdateColStatsUsage(colMap map[model.TableItemID]time.Time) {
s.Lock()
defer s.Unlock()
s.statsUsage.Merge(colMap)
}

// NewSessionStatsCollector allocates a stats collector for a session.
func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
h.listHead.Lock()
defer h.listHead.Unlock()
newCollector := &SessionStatsCollector{
mapper: usage.NewTableDelta(),
next: h.listHead.next,
statsUsage: usage.NewStatsUsage(),
}
h.listHead.next = newCollector
return newCollector
}

// NewSessionIndexUsageCollector will add a new SessionIndexUsageCollector into linked list headed by idxUsageListHead.
// idxUsageListHead always points to an empty SessionIndexUsageCollector as a sentinel node. So we let idxUsageListHead.next
// points to new item. It's helpful to sweepIdxUsageList.
Expand Down Expand Up @@ -180,39 +111,13 @@ const (
DumpDelta dumpMode = false
)

// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
deltaMap := usage.NewTableDelta()
colMap := usage.NewStatsUsage()
prev := h.listHead
prev.Lock()
for curr := prev.next; curr != nil; curr = curr.next {
curr.Lock()
// Merge the session stats into deltaMap respectively.
merge(curr, deltaMap, colMap)
if curr.deleted {
prev.next = curr.next
// Since the session is already closed, we can safely unlock it here.
curr.Unlock()
} else {
// Unlock the previous lock, so we only holds at most two session's lock at the same time.
prev.Unlock()
prev = curr
}
}
prev.Unlock()
h.tableDelta.Merge(deltaMap.GetDeltaAndReset())
h.statsUsage.Merge(colMap.GetUsageAndReset())
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV.
// If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
h.sweepList()
deltaMap := h.tableDelta.GetDeltaAndReset()
h.SweepSessionStatsList()
deltaMap := h.SessionTableDelta().GetDeltaAndReset()
defer func() {
h.tableDelta.Merge(deltaMap)
h.SessionTableDelta().Merge(deltaMap)
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
Expand Down Expand Up @@ -339,10 +244,10 @@ func (h *Handle) DumpColStatsUsageToKV() error {
if !variable.EnableColumnTracking.Load() {
return nil
}
h.sweepList()
colMap := h.statsUsage.GetUsageAndReset()
h.SweepSessionStatsList()
colMap := h.SessionStatsUsage().GetUsageAndReset()
defer func() {
h.statsUsage.Merge(colMap)
h.SessionStatsUsage().Merge(colMap)
}()
type pair struct {
lastUsedAt string
Expand Down
14 changes: 11 additions & 3 deletions statistics/handle/usage/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "usage",
srcs = [
"index_usage.go",
"predicate_column.go",
"stats_usage.go",
"table_delta.go",
"session_stats_collect.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle/usage",
visibility = ["//visibility:public"],
Expand All @@ -24,3 +23,12 @@ go_library(
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "usage_test",
timeout = "short",
srcs = ["session_stats_collect_test.go"],
embed = [":usage"],
flaky = True,
deps = ["@com_github_stretchr_testify//require"],
)
21 changes: 11 additions & 10 deletions statistics/handle/usage/index_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type GlobalIndexID struct {
IndexID int64
}

type indexUsageMap map[GlobalIndexID]IndexUsageInformation
type indexUsage map[GlobalIndexID]IndexUsageInformation

// SessionIndexUsageCollector is a list item that holds the index usage mapper. If you want to write or read mapper, you must lock it.
// TODO: use a third-party thread-safe list implementation instead of maintaining the list manually.
// TODO: merge this list into SessionStatsList.
/*
[session1] [session2] [sessionN]
| | |
Expand All @@ -59,7 +60,7 @@ type indexUsageMap map[GlobalIndexID]IndexUsageInformation
[storage]
*/
type SessionIndexUsageCollector struct {
mapper indexUsageMap
mapper indexUsage
next *SessionIndexUsageCollector
sync.Mutex

Expand All @@ -70,19 +71,19 @@ type SessionIndexUsageCollector struct {
// If listHead is not nil, add this element to the list.
func NewSessionIndexUsageCollector(listHead *SessionIndexUsageCollector) *SessionIndexUsageCollector {
if listHead == nil {
return &SessionIndexUsageCollector{mapper: make(indexUsageMap)}
return &SessionIndexUsageCollector{mapper: make(indexUsage)}
}
listHead.Lock()
defer listHead.Unlock()
newCollector := &SessionIndexUsageCollector{
mapper: make(indexUsageMap),
mapper: make(indexUsage),
next: listHead.next,
}
listHead.next = newCollector
return newCollector
}

func (m indexUsageMap) updateByKey(id GlobalIndexID, value *IndexUsageInformation) {
func (m indexUsage) updateByKey(id GlobalIndexID, value *IndexUsageInformation) {
item := m[id]
item.QueryCount += value.QueryCount
item.RowsSelected += value.RowsSelected
Expand All @@ -92,12 +93,12 @@ func (m indexUsageMap) updateByKey(id GlobalIndexID, value *IndexUsageInformatio
m[id] = item
}

func (m indexUsageMap) update(tableID int64, indexID int64, value *IndexUsageInformation) {
func (m indexUsage) update(tableID int64, indexID int64, value *IndexUsageInformation) {
id := GlobalIndexID{TableID: tableID, IndexID: indexID}
m.updateByKey(id, value)
}

func (m indexUsageMap) merge(destMap indexUsageMap) {
func (m indexUsage) merge(destMap indexUsage) {
for id := range destMap {
item := destMap[id]
m.updateByKey(id, &item)
Expand All @@ -122,10 +123,10 @@ func (s *SessionIndexUsageCollector) Delete() {
// sweepIdxUsageList will loop over the list, merge each session's local index usage information into handle
// and remove closed session's collector.
// For convenience, we keep idxUsageListHead always points to sentinel node. So that we don't need to consider corner case.
func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsageMap {
func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsage {
prev := listHead
prev.Lock()
mapper := make(indexUsageMap)
mapper := make(indexUsage)
for curr := prev.next; curr != nil; curr = curr.next {
curr.Lock()
mapper.merge(curr.mapper)
Expand All @@ -134,7 +135,7 @@ func sweepIdxUsageList(listHead *SessionIndexUsageCollector) indexUsageMap {
curr.Unlock()
} else {
prev.Unlock()
curr.mapper = make(indexUsageMap)
curr.mapper = make(indexUsage)
prev = curr
}
}
Expand Down
Loading

0 comments on commit 97c7741

Please sign in to comment.