diff --git a/pkg/statistics/BUILD.bazel b/pkg/statistics/BUILD.bazel index d623615bc8a92..6b07ad7d70f79 100644 --- a/pkg/statistics/BUILD.bazel +++ b/pkg/statistics/BUILD.bazel @@ -75,7 +75,7 @@ go_test( data = glob(["testdata/**"]), embed = [":statistics"], flaky = True, - shard_count = 34, + shard_count = 32, deps = [ "//pkg/config", "//pkg/parser/ast", diff --git a/pkg/statistics/cmsketch.go b/pkg/statistics/cmsketch.go index 3f246d76bd70c..5023bec566d18 100644 --- a/pkg/statistics/cmsketch.go +++ b/pkg/statistics/cmsketch.go @@ -23,8 +23,6 @@ import ( "slices" "sort" "strings" - "sync/atomic" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -795,84 +793,6 @@ func NewTopN(n int) *TopN { return &TopN{TopN: make([]TopNMeta, 0, n)} } -// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN. -// The input parameters: -// 1. `topNs` are the partition-level topNs to be merged. -// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value. -// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate. -// -// The output parameters: -// 1. `*TopN` is the final global-level topN. -// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter. -// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN. -func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, - isIndex bool, killed *uint32) (*TopN, []TopNMeta, []*Histogram, error) { - if CheckEmptyTopNs(topNs) { - return nil, nil, hists, nil - } - partNum := len(topNs) - // Different TopN structures may hold the same value, we have to merge them. - counter := make(map[hack.MutableString]float64) - // datumMap is used to store the mapping from the string type to datum type. - // The datum is used to find the value in the histogram. - datumMap := NewDatumMapCache() - for i, topN := range topNs { - if atomic.LoadUint32(killed) == 1 { - return nil, nil, nil, errors.Trace(ErrQueryInterrupted) - } - if topN.TotalCount() == 0 { - continue - } - for _, val := range topN.TopN { - encodedVal := hack.String(val.Encoded) - _, exists := counter[encodedVal] - counter[encodedVal] += float64(val.Count) - if exists { - // We have already calculated the encodedVal from the histogram, so just continue to next topN value. - continue - } - // We need to check whether the value corresponding to encodedVal is contained in other partition-level stats. - // 1. Check the topN first. - // 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram. - for j := 0; j < partNum; j++ { - if atomic.LoadUint32(killed) == 1 { - return nil, nil, nil, errors.Trace(ErrQueryInterrupted) - } - if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 { - continue - } - // Get the encodedVal from the hists[j] - datum, exists := datumMap.Get(encodedVal) - if !exists { - d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc) - if err != nil { - return nil, nil, nil, err - } - datum = d - } - // Get the row count which the value is equal to the encodedVal from histogram. 
- count, _ := hists[j].EqualRowCount(nil, datum, isIndex) - if count != 0 { - counter[encodedVal] += count - // Remove the value corresponding to encodedVal from the histogram. - hists[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)}) - } - } - } - } - numTop := len(counter) - if numTop == 0 { - return nil, nil, hists, nil - } - sorted := make([]TopNMeta, 0, numTop) - for value, cnt := range counter { - data := hack.Slice(string(value)) - sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)}) - } - globalTopN, leftTopN := GetMergedTopNFromSortedSlice(sorted, n) - return globalTopN, leftTopN, hists, nil -} - // MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size. // The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated. // The output parameters are the newly generated TopN structure and the remaining numbers. diff --git a/pkg/statistics/cmsketch_test.go b/pkg/statistics/cmsketch_test.go index 9e221645f221e..7cbdfc62450d3 100644 --- a/pkg/statistics/cmsketch_test.go +++ b/pkg/statistics/cmsketch_test.go @@ -19,11 +19,9 @@ import ( "math" "math/rand" "testing" - "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" @@ -256,39 +254,6 @@ func TestCMSketchCodingTopN(t *testing.T) { require.NoError(t, err) } -func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) { - loc := time.UTC - sc := stmtctx.NewStmtCtxWithTimeZone(loc) - version := 1 - isKilled := uint32(0) - - // Prepare TopNs. - topNs := make([]*TopN, 0, 10) - for i := 0; i < 10; i++ { - // Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3. - topN := NewTopN(3) - { - key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1)) - require.NoError(t, err) - topN.AppendTopN(key1, 2) - key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2)) - require.NoError(t, err) - topN.AppendTopN(key2, 2) - key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3)) - require.NoError(t, err) - topN.AppendTopN(key3, 3) - } - topNs = append(topNs, topN) - } - - // Test merge 2 topN with nil hists. - globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled) - require.NoError(t, err) - require.Len(t, globalTopN.TopN, 2, "should only have 2 topN") - require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows") - require.Len(t, leftTopN, 1, "should have 1 left topN") -} - func TestSortTopnMeta(t *testing.T) { data := []TopNMeta{{ Encoded: []byte("a"), @@ -300,54 +265,3 @@ func TestSortTopnMeta(t *testing.T) { SortTopnMeta(data) require.Equal(t, uint64(2), data[0].Count) } - -func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) { - loc := time.UTC - sc := stmtctx.NewStmtCtxWithTimeZone(loc) - version := 1 - isKilled := uint32(0) - - // Prepare TopNs. - topNs := make([]*TopN, 0, 10) - for i := 0; i < 10; i++ { - // Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3. 
- topN := NewTopN(3) - { - key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1)) - require.NoError(t, err) - topN.AppendTopN(key1, 2) - key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2)) - require.NoError(t, err) - topN.AppendTopN(key2, 2) - if i%2 == 0 { - key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3)) - require.NoError(t, err) - topN.AppendTopN(key3, 3) - } - } - topNs = append(topNs, topN) - } - - // Prepare Hists. - hists := make([]*Histogram, 0, 10) - for i := 0; i < 10; i++ { - // Construct Hist - h := NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0) - h.Bounds.AppendInt64(0, 1) - h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 20}) - h.Bounds.AppendInt64(0, 2) - h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30}) - h.Bounds.AppendInt64(0, 3) - h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30}) - h.Bounds.AppendInt64(0, 4) - h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 40}) - hists = append(hists, h) - } - - // Test merge 2 topN. - globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled) - require.NoError(t, err) - require.Len(t, globalTopN.TopN, 2, "should only have 2 topN") - require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55") - require.Len(t, leftTopN, 1, "should have 1 left topN") -} diff --git a/pkg/statistics/handle/globalstats/BUILD.bazel b/pkg/statistics/handle/globalstats/BUILD.bazel index 8cbfd4a96040f..faea29a259d7c 100644 --- a/pkg/statistics/handle/globalstats/BUILD.bazel +++ b/pkg/statistics/handle/globalstats/BUILD.bazel @@ -38,11 +38,12 @@ go_test( "globalstats_test.go", "main_test.go", "topn_bench_test.go", + "topn_test.go", ], + embed = [":globalstats"], flaky = True, - shard_count = 18, + shard_count = 20, deps = [ - ":globalstats", "//pkg/config", "//pkg/parser/model", "//pkg/parser/mysql", diff --git a/pkg/statistics/handle/globalstats/topn.go b/pkg/statistics/handle/globalstats/topn.go index 8251070f42e1d..19782f76b2616 100644 --- a/pkg/statistics/handle/globalstats/topn.go +++ b/pkg/statistics/handle/globalstats/topn.go @@ -17,11 +17,13 @@ package globalstats import ( "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/util/hack" "github.com/tiancaiamao/gp" ) @@ -32,7 +34,7 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap killed := &sc.GetSessionVars().Killed // use original method if concurrency equals 1 or for version1 if mergeConcurrency < 2 { - return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed) + return MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed) } batchSize := len(wrapper.AllTopN) / mergeConcurrency if batchSize < 1 { @@ -113,3 +115,81 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch statistics.SortTopnMeta(result) return globalTopN, result, wrapper.AllHg, nil } + +// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN. +// The input parameters: +// 1. `topNs` are the partition-level topNs to be merged. +// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value. +// 3. `hists` are the partition-level histograms. 
Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate. +// +// The output parameters: +// 1. `*TopN` is the final global-level topN. +// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter. +// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN. +func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram, + isIndex bool, killed *uint32) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) { + if statistics.CheckEmptyTopNs(topNs) { + return nil, nil, hists, nil + } + partNum := len(topNs) + // Different TopN structures may hold the same value, we have to merge them. + counter := make(map[hack.MutableString]float64) + // datumMap is used to store the mapping from the string type to datum type. + // The datum is used to find the value in the histogram. + datumMap := statistics.NewDatumMapCache() + for i, topN := range topNs { + if atomic.LoadUint32(killed) == 1 { + return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted) + } + if topN.TotalCount() == 0 { + continue + } + for _, val := range topN.TopN { + encodedVal := hack.String(val.Encoded) + _, exists := counter[encodedVal] + counter[encodedVal] += float64(val.Count) + if exists { + // We have already calculated the encodedVal from the histogram, so just continue to next topN value. + continue + } + // We need to check whether the value corresponding to encodedVal is contained in other partition-level stats. + // 1. Check the topN first. + // 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram. + for j := 0; j < partNum; j++ { + if atomic.LoadUint32(killed) == 1 { + return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted) + } + if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 { + continue + } + // Get the encodedVal from the hists[j] + datum, exists := datumMap.Get(encodedVal) + if !exists { + d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc) + if err != nil { + return nil, nil, nil, err + } + datum = d + } + // Get the row count which the value is equal to the encodedVal from histogram. + count, _ := hists[j].EqualRowCount(nil, datum, isIndex) + if count != 0 { + counter[encodedVal] += count + // Remove the value corresponding to encodedVal from the histogram. + hists[j].BinarySearchRemoveVal(statistics.TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)}) + } + } + } + } + numTop := len(counter) + if numTop == 0 { + return nil, nil, hists, nil + } + sorted := make([]statistics.TopNMeta, 0, numTop) + for value, cnt := range counter { + data := hack.Slice(string(value)) + sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)}) + } + globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n) + return globalTopN, leftTopN, hists, nil +} diff --git a/pkg/statistics/handle/globalstats/topn_bench_test.go b/pkg/statistics/handle/globalstats/topn_bench_test.go index 50c17ef147255..e21125aaa7e2c 100644 --- a/pkg/statistics/handle/globalstats/topn_bench_test.go +++ b/pkg/statistics/handle/globalstats/topn_bench_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package globalstats_test +package globalstats import ( "fmt" @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/globalstats" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" @@ -77,7 +76,7 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { // Benchmark merge 10 topN. - _, _, _, _ = statistics.MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled) + _, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled) } } @@ -124,20 +123,20 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40}) hists = append(hists, h) } - wrapper := globalstats.NewStatsWrapper(hists, topNs) + wrapper := NewStatsWrapper(hists, topNs) const mergeConcurrency = 4 batchSize := len(wrapper.AllTopN) / mergeConcurrency if batchSize < 1 { batchSize = 1 - } else if batchSize > globalstats.MaxPartitionMergeBatchSize { - batchSize = globalstats.MaxPartitionMergeBatchSize + } else if batchSize > MaxPartitionMergeBatchSize { + batchSize = MaxPartitionMergeBatchSize } gpool := gp.New(mergeConcurrency, 5*time.Minute) defer gpool.Close() b.ResetTimer() for i := 0; i < b.N; i++ { // Benchmark merge 10 topN. - _, _, _, _ = globalstats.MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled) + _, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled) } } diff --git a/pkg/statistics/handle/globalstats/topn_test.go b/pkg/statistics/handle/globalstats/topn_test.go new file mode 100644 index 0000000000000..7f12024fe2054 --- /dev/null +++ b/pkg/statistics/handle/globalstats/topn_test.go @@ -0,0 +1,112 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package globalstats + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/stretchr/testify/require" +) + +func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) { + loc := time.UTC + sc := stmtctx.NewStmtCtxWithTimeZone(loc) + version := 1 + isKilled := uint32(0) + + // Prepare TopNs. + topNs := make([]*statistics.TopN, 0, 10) + for i := 0; i < 10; i++ { + // Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3. 
+ topN := statistics.NewTopN(3) + { + key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1)) + require.NoError(t, err) + topN.AppendTopN(key1, 2) + key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2)) + require.NoError(t, err) + topN.AppendTopN(key2, 2) + key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3)) + require.NoError(t, err) + topN.AppendTopN(key3, 3) + } + topNs = append(topNs, topN) + } + + // Test merge 2 topN with nil hists. + globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled) + require.NoError(t, err) + require.Len(t, globalTopN.TopN, 2, "should only have 2 topN") + require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows") + require.Len(t, leftTopN, 1, "should have 1 left topN") +} + +func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) { + loc := time.UTC + sc := stmtctx.NewStmtCtxWithTimeZone(loc) + version := 1 + isKilled := uint32(0) + + // Prepare TopNs. + topNs := make([]*statistics.TopN, 0, 10) + for i := 0; i < 10; i++ { + // Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3. + topN := statistics.NewTopN(3) + { + key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1)) + require.NoError(t, err) + topN.AppendTopN(key1, 2) + key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2)) + require.NoError(t, err) + topN.AppendTopN(key2, 2) + if i%2 == 0 { + key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3)) + require.NoError(t, err) + topN.AppendTopN(key3, 3) + } + } + topNs = append(topNs, topN) + } + + // Prepare Hists. + hists := make([]*statistics.Histogram, 0, 10) + for i := 0; i < 10; i++ { + // Construct Hist + h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0) + h.Bounds.AppendInt64(0, 1) + h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20}) + h.Bounds.AppendInt64(0, 2) + h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30}) + h.Bounds.AppendInt64(0, 3) + h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30}) + h.Bounds.AppendInt64(0, 4) + h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40}) + hists = append(hists, h) + } + + // Test merge 2 topN. + globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled) + require.NoError(t, err) + require.Len(t, globalTopN.TopN, 2, "should only have 2 topN") + require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55") + require.Len(t, leftTopN, 1, "should have 1 left topN") +}
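
Note: the totals asserted in TestMergePartTopN2GlobalTopNWithoutHists (50 rows in the global TopN, 1 left-over entry) follow directly from the merge logic relocated above: each of the 10 partition TopNs contributes counts 2, 2 and 3 for the three keys, and with n = 2 only the two largest merged counts stay global. The standalone sketch below reproduces only that arithmetic for the nil-histogram case; it does not call any TiDB API, and names such as "meta" are illustrative only.

package main

import (
	"fmt"
	"sort"
)

func main() {
	const partitions = 10
	perPartition := map[string]uint64{"key(1,1)": 2, "key(1,2)": 2, "key(1,3)": 3}

	// Sum counts for the same encoded value across all partitions
	// (no histogram corrections apply when hists is nil).
	merged := make(map[string]uint64)
	for i := 0; i < partitions; i++ {
		for k, c := range perPartition {
			merged[k] += c
		}
	}

	// Keep the n largest merged entries as the global TopN; the remainder are
	// the "left" entries that the real implementation pushes back into the
	// partition histograms afterwards.
	type meta struct {
		key   string
		count uint64
	}
	sorted := make([]meta, 0, len(merged))
	for k, c := range merged {
		sorted = append(sorted, meta{k, c})
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].count > sorted[j].count })

	const n = 2
	var total uint64
	for _, m := range sorted[:n] {
		total += m.count
	}
	fmt.Println("global TopN total:", total)         // 50, as asserted in the test
	fmt.Println("left-over entries:", len(sorted)-n) // 1, as asserted in the test
}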