Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Optimize statements summary by using buffer pool #58544

Merged
merged 8 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/util/stmtsummary/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
"//pkg/types",
"//pkg/util",
"//pkg/util/execdetails",
"//pkg/util/hack",
"//pkg/util/plancodec",
"//pkg/util/ppcpuusage",
"@com_github_pingcap_log//:log",
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/stmtsummary/evicted.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newStmtSummaryByDigestEvictedElement(beginTime int64, endTime int64) *stmtS
}

// AddEvicted is used add an evicted record to stmtSummaryByDigestEvicted
func (ssbde *stmtSummaryByDigestEvicted) AddEvicted(evictedKey *stmtSummaryByDigestKey, evictedValue *stmtSummaryByDigest, historySize int) {
func (ssbde *stmtSummaryByDigestEvicted) AddEvicted(evictedKey *StmtDigestKey, evictedValue *stmtSummaryByDigest, historySize int) {
if evictedValue == nil {
return
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (ssbde *stmtSummaryByDigestEvicted) Clear() {
}

// add an evicted record to stmtSummaryByDigestEvictedElement
func (seElement *stmtSummaryByDigestEvictedElement) addEvicted(digestKey *stmtSummaryByDigestKey, digestValue *stmtSummaryByDigestElement) {
func (seElement *stmtSummaryByDigestEvictedElement) addEvicted(digestKey *StmtDigestKey, digestValue *stmtSummaryByDigestElement) {
if digestKey != nil {
seElement.count++
addInfo(seElement.otherSummary, digestValue)
Expand All @@ -169,7 +169,7 @@ const (
// if matches, it will add the digest and return enum match
// if digest too old, it will return enum tooOld and do nothing
// if digest too young, it will return enum tooYoung and do nothing
func (seElement *stmtSummaryByDigestEvictedElement) matchAndAdd(digestKey *stmtSummaryByDigestKey, digestValue *stmtSummaryByDigestElement) (statement int) {
func (seElement *stmtSummaryByDigestEvictedElement) matchAndAdd(digestKey *StmtDigestKey, digestValue *stmtSummaryByDigestElement) (statement int) {
if seElement == nil || digestValue == nil {
return isTooYoung
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/util/stmtsummary/evicted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ func newInduceSsbde(beginTime int64, endTime int64) *stmtSummaryByDigestElement
return newSsbde
}

// generate new stmtSummaryByDigestKey and stmtSummaryByDigest
func generateStmtSummaryByDigestKeyValue(schema string, beginTime int64, endTime int64) (*stmtSummaryByDigestKey, *stmtSummaryByDigest) {
key := &stmtSummaryByDigestKey{
schemaName: schema,
}
// generate new StmtDigestKey and stmtSummaryByDigest
func generateStmtSummaryByDigestKeyValue(schema string, beginTime int64, endTime int64) (*StmtDigestKey, *stmtSummaryByDigest) {
key := &StmtDigestKey{}
key.Init(schema, "", "", "", "")
value := newInduceSsbd(beginTime, endTime)
return key, value
}
Expand Down Expand Up @@ -191,7 +190,8 @@ func TestSimpleStmtSummaryByDigestEvicted(t *testing.T) {
ssbde.AddEvicted(evictedKey, evictedValue, 3)
require.Equal(t, "{begin: 8, end: 9, count: 1}, {begin: 5, end: 6, count: 1}, {begin: 2, end: 3, count: 1}", getAllEvicted(ssbde))

evictedKey = &stmtSummaryByDigestKey{schemaName: "b"}
evictedKey = &StmtDigestKey{}
evictedKey.Init("b", "", "", "", "")
ssbde.AddEvicted(evictedKey, evictedValue, 4)
require.Equal(t, "{begin: 8, end: 9, count: 2}, {begin: 5, end: 6, count: 2}, {begin: 2, end: 3, count: 2}, {begin: 1, end: 2, count: 1}", getAllEvicted(ssbde))

Expand Down Expand Up @@ -307,7 +307,7 @@ func TestEvictedCountDetailed(t *testing.T) {
ssMap.Clear()
other := ssMap.other
// test poisoning with empty-history digestValue
other.AddEvicted(new(stmtSummaryByDigestKey), new(stmtSummaryByDigest), 100)
other.AddEvicted(new(StmtDigestKey), new(stmtSummaryByDigest), 100)
require.Equal(t, 0, other.history.Len())
}

Expand Down
68 changes: 36 additions & 32 deletions pkg/util/stmtsummary/statement_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,38 @@ import (
atomic2 "go.uber.org/atomic"
)

// stmtSummaryByDigestKey defines key for stmtSummaryByDigestMap.summaryMap.
type stmtSummaryByDigestKey struct {
// Same statements may appear in different schema, but they refer to different tables.
schemaName string
digest string
// The digest of the previous statement.
prevDigest string
// The digest of the plan of this SQL.
planDigest string
// `resourceGroupName` is the resource group's name of this statement is bind to.
resourceGroupName string
// StmtDigestKeyPool is the pool for StmtDigestKey.
var StmtDigestKeyPool = sync.Pool{
New: func() any {
return &StmtDigestKey{}
},
}

// StmtDigestKey defines key for stmtSummaryByDigestMap.summaryMap.
type StmtDigestKey struct {
// `hash` is the hash value of this object.
hash []byte
}

// Init initialize the hash key.
func (key *StmtDigestKey) Init(schemaName, digest, prevDigest, planDigest, resourceGroupName string) {
length := len(schemaName) + len(digest) + len(prevDigest) + len(planDigest) + len(resourceGroupName)
if cap(key.hash) < length {
key.hash = make([]byte, 0, length)
} else {
key.hash = key.hash[:0]
}
key.hash = append(key.hash, hack.Slice(digest)...)
key.hash = append(key.hash, hack.Slice(schemaName)...)
key.hash = append(key.hash, hack.Slice(prevDigest)...)
key.hash = append(key.hash, hack.Slice(planDigest)...)
key.hash = append(key.hash, hack.Slice(resourceGroupName)...)
}

// Hash implements SimpleLRUCache.Key.
// Only when current SQL is `commit` do we record `prevSQL`. Otherwise, `prevSQL` is empty.
// `prevSQL` is included in the key To distinguish different transactions.
func (key *stmtSummaryByDigestKey) Hash() []byte {
if len(key.hash) == 0 {
key.hash = make([]byte, 0, len(key.schemaName)+len(key.digest)+len(key.prevDigest)+len(key.planDigest)+len(key.resourceGroupName))
key.hash = append(key.hash, hack.Slice(key.digest)...)
key.hash = append(key.hash, hack.Slice(key.schemaName)...)
key.hash = append(key.hash, hack.Slice(key.prevDigest)...)
key.hash = append(key.hash, hack.Slice(key.planDigest)...)
key.hash = append(key.hash, hack.Slice(key.resourceGroupName)...)
}
func (key *StmtDigestKey) Hash() []byte {
return key.hash
}

Expand Down Expand Up @@ -308,7 +313,7 @@ func newStmtSummaryByDigestMap() *stmtSummaryByDigestMap {
}
newSsMap.summaryMap.SetOnEvict(func(k kvcache.Key, v kvcache.Value) {
historySize := newSsMap.historySize()
newSsMap.other.AddEvicted(k.(*stmtSummaryByDigestKey), v.(*stmtSummaryByDigest), historySize)
newSsMap.other.AddEvicted(k.(*StmtDigestKey), v.(*stmtSummaryByDigest), historySize)
})
return newSsMap
}
Expand All @@ -335,16 +340,11 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
historySize = ssMap.historySize()
}

key := &stmtSummaryByDigestKey{
schemaName: sei.SchemaName,
digest: sei.Digest,
prevDigest: sei.PrevSQLDigest,
planDigest: sei.PlanDigest,
resourceGroupName: sei.ResourceGroupName,
}
// Calculate hash value in advance, to reduce the time holding the lock.
key.Hash()
key := StmtDigestKeyPool.Get().(*StmtDigestKey)
// Init hash value in advance, to reduce the time holding the lock.
key.Init(sei.SchemaName, sei.Digest, sei.PrevSQLDigest, sei.PlanDigest, sei.ResourceGroupName)

var exist bool
// Enclose the block in a function to ensure the lock will always be released.
summary, beginTime := func() (*stmtSummaryByDigest, int64) {
ssMap.Lock()
Expand All @@ -365,9 +365,10 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
}

beginTime := ssMap.beginTimeForCurInterval
value, ok := ssMap.summaryMap.Get(key)
var value kvcache.Value
value, exist = ssMap.summaryMap.Get(key)
var summary *stmtSummaryByDigest
if !ok {
if !exist {
// Lazy initialize it to release ssMap.mutex ASAP.
summary = new(stmtSummaryByDigest)
ssMap.summaryMap.Put(key, summary)
Expand All @@ -381,6 +382,9 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
if summary != nil {
summary.add(sei, beginTime, intervalSeconds, historySize)
}
if exist {
StmtDigestKeyPool.Put(key)
}
}

// Clear removes all statement summaries.
Expand Down
106 changes: 30 additions & 76 deletions pkg/util/stmtsummary/statement_summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package stmtsummary

import (
"bytes"
"container/list"
"fmt"
"strings"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/pingcap/tidb/pkg/util/ppcpuusage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -75,12 +77,8 @@ func TestAddStatement(t *testing.T) {
// first statement
stmtExecInfo1 := generateAnyExecInfo()
stmtExecInfo1.ExecDetail.CommitDetail.Mu.PrewriteBackoffTypes = make([]string, 0)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
samplePlan, _, _ := stmtExecInfo1.LazyInfo.GetEncodedPlan()
stmtExecInfo1.ExecDetail.CommitDetail.Mu.Lock()
expectedSummaryElement := stmtSummaryByDigestElement{
Expand Down Expand Up @@ -454,12 +452,8 @@ func TestAddStatement(t *testing.T) {
stmtExecInfo4 := stmtExecInfo1
stmtExecInfo4.SchemaName = "schema2"
stmtExecInfo4.ExecDetail.CommitDetail = nil
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo4.SchemaName,
digest: stmtExecInfo4.Digest,
planDigest: stmtExecInfo4.PlanDigest,
resourceGroupName: stmtExecInfo4.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo4.SchemaName, stmtExecInfo4.Digest, "", stmtExecInfo4.PlanDigest, stmtExecInfo4.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo4)
require.Equal(t, 2, ssMap.summaryMap.Size())
_, ok = ssMap.summaryMap.Get(key)
Expand All @@ -468,12 +462,8 @@ func TestAddStatement(t *testing.T) {
// Fifth statement has a different digest.
stmtExecInfo5 := stmtExecInfo1
stmtExecInfo5.Digest = "digest2"
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo5.SchemaName,
digest: stmtExecInfo5.Digest,
planDigest: stmtExecInfo4.PlanDigest,
resourceGroupName: stmtExecInfo5.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo5.SchemaName, stmtExecInfo5.Digest, "", stmtExecInfo5.PlanDigest, stmtExecInfo5.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo5)
require.Equal(t, 3, ssMap.summaryMap.Size())
_, ok = ssMap.summaryMap.Get(key)
Expand All @@ -482,12 +472,8 @@ func TestAddStatement(t *testing.T) {
// Sixth statement has a different plan digest.
stmtExecInfo6 := stmtExecInfo1
stmtExecInfo6.PlanDigest = "plan_digest2"
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo6.SchemaName,
digest: stmtExecInfo6.Digest,
planDigest: stmtExecInfo6.PlanDigest,
resourceGroupName: stmtExecInfo6.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo6.SchemaName, stmtExecInfo6.Digest, "", stmtExecInfo6.PlanDigest, stmtExecInfo6.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo6)
require.Equal(t, 4, ssMap.summaryMap.Size())
_, ok = ssMap.summaryMap.Get(key)
Expand All @@ -507,18 +493,16 @@ func TestAddStatement(t *testing.T) {
binPlan: "",
planDigest: "",
}
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo7.SchemaName,
digest: stmtExecInfo7.Digest,
planDigest: stmtExecInfo7.PlanDigest,
resourceGroupName: stmtExecInfo7.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo7.SchemaName, stmtExecInfo7.Digest, "", stmtExecInfo7.PlanDigest, stmtExecInfo7.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo7)
require.Equal(t, 5, ssMap.summaryMap.Size())
v, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
stmt := v.(*stmtSummaryByDigest)
require.Equal(t, key.digest, stmt.digest)
require.True(t, bytes.Contains(key.Hash(), hack.Slice(stmt.schemaName)))
require.True(t, bytes.Contains(key.Hash(), hack.Slice(stmt.digest)))
require.True(t, bytes.Contains(key.Hash(), hack.Slice(stmt.planDigest)))
e := stmt.history.Back()
ssElement := e.Value.(*stmtSummaryByDigestElement)
require.Equal(t, plancodec.PlanDiscardedEncoded, ssElement.samplePlan)
Expand Down Expand Up @@ -1044,12 +1028,8 @@ func TestMaxStmtCount(t *testing.T) {

// LRU cache should work.
for i := loops - 10; i < loops; i++ {
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: fmt.Sprintf("digest%d", i),
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, fmt.Sprintf("digest%d", i), "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
key.Hash()
_, ok := sm.Get(key)
require.True(t, ok)
Expand Down Expand Up @@ -1092,13 +1072,8 @@ func TestMaxSQLLength(t *testing.T) {
stmtExecInfo1.NormalizedSQL = str
ssMap.AddStatement(stmtExecInfo1)

key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
prevDigest: stmtExecInfo1.PrevSQLDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
value, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)

Expand Down Expand Up @@ -1301,12 +1276,8 @@ func TestRefreshCurrentSummary(t *testing.T) {

ssMap.beginTimeForCurInterval = now + 10
stmtExecInfo1 := generateAnyExecInfo()
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo1)
require.Equal(t, 1, ssMap.summaryMap.Size())
value, ok := ssMap.summaryMap.Get(key)
Expand Down Expand Up @@ -1352,12 +1323,8 @@ func TestSummaryHistory(t *testing.T) {
}()

stmtExecInfo1 := generateAnyExecInfo()
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
for i := range 11 {
ssMap.beginTimeForCurInterval = now + int64(i+1)*10
ssMap.AddStatement(stmtExecInfo1)
Expand Down Expand Up @@ -1425,13 +1392,8 @@ func TestPrevSQL(t *testing.T) {
stmtExecInfo1.PrevSQL = "prevSQL"
stmtExecInfo1.PrevSQLDigest = "prevSQLDigest"
ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
prevDigest: stmtExecInfo1.PrevSQLDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, stmtExecInfo1.PrevSQLDigest, stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
require.Equal(t, 1, ssMap.summaryMap.Size())
_, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
Expand All @@ -1444,9 +1406,9 @@ func TestPrevSQL(t *testing.T) {
stmtExecInfo2 := stmtExecInfo1
stmtExecInfo2.PrevSQL = "prevSQL1"
stmtExecInfo2.PrevSQLDigest = "prevSQLDigest1"
key.prevDigest = stmtExecInfo2.PrevSQLDigest
ssMap.AddStatement(stmtExecInfo2)
require.Equal(t, 2, ssMap.summaryMap.Size())
key.Init(stmtExecInfo2.SchemaName, stmtExecInfo2.Digest, stmtExecInfo2.PrevSQLDigest, stmtExecInfo2.PlanDigest, stmtExecInfo2.ResourceGroupName)
_, ok = ssMap.summaryMap.Get(key)
require.True(t, ok)
}
Expand All @@ -1458,12 +1420,8 @@ func TestEndTime(t *testing.T) {

stmtExecInfo1 := generateAnyExecInfo()
ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
require.Equal(t, 1, ssMap.summaryMap.Size())
value, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
Expand Down Expand Up @@ -1508,12 +1466,8 @@ func TestPointGet(t *testing.T) {
stmtExecInfo1.PlanDigest = ""
stmtExecInfo1.LazyInfo.(*mockLazyInfo).plan = fakePlanDigestGenerator()
ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: "",
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", "", stmtExecInfo1.ResourceGroupName)
require.Equal(t, 1, ssMap.summaryMap.Size())
value, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
Expand Down
Loading