diff --git a/pkg/statistics/handle/globalstats/BUILD.bazel b/pkg/statistics/handle/globalstats/BUILD.bazel index e4ec714ebc3a1..8cbfd4a96040f 100644 --- a/pkg/statistics/handle/globalstats/BUILD.bazel +++ b/pkg/statistics/handle/globalstats/BUILD.bazel @@ -40,7 +40,7 @@ go_test( "topn_bench_test.go", ], flaky = True, - shard_count = 14, + shard_count = 18, deps = [ ":globalstats", "//pkg/config", diff --git a/pkg/statistics/handle/globalstats/global_stats_async.go b/pkg/statistics/handle/globalstats/global_stats_async.go index a5816016d20a7..f46f18292c750 100644 --- a/pkg/statistics/handle/globalstats/global_stats_async.go +++ b/pkg/statistics/handle/globalstats/global_stats_async.go @@ -31,6 +31,8 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -79,8 +81,11 @@ type AsyncMergePartitionStats2GlobalStats struct { PartitionDefinition map[int64]model.PartitionDefinition tableInfo map[int64]*model.TableInfo // key is partition id and histID - skipPartition map[skipItem]struct{} - exitWhenErrChan chan struct{} + skipPartition map[skipItem]struct{} + // ioWorker meet error, it will close this channel to notify cpuWorker. + ioWorkerExitWhenErrChan chan struct{} + // cpuWorker exit, it will close this channel to notify ioWorker. + cpuWorkerExitChan chan struct{} globalTableInfo *model.TableInfo histIDs []int64 globalStatsNDV []int64 @@ -97,20 +102,21 @@ func NewAsyncMergePartitionStats2GlobalStats( is infoschema.InfoSchema) (*AsyncMergePartitionStats2GlobalStats, error) { partitionNum := len(globalTableInfo.Partition.Definitions) return &AsyncMergePartitionStats2GlobalStats{ - statsHandle: statsHandle, - cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), - fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), - histogramAndTopn: make(chan mergeItem[*StatsWrapper]), - PartitionDefinition: make(map[int64]model.PartitionDefinition), - tableInfo: make(map[int64]*model.TableInfo), - partitionIDs: make([]int64, 0, partitionNum), - exitWhenErrChan: make(chan struct{}), - skipPartition: make(map[skipItem]struct{}), - allPartitionStats: make(map[int64]*statistics.Table), - globalTableInfo: globalTableInfo, - histIDs: histIDs, - is: is, - partitionNum: partitionNum, + statsHandle: statsHandle, + cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), + fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), + histogramAndTopn: make(chan mergeItem[*StatsWrapper]), + PartitionDefinition: make(map[int64]model.PartitionDefinition), + tableInfo: make(map[int64]*model.TableInfo), + partitionIDs: make([]int64, 0, partitionNum), + ioWorkerExitWhenErrChan: make(chan struct{}), + cpuWorkerExitChan: make(chan struct{}), + skipPartition: make(map[skipItem]struct{}), + allPartitionStats: make(map[int64]*statistics.Table), + globalTableInfo: globalTableInfo, + histIDs: histIDs, + is: is, + partitionNum: partitionNum, }, nil } @@ -218,25 +224,32 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissin func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) { defer func() { if r := recover(); r != nil { - close(a.exitWhenErrChan) + logutil.BgLogger().Warn("ioWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats")) + close(a.ioWorkerExitWhenErrChan) err = errors.New(fmt.Sprint(r)) } }() err = a.loadFmsketch(sctx, isIndex) if err != nil { - close(a.exitWhenErrChan) + close(a.ioWorkerExitWhenErrChan) return err } close(a.fmsketch) err = a.loadCMsketch(sctx, isIndex) if err != nil { - close(a.exitWhenErrChan) + close(a.ioWorkerExitWhenErrChan) return err } close(a.cmsketch) + failpoint.Inject("PanicSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + panic("test for PanicSameTime") + } + }) err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex) if err != nil { - close(a.exitWhenErrChan) + close(a.ioWorkerExitWhenErrChan) return err } close(a.histogramAndTopn) @@ -246,13 +259,14 @@ func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) { defer func() { if r := recover(); r != nil { - close(a.exitWhenErrChan) + logutil.BgLogger().Warn("cpuWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats")) err = errors.New(fmt.Sprint(r)) } + close(a.cpuWorkerExitChan) }() a.dealFMSketch() select { - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return nil default: for i := 0; i < a.globalStats.Num; i++ { @@ -267,10 +281,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem } err = a.dealCMSketch() if err != nil { + logutil.BgLogger().Warn("dealCMSketch failed", zap.Error(err), zap.String("category", "stats")) return err } + failpoint.Inject("PanicSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + panic("test for PanicSameTime") + } + }) err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion) if err != nil { + logutil.BgLogger().Warn("dealHistogramAndTopN failed", zap.Error(err), zap.String("category", "stats")) return err } return nil @@ -337,7 +359,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Cont case a.fmsketch <- mergeItem[*statistics.FMSketch]{ fmsketch, i, }: - case <-a.exitWhenErrChan: + case <-a.cpuWorkerExitChan: + logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats")) return nil } } @@ -367,7 +390,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont case a.cmsketch <- mergeItem[*statistics.CMSketch]{ cmsketch, i, }: - case <-a.exitWhenErrChan: + case <-a.cpuWorkerExitChan: + logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats")) return nil } } @@ -376,6 +400,12 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont } func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error { + failpoint.Inject("ErrorSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + failpoint.Return(errors.New("ErrorSameTime returned error")) + } + }) for i := 0; i < a.globalStats.Num; i++ { hists := make([]*statistics.Histogram, 0, a.partitionNum) topn := make([]*statistics.TopN, 0, a.partitionNum) @@ -402,7 +432,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx session case a.histogramAndTopn <- mergeItem[*StatsWrapper]{ NewStatsWrapper(hists, topn), i, }: - case <-a.exitWhenErrChan: + case <-a.cpuWorkerExitChan: + logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats")) return nil } } @@ -422,13 +453,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() { } else { a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item) } - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return } } } func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error { + failpoint.Inject("dealCMSketchErr", func(val failpoint.Value) { + if val, _ := val.(bool); val { + failpoint.Return(errors.New("dealCMSketch returned error")) + } + }) for { select { case cms, ok := <-a.cmsketch: @@ -443,13 +479,24 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error { return err } } - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return nil } } } func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) { + failpoint.Inject("dealHistogramAndTopNErr", func(val failpoint.Value) { + if val, _ := val.(bool); val { + failpoint.Return(errors.New("dealHistogramAndTopNErr returned error")) + } + }) + failpoint.Inject("ErrorSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + failpoint.Return(errors.New("ErrorSameTime returned error")) + } + }) for { select { case item, ok := <-a.histogramAndTopn: @@ -478,7 +525,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stm a.globalStats.Hg[item.idx].Buckets[j].NDV = 0 } a.globalStats.Hg[item.idx].NDV = a.globalStatsNDV[item.idx] - case <-a.exitWhenErrChan: + case <-a.ioWorkerExitWhenErrChan: return nil } } diff --git a/pkg/statistics/handle/globalstats/globalstats_test.go b/pkg/statistics/handle/globalstats/globalstats_test.go index 663667e5735d8..84a534517dd0a 100644 --- a/pkg/statistics/handle/globalstats/globalstats_test.go +++ b/pkg/statistics/handle/globalstats/globalstats_test.go @@ -76,6 +76,24 @@ func TestGlobalStatsPanicInIOWorker(t *testing.T) { simpleTest(t) } +func TestGlobalStatsWithCMSketchErr(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealCMSketchErr" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsWithHistogramAndTopNErr(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealHistogramAndTopNErr" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + func TestGlobalStatsPanicInCPUWorker(t *testing.T) { fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInCPUWorker" require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")")) @@ -85,6 +103,24 @@ func TestGlobalStatsPanicInCPUWorker(t *testing.T) { simpleTest(t) } +func TestGlobalStatsPanicSametime(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicSameTime" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsErrorSametime(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/ErrorSameTime" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + func TestBuildGlobalLevelStats(t *testing.T) { store := testkit.CreateMockStore(t) testKit := testkit.NewTestKit(t, store)