From 0c5736bf90cd4e5f6c7066610f64a9eb88a11631 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 27 Feb 2024 17:51:48 +0800 Subject: [PATCH] log_backup: added more info for slow regions in log backup advancer (#51137) (#51175) close pingcap/tidb#51046 --- br/pkg/streamhelper/advancer.go | 36 ++++++++++++++++++++++++++++--- br/pkg/streamhelper/regioniter.go | 13 +++++++++++ metrics/log_backup.go | 16 ++++++++++++++ 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 74be82c1d0208..b3dba99b13292 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -5,6 +5,7 @@ package streamhelper import ( "bytes" "context" + "fmt" "math" "strings" "sync" @@ -272,20 +273,49 @@ func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) { c.checkpoints = cps } +func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string { + region, err := locateKeyOfRegion(ctx, c.env, startKey) + if err != nil { + return errors.Annotate(err, "failed to fetch region").Error() + } + r := region.Region + l := region.Leader + prs := []int{} + for _, p := range r.GetPeers() { + prs = append(prs, int(p.StoreId)) + } + metrics.LogBackupCurrentLastRegionID.Set(float64(r.Id)) + metrics.LogBackupCurrentLastRegionLeaderStoreID.Set(float64(l.StoreId)) + return fmt.Sprintf("ID=%d,Leader=%d,ConfVer=%d,Version=%d,Peers=%v,RealRange=%s", + r.GetId(), l.GetStoreId(), r.GetRegionEpoch().GetConfVer(), r.GetRegionEpoch().GetVersion(), + prs, logutil.StringifyRange{StartKey: r.GetStartKey(), EndKey: r.GetEndKey()}) +} + func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, threshold time.Duration) (spans.Valued, error) { var targets []spans.Valued var minValue spans.Valued + thresholdTso := tsoBefore(threshold) c.WithCheckpoints(func(vsf *spans.ValueSortedFull) { - vsf.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool { + vsf.TraverseValuesLessThan(thresholdTso, func(v spans.Valued) bool { targets = append(targets, v) return true }) minValue = vsf.Min() }) - log.Info("[log backup advancer hint] current last region", + sctx, cancel := context.WithTimeout(ctx, time.Second) + // Always fetch the hint and update the metrics. + hint := c.fetchRegionHint(sctx, minValue.Key.StartKey) + logger := log.Debug + if minValue.Value < thresholdTso { + logger = log.Info + } + logger("current last region", zap.String("category", "log backup advancer hint"), zap.Stringer("min", minValue), zap.Int("for-polling", len(targets)), - zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339))) + zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339)), + zap.String("region-hint", hint), + ) + cancel() if len(targets) == 0 { return minValue, nil } diff --git a/br/pkg/streamhelper/regioniter.go b/br/pkg/streamhelper/regioniter.go index a741824ea36c2..02d8ca9d94e94 100644 --- a/br/pkg/streamhelper/regioniter.go +++ b/br/pkg/streamhelper/regioniter.go @@ -82,6 +82,19 @@ func IterateRegion(cli TiKVClusterMeta, startKey, endKey []byte) *RegionIter { } } +// locateKeyOfRegion locates the place of the region in the key. +func locateKeyOfRegion(ctx context.Context, cli TiKVClusterMeta, key []byte) (RegionWithLeader, error) { + regions, err := cli.RegionScan(ctx, key, kv.Key(key).Next(), 1) + if err != nil { + return RegionWithLeader{}, err + } + if len(regions) == 0 { + return RegionWithLeader{}, errors.Annotatef(berrors.ErrPDBatchScanRegion, + "scanning the key %s returns empty region", redact.Key(key)) + } + return regions[0], nil +} + func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error { // current pd can't guarantee the consistency of returned regions if len(regions) == 0 { diff --git a/metrics/log_backup.go b/metrics/log_backup.go index e11c82285b54e..f3c522024fc28 100644 --- a/metrics/log_backup.go +++ b/metrics/log_backup.go @@ -28,6 +28,9 @@ var ( RegionCheckpointRequest *prometheus.CounterVec RegionCheckpointFailure *prometheus.CounterVec RegionCheckpointSubscriptionEvent *prometheus.HistogramVec + + LogBackupCurrentLastRegionID prometheus.Gauge + LogBackupCurrentLastRegionLeaderStoreID prometheus.Gauge ) // InitLogBackupMetrics initializes log backup metrics. @@ -84,4 +87,17 @@ func InitLogBackupMetrics() { Help: "The region flush event size.", Buckets: prometheus.ExponentialBuckets(8, 2.0, 12), }, []string{"store"}) + + LogBackupCurrentLastRegionID = NewGauge(prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "log_backup", + Name: "current_last_region_id", + Help: "The id of the region have minimal checkpoint ts in the current running task.", + }) + LogBackupCurrentLastRegionLeaderStoreID = NewGauge(prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "log_backup", + Name: "current_last_region_leader_store_id", + Help: "The leader's store id of the region have minimal checkpoint ts in the current running task.", + }) }