From 42b624cbdedbd2f29386427fae0c539041647e09 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 22 Jul 2024 17:21:04 +0800 Subject: [PATCH] domain: avoit to print too many log if the ddl job of runaway table is not finished (#52283) (#54802) close pingcap/tidb#52048 --- pkg/domain/domain.go | 3 +-- pkg/domain/resourcegroup/runaway.go | 21 +++++++++++++++ pkg/domain/runaway.go | 41 ++++++++++++++++++++++++++--- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 0fcf4f2131afa..51bd005d2c63a 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1292,8 +1292,7 @@ func (do *Domain) Init( do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop") do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper") do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper") - do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop") - do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop") + do.wg.Run(do.runawayStartLoop, "runawayStartLoop") do.wg.Run(do.requestUnitsWriterLoop, "requestUnitsWriterLoop") if !skipRegisterToDashboard { do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper") diff --git a/pkg/domain/resourcegroup/runaway.go b/pkg/domain/resourcegroup/runaway.go index a7313324d4bbb..026791297cfe4 100644 --- a/pkg/domain/resourcegroup/runaway.go +++ b/pkg/domain/resourcegroup/runaway.go @@ -184,6 +184,9 @@ func (r *QuarantineRecord) GenDeletionStmt() (string, []any) { // RunawayManager is used to detect and record runaway queries. type RunawayManager struct { + syncerInitialized atomic.Bool + logOnce sync.Once + // queryLock is used to avoid repeated additions. Since we will add new items to the system table, // in order to avoid repeated additions, we need a lock to ensure that // action "judging whether there is this record in the current watch list and adding records" have atomicity. @@ -219,6 +222,7 @@ func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serv go watchList.Start() staleQuarantineChan := make(chan *QuarantineRecord, maxWatchRecordChannelSize) m := &RunawayManager{ + syncerInitialized: atomic.Bool{}, resourceGroupCtl: resourceGroupCtl, watchList: watchList, serverID: serverAddr, @@ -245,6 +249,11 @@ func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serv return m } +// MarkSyncerInitialized is used to mark the syncer is initialized. +func (rm *RunawayManager) MarkSyncerInitialized() { + rm.syncerInitialized.Store(true) +} + // DeriveChecker derives a RunawayChecker from the given resource group func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, planDigest string) *RunawayChecker { group, err := rm.resourceGroupCtl.GetResourceGroup(resourceGroupName) @@ -282,6 +291,12 @@ func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watc } // Add record without ID into watch list in this TiDB right now. rm.addWatchList(record, ttl, false) + if !rm.syncerInitialized.Load() { + rm.logOnce.Do(func() { + logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway") + }) + return + } select { case rm.quarantineChan <- record: default: @@ -380,6 +395,12 @@ func (rm *RunawayManager) getWatchFromWatchList(key string) *QuarantineRecord { func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest string, action string, matchType RunawayMatchType, now *time.Time) { source := rm.serverID + if !rm.syncerInitialized.Load() { + rm.logOnce.Do(func() { + logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway") + }) + return + } select { case rm.runawayQueriesChan <- &RunawayRecord{ ResourceGroupName: resourceGroupName, diff --git a/pkg/domain/runaway.go b/pkg/domain/runaway.go index e2b043625a655..e5fa14e85198d 100644 --- a/pkg/domain/runaway.go +++ b/pkg/domain/runaway.go @@ -54,7 +54,8 @@ const ( runawayRecordGCBatchSize = 100 runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5 - maxIDRetries = 3 + maxIDRetries = 3 + runawayLoopLogErrorIntervalCount = 1800 ) var systemSchemaCIStr = model.NewCIStr("mysql") @@ -140,12 +141,41 @@ func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration t } } +func (do *Domain) runawayStartLoop() { + defer util.Recover(metrics.LabelDomain, "runawayStartLoop", nil, false) + runawayWatchSyncTicker := time.NewTicker(runawayWatchSyncInterval) + count := 0 + var err error + logutil.BgLogger().Info("try to start runaway manager loop") + for { + select { + case <-do.exit: + return + case <-runawayWatchSyncTicker.C: + // Due to the watch and watch done tables is created later than runaway queries table + err = do.updateNewAndDoneWatch() + if err == nil { + logutil.BgLogger().Info("preparations for the runaway manager are finished and start runaway manager loop") + do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop") + do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop") + do.runawayManager.MarkSyncerInitialized() + return + } + } + if count %= runawayLoopLogErrorIntervalCount; count == 0 { + logutil.BgLogger().Warn( + "failed to start runaway manager loop, please check whether the bootstrap or update is finished", + zap.Error(err)) + } + count++ + } +} + func (do *Domain) updateNewAndDoneWatch() error { do.runawaySyncer.mu.Lock() defer do.runawaySyncer.mu.Unlock() records, err := do.runawaySyncer.getNewWatchRecords() if err != nil { - logutil.BgLogger().Error("try to get new runaway watch", zap.Error(err)) return err } for _, r := range records { @@ -153,7 +183,6 @@ func (do *Domain) updateNewAndDoneWatch() error { } doneRecords, err := do.runawaySyncer.getNewWatchDoneRecords() if err != nil { - logutil.BgLogger().Error("try to get done runaway watch", zap.Error(err)) return err } for _, r := range doneRecords { @@ -165,6 +194,7 @@ func (do *Domain) updateNewAndDoneWatch() error { func (do *Domain) runawayWatchSyncLoop() { defer util.Recover(metrics.LabelDomain, "runawayWatchSyncLoop", nil, false) runawayWatchSyncTicker := time.NewTicker(runawayWatchSyncInterval) + count := 0 for { select { case <-do.exit: @@ -172,7 +202,10 @@ func (do *Domain) runawayWatchSyncLoop() { case <-runawayWatchSyncTicker.C: err := do.updateNewAndDoneWatch() if err != nil { - logutil.BgLogger().Warn("get runaway watch record failed", zap.Error(err)) + if count %= runawayLoopLogErrorIntervalCount; count == 0 { + logutil.BgLogger().Warn("get runaway watch record failed", zap.Error(err)) + } + count++ } } }