Skip to content

Commit

Permalink
statistics: move MergePartTopN2GlobalTopN into handle/globalstats (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 23, 2023
1 parent 0fd866b commit 639f3f2
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 177 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":statistics"],
flaky = True,
shard_count = 34,
shard_count = 32,
deps = [
"//pkg/config",
"//pkg/parser/ast",
Expand Down
80 changes: 0 additions & 80 deletions pkg/statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"slices"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -795,84 +793,6 @@ func NewTopN(n int) *TopN {
return &TopN{TopN: make([]TopNMeta, 0, n)}
}

// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
// The input parameters:
// 1. `topNs` are the partition-level topNs to be merged.
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
//
// The output parameters:
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram,
isIndex bool, killed *uint32) (*TopN, []TopNMeta, []*Histogram, error) {
if CheckEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
partNum := len(topNs)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
// datumMap is used to store the mapping from the string type to datum type.
// The datum is used to find the value in the histogram.
datumMap := NewDatumMapCache()
for i, topN := range topNs {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(ErrQueryInterrupted)
}
if topN.TotalCount() == 0 {
continue
}
for _, val := range topN.TopN {
encodedVal := hack.String(val.Encoded)
_, exists := counter[encodedVal]
counter[encodedVal] += float64(val.Count)
if exists {
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
continue
}
// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
// 1. Check the topN first.
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
for j := 0; j < partNum; j++ {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(ErrQueryInterrupted)
}
if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
continue
}
// Get the encodedVal from the hists[j]
datum, exists := datumMap.Get(encodedVal)
if !exists {
d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
if err != nil {
return nil, nil, nil, err
}
datum = d
}
// Get the row count which the value is equal to the encodedVal from histogram.
count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
if count != 0 {
counter[encodedVal] += count
// Remove the value corresponding to encodedVal from the histogram.
hists[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
}
}
}
}
numTop := len(counter)
if numTop == 0 {
return nil, nil, hists, nil
}
sorted := make([]TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)})
}
globalTopN, leftTopN := GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, leftTopN, hists, nil
}

// MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size.
// The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated.
// The output parameters are the newly generated TopN structure and the remaining numbers.
Expand Down
86 changes: 0 additions & 86 deletions pkg/statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"math"
"math/rand"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -256,39 +254,6 @@ func TestCMSketchCodingTopN(t *testing.T) {
require.NoError(t, err)
}

func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*TopN, 0, 10)
for i := 0; i < 10; i++ {
// Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3.
topN := NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1))
require.NoError(t, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2))
require.NoError(t, err)
topN.AppendTopN(key2, 2)
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3))
require.NoError(t, err)
topN.AppendTopN(key3, 3)
}
topNs = append(topNs, topN)
}

// Test merge 2 topN with nil hists.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows")
require.Len(t, leftTopN, 1, "should have 1 left topN")
}

func TestSortTopnMeta(t *testing.T) {
data := []TopNMeta{{
Encoded: []byte("a"),
Expand All @@ -300,54 +265,3 @@ func TestSortTopnMeta(t *testing.T) {
SortTopnMeta(data)
require.Equal(t, uint64(2), data[0].Count)
}

func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*TopN, 0, 10)
for i := 0; i < 10; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(t, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(t, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(t, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*Histogram, 0, 10)
for i := 0; i < 10; i++ {
// Construct Hist
h := NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}

// Test merge 2 topN.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55")
require.Len(t, leftTopN, 1, "should have 1 left topN")
}
5 changes: 3 additions & 2 deletions pkg/statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ go_test(
"globalstats_test.go",
"main_test.go",
"topn_bench_test.go",
"topn_test.go",
],
embed = [":globalstats"],
flaky = True,
shard_count = 18,
shard_count = 20,
deps = [
":globalstats",
"//pkg/config",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
82 changes: 81 additions & 1 deletion pkg/statistics/handle/globalstats/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package globalstats
import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/tiancaiamao/gp"
)

Expand All @@ -32,7 +34,7 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap
killed := &sc.GetSessionVars().Killed
// use original method if concurrency equals 1 or for version1
if mergeConcurrency < 2 {
return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed)
return MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed)
}
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
Expand Down Expand Up @@ -113,3 +115,81 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch
statistics.SortTopnMeta(result)
return globalTopN, result, wrapper.AllHg, nil
}

// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
// The input parameters:
// 1. `topNs` are the partition-level topNs to be merged.
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
//
// The output parameters:
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram,
isIndex bool, killed *uint32) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
if statistics.CheckEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
partNum := len(topNs)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
// datumMap is used to store the mapping from the string type to datum type.
// The datum is used to find the value in the histogram.
datumMap := statistics.NewDatumMapCache()
for i, topN := range topNs {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted)
}
if topN.TotalCount() == 0 {
continue
}
for _, val := range topN.TopN {
encodedVal := hack.String(val.Encoded)
_, exists := counter[encodedVal]
counter[encodedVal] += float64(val.Count)
if exists {
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
continue
}
// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
// 1. Check the topN first.
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
for j := 0; j < partNum; j++ {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted)
}
if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
continue
}
// Get the encodedVal from the hists[j]
datum, exists := datumMap.Get(encodedVal)
if !exists {
d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
if err != nil {
return nil, nil, nil, err
}
datum = d
}
// Get the row count which the value is equal to the encodedVal from histogram.
count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
if count != 0 {
counter[encodedVal] += count
// Remove the value corresponding to encodedVal from the histogram.
hists[j].BinarySearchRemoveVal(statistics.TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
}
}
}
}
numTop := len(counter)
if numTop == 0 {
return nil, nil, hists, nil
}
sorted := make([]statistics.TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)})
}
globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, leftTopN, hists, nil
}
13 changes: 6 additions & 7 deletions pkg/statistics/handle/globalstats/topn_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package globalstats_test
package globalstats

import (
"fmt"
Expand All @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -77,7 +76,7 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = statistics.MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
_, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
}
}

Expand Down Expand Up @@ -124,20 +123,20 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}
wrapper := globalstats.NewStatsWrapper(hists, topNs)
wrapper := NewStatsWrapper(hists, topNs)
const mergeConcurrency = 4
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > globalstats.MaxPartitionMergeBatchSize {
batchSize = globalstats.MaxPartitionMergeBatchSize
} else if batchSize > MaxPartitionMergeBatchSize {
batchSize = MaxPartitionMergeBatchSize
}
gpool := gp.New(mergeConcurrency, 5*time.Minute)
defer gpool.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = globalstats.MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
}
}

Expand Down
Loading

0 comments on commit 639f3f2

Please sign in to comment.