Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: avoid copy in the SortSampleItems (#48683) #49536

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
if e.StatsVersion < 2 {
hg, err = statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
} else {
hg, topn, err = statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), col.ID, collectors[i], &col.FieldType, true, nil)
hg, topn, err = statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), col.ID, collectors[i], &col.FieldType, true, nil, true)
topNs = append(topNs, topn)
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ workLoop:
e.memTracker.Release(collector.MemSize)
}
}
hist, topn, err := statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), task.id, collector, task.tp, task.isColumn, e.memTracker)
hist, topn, err := statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), task.id, collector, task.tp, task.isColumn, e.memTracker, e.ctx.GetSessionVars().EnableExtendedStats)
if err != nil {
resultCh <- err
releaseCollectorMemory()
Expand Down
146 changes: 146 additions & 0 deletions pkg/statistics/builder_ext_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

// BuildExtendedStats builds extended statistics for the table's registered
// column groups, if any, based on the collected column samples. It returns
// nil (without error) when no extended-stats item is registered for the table
// or none of the registered items could be computed from the samples.
func BuildExtendedStats(sctx sessionctx.Context,
	tableID int64, cols []*model.ColumnInfo, collectors []*SampleCollector) (*ExtendedStatsColl, error) {
	const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"

	exec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
	if !ok {
		return nil, errors.Errorf("invalid sql executor")
	}
	// Internal query: load every analyzed/inited extended-stats registration for this table.
	internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	rows, _, err := exec.ExecRestrictedSQL(internalCtx, nil, sql, tableID, ExtendedStatsAnalyzed, ExtendedStatsInited)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if len(rows) == 0 {
		return nil, nil
	}
	coll := NewExtendedStatsColl()
	for _, r := range rows {
		statsName := r.GetString(0)
		rawColIDs := r.GetString(2)
		entry := &ExtendedStatsItem{Tp: uint8(r.GetInt64(1))}
		if unmarshalErr := json.Unmarshal([]byte(rawColIDs), &entry.ColIDs); unmarshalErr != nil {
			// A malformed registration row must not abort the whole analyze; log and skip it.
			logutil.BgLogger().Error("invalid column_ids in mysql.stats_extended, skip collecting extended stats for this row", zap.String("column_ids", rawColIDs), zap.Error(unmarshalErr))
			continue
		}
		if filled := fillExtendedStatsItemVals(sctx, entry, cols, collectors); filled != nil {
			coll.Stats[statsName] = filled
		}
	}
	if len(coll.Stats) == 0 {
		return nil, nil
	}
	return coll, nil
}

// fillExtendedStatsItemVals dispatches on the extended-stats type and fills in
// the item's computed values from the samples. Only correlation stats are
// supported; for any other type (cardinality, dependency, unknown) it returns nil.
func fillExtendedStatsItemVals(sctx sessionctx.Context, item *ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*SampleCollector) *ExtendedStatsItem {
	if item.Tp == ast.StatsTypeCorrelation {
		return fillExtStatsCorrVals(sctx, item, cols, collectors)
	}
	// StatsTypeCardinality and StatsTypeDependency are not computed from samples yet.
	return nil
}

// fillExtStatsCorrVals computes the order correlation (Pearson correlation of
// the sample ranks) between the two columns referenced by item.ColIDs and
// stores it in item.ScalarVals. It returns nil when the item does not resolve
// to exactly two visible columns or when sorting the samples fails.
func fillExtStatsCorrVals(sctx sessionctx.Context, item *ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*SampleCollector) *ExtendedStatsItem {
	// Map the two column IDs of the stats item to their offsets in cols/collectors.
	colOffsets := make([]int, 0, 2)
	for _, id := range item.ColIDs {
		for i, col := range cols {
			if col.ID == id {
				colOffsets = append(colOffsets, i)
				break
			}
		}
	}
	if len(colOffsets) != 2 {
		return nil
	}
	// samplesX and samplesY are in order of handle, i.e, their SampleItem.Ordinals are in order.
	samplesX := collectors[colOffsets[0]].Samples
	// We would modify Ordinal of samplesY, so we make a deep copy.
	samplesY := CopySampleItems(collectors[colOffsets[1]].Samples)
	sampleNum := min(len(samplesX), len(samplesY))
	// A single sample pair is trivially perfectly correlated; no samples means no correlation.
	if sampleNum == 1 {
		item.ScalarVals = 1
		return item
	}
	if sampleNum <= 0 {
		item.ScalarVals = 0
		return item
	}

	sc := sctx.GetSessionVars().StmtCtx

	var err error
	// Sort X in place; this reorders samplesX but each item keeps its original
	// handle-order position in SampleItem.Ordinal.
	err = sortSampleItems(sc, samplesX)
	if err != nil {
		return nil
	}
	// Re-index Y by X's sorted order: after this loop, itemY.Ordinal is the rank
	// of the corresponding X value, while the slice itself is still in X-order.
	samplesYInXOrder := make([]*SampleItem, 0, sampleNum)
	for i, itemX := range samplesX {
		if itemX.Ordinal >= len(samplesY) {
			continue
		}
		itemY := samplesY[itemX.Ordinal]
		itemY.Ordinal = i
		samplesYInXOrder = append(samplesYInXOrder, itemY)
	}
	// Now sort Y by value; each element's Ordinal still records the X-rank, so
	// position i vs. Ordinal gives the (Y-rank, X-rank) pairing.
	samplesYInYOrder := make([]*SampleItem, len(samplesYInXOrder))
	copy(samplesYInYOrder, samplesYInXOrder)
	err = sortSampleItems(sc, samplesYInYOrder)
	if err != nil {
		return nil
	}
	var corrXYSum float64
	for i := 1; i < len(samplesYInYOrder); i++ {
		corrXYSum += float64(i) * float64(samplesYInYOrder[i].Ordinal)
	}
	// X means the ordinal of the item in original sequence, Y means the ordinal of the item in the
	// sorted sequence, we know that X and Y value sets are both:
	// 0, 1, ..., sampleNum-1
	// we can simply compute sum(X) = sum(Y) =
	//    (sampleNum-1)*sampleNum / 2
	// and sum(X^2) = sum(Y^2) =
	//    (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6
	// We use "Pearson correlation coefficient" to compute the order correlation of columns,
	// the formula is based on https://en.wikipedia.org/wiki/Pearson_correlation_coefficient.
	// Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1.
	itemsCount := float64(sampleNum)
	corrXSum := (itemsCount - 1) * itemsCount / 2.0
	corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0
	item.ScalarVals = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
	return item
}
77 changes: 77 additions & 0 deletions pkg/statistics/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
"testing"

"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

// BenchmarkBuildHistAndTopN is used to benchmark the performance of BuildHistAndTopN.
// go test -benchmem -run=^$ -bench ^BenchmarkBuildHistAndTopN$ github.com/pingcap/tidb/pkg/statistics
func BenchmarkBuildHistAndTopN(b *testing.B) {
	ctx := mock.NewContext()
	const cnt = 1000_000
	sketch := NewFMSketch(cnt)
	// Pre-size for the cnt distinct values plus the duplicated tail below, so
	// the setup loop does not repeatedly regrow the backing array (the original
	// capacity of 8 caused ~20 reallocations before the benchmark even started).
	data := make([]*SampleItem, 0, cnt+32)
	// insertDup inserts `times` copies of `val` into the sketch and the sample set.
	insertDup := func(val int64, times int) {
		for i := 0; i < times; i++ {
			d := types.NewIntDatum(val)
			err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d)
			require.NoError(b, err)
			data = append(data, &SampleItem{Value: d})
		}
	}
	// One sample per distinct value 1..cnt.
	for i := 1; i <= cnt; i++ {
		insertDup(int64(i), 1)
	}
	// Extra duplicates so a few values are frequent enough to land in the TopN.
	insertDup(2, 9)
	insertDup(4, 6)
	insertDup(7, 4)
	insertDup(11, 2)
	collector := &SampleCollector{
		Samples:   data,
		NullCount: 0,
		Count:     int64(len(data)),
		FMSketch:  sketch,
		TotalSize: int64(len(data)) * 8,
	}
	fieldType := types.NewFieldType(mysql.TypeLong)
	memoryTracker := memory.NewTracker(10, 1024*1024*1024)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _, _ = BuildHistAndTopN(ctx, 256, 500, 0, collector, fieldType, true, memoryTracker, false)
	}
}
14 changes: 11 additions & 3 deletions statistics/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func BuildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *Sa
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
samples, err := SortSampleItems(sc, samples)
err := sortSampleItems(sc, samples)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -240,6 +240,7 @@ func BuildHistAndTopN(
tp *types.FieldType,
isColumn bool,
memTracker *memory.Tracker,
needExtStats bool,
) (*Histogram, *TopN, error) {
bufferedMemSize := int64(0)
bufferedReleaseSize := int64(0)
Expand Down Expand Up @@ -276,8 +277,15 @@ func BuildHistAndTopN(
return NewHistogram(id, ndv, nullCount, 0, tp, 0, collector.TotalSize), nil, nil
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
samples, err := SortSampleItems(sc, samples)
var samples []*SampleItem
// if we need to build extended stats, we need to copy the samples to avoid modifying the original samples.
if needExtStats {
samples = make([]*SampleItem, len(collector.Samples))
copy(samples, collector.Samples)
} else {
samples = collector.Samples
}
err := sortSampleItems(sc, samples)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions statistics/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ func createTestStatisticsSamples(t *testing.T) *testStatisticsSamples {
}
sc := new(stmtctx.StatementContext)

var err error
s.samples, err = SortSampleItems(sc, samples)
err := sortSampleItems(sc, samples)
require.NoError(t, err)

s.samples = samples
rc := &recordSet{
data: make([]types.Datum, s.count),
count: s.count,
Expand Down
14 changes: 14 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func CopySampleItems(items []*SampleItem) []*SampleItem {
return n
}

<<<<<<< HEAD:statistics/sample.go
// SortSampleItems shallow copies and sorts a slice of SampleItem.
func SortSampleItems(sc *stmtctx.StatementContext, items []*SampleItem) ([]*SampleItem, error) {
sortedItems := make([]*SampleItem, len(items))
Expand Down Expand Up @@ -91,6 +92,19 @@ func (s *sampleItemSorter) Less(i, j int) bool {

func (s *sampleItemSorter) Swap(i, j int) {
s.items[i], s.items[j] = s.items[j], s.items[i]
=======
func sortSampleItems(sc *stmtctx.StatementContext, items []*SampleItem) error {
var err error
slices.SortStableFunc(items, func(i, j *SampleItem) int {
var cmp int
cmp, err = i.Value.Compare(sc.TypeCtx(), &j.Value, collate.GetBinaryCollator())
if err != nil {
return -1
}
return cmp
})
return err
>>>>>>> 4279cd6f230 (*: avoid copy in the SortSampleItems (#48683)):pkg/statistics/sample.go
}

// SampleCollector will collect Samples and calculate the count and ndv of an attribute.
Expand Down
2 changes: 1 addition & 1 deletion statistics/sample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestBuildStatsOnRowSample(t *testing.T) {
TotalSize: int64(len(data)) * 8,
}
tp := types.NewFieldType(mysql.TypeLonglong)
hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true, nil)
hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true, nil, false)
require.Nilf(t, err, "%+v", err)
topNStr, err := topN.DecodedString(ctx, []byte{tp.GetType()})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion statistics/statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func SubTestBuild() func(*testing.T) {
count = col.lessRowCount(nil, types.NewIntDatum(1))
require.Equal(t, 5, int(count))

colv2, topnv2, err := BuildHistAndTopN(ctx, int(bucketCount), topNCount, 2, collector, types.NewFieldType(mysql.TypeLonglong), true, nil)
colv2, topnv2, err := BuildHistAndTopN(ctx, int(bucketCount), topNCount, 2, collector, types.NewFieldType(mysql.TypeLonglong), true, nil, false)
require.NoError(t, err)
require.NotNil(t, topnv2.TopN)
// The most common one's occurrence is 9990, the second most common one's occurrence is 30.
Expand Down