Skip to content

Commit

Permalink
executor: reuse fm sketch when to analyze (#47268)
Browse files Browse the repository at this point in the history
close #47071
  • Loading branch information
hawkingrei authored Sep 26, 2023
1 parent e70f6e4 commit 62a048c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
2 changes: 2 additions & 0 deletions executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
"merge subMergeWorker in AnalyzeColumnsExecV2", -1)
e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize)
mergeResult.collector.DestroyAndPutToPool()
}
defer e.memTracker.Release(rootRowCollector.Base().MemSize)
if err != nil {
Expand Down Expand Up @@ -668,6 +669,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
subCollectorSize := subCollector.Base().MemSize
e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize)
e.memTracker.Release(dataSize + colRespSize)
subCollector.DestroyAndPutToPool()
}

resultCh <- &samplingMergeResult{collector: retCollector}
Expand Down
11 changes: 10 additions & 1 deletion statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,16 @@ func (s *FMSketch) MemoryUsage() (sum int64) {
}

// reset empties the sketch so that it can be reused: every key is removed
// from the hash set, and mask and maxSize are set back to zero.
func (s *FMSketch) reset() {
	// Deliberately do not call hashset.Clear here: per the original author's
	// note it releases all of the set's memory, which is not conducive to
	// memory reuse. Keys are deleted one at a time instead.
	//
	// The keys are snapshotted first so the set is not mutated while it is
	// being iterated. The set is expected to hold no more than 10000 keys,
	// so the temporary slice stays small.
	keys := make([]uint64, 0, s.hashset.Count())
	s.hashset.Iter(func(k uint64, _ bool) (stop bool) {
		keys = append(keys, k)
		return false
	})
	for _, k := range keys {
		s.hashset.Delete(k)
	}
	s.mask = 0
	s.maxSize = 0
}
Expand Down
17 changes: 17 additions & 0 deletions statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type RowSampleCollector interface {
MergeCollector(collector RowSampleCollector)
sampleRow(row []types.Datum, rng *rand.Rand)
Base() *baseCollector
DestroyAndPutToPool()
}

type baseCollector struct {
Expand Down Expand Up @@ -228,6 +229,12 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) {
return collector, nil
}

// destroyAndPutToPool calls DestroyAndPutToPool on every entry of
// s.FMSketches, in order.
func (s *baseCollector) destroyAndPutToPool() {
	sketches := s.FMSketches
	for idx := range sketches {
		fm := sketches[idx]
		fm.DestroyAndPutToPool()
	}
}

func (s *baseCollector) collectColumns(sc *stmtctx.StatementContext, cols []types.Datum, sizes []int64) error {
for i, col := range cols {
if col.IsNull() {
Expand Down Expand Up @@ -389,6 +396,11 @@ func (s *ReservoirRowSampleCollector) MergeCollector(subCollector RowSampleColle
}
}

// DestroyAndPutToPool satisfies the RowSampleCollector interface by
// delegating to the base collector's destroyAndPutToPool.
func (s *ReservoirRowSampleCollector) DestroyAndPutToPool() {
	base := s.baseCollector
	base.destroyAndPutToPool()
}

// RowSamplesToProto converts the samp slice to the pb struct.
func RowSamplesToProto(samples WeightedRowSampleHeap) []*tipb.RowSample {
if len(samples) == 0 {
Expand Down Expand Up @@ -472,3 +484,8 @@ func (s *BernoulliRowSampleCollector) MergeCollector(subCollector RowSampleColle
func (s *BernoulliRowSampleCollector) Base() *baseCollector {
	// Hand back the collector's own baseCollector field unchanged.
	base := s.baseCollector
	return base
}

// DestroyAndPutToPool satisfies the RowSampleCollector interface by
// delegating to the base collector's destroyAndPutToPool.
func (s *BernoulliRowSampleCollector) DestroyAndPutToPool() {
	base := s.baseCollector
	base.destroyAndPutToPool()
}

0 comments on commit 62a048c

Please sign in to comment.