diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 1ecb1d732ac13..34ba7608c5b5a 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -245,6 +245,11 @@ func (mgr *Mgr) GetTLSConfig() *tls.Config { return mgr.StoreManager.TLSConfig() } +// GetStore gets the tikvStore. +func (mgr *Mgr) GetStore() tikv.Storage { + return mgr.tikvStore +} + // GetLockResolver gets the LockResolver. func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver { return mgr.tikvStore.GetLockResolver() diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index 7509c0098ebf7..cb8442cb618dc 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -41,6 +41,9 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//txnkv/rangetask", + "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", @@ -65,7 +68,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 18, + shard_count = 19, deps = [ ":streamhelper", "//br/pkg/errors", @@ -82,11 +85,16 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/logbackuppb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_client_go_v2//txnkv/txnlock", + "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_server_v3//embed", "@io_etcd_go_etcd_server_v3//mvcc", @@ -94,6 +102,7 @@ go_test( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//status", + "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", ], diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index c064f2dd5e670..7f6f12eeea6e8 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -3,12 +3,16 @@ package streamhelper import ( + "bytes" "context" + "math" "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" @@ -17,7 +21,10 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv/rangetask" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -60,7 +67,9 @@ type CheckpointAdvancer struct { // the cached last checkpoint. // if no progress, this cache can help us don't to send useless requests. - lastCheckpoint uint64 + lastCheckpoint *checkpoint + lastCheckpointMu sync.Mutex + inResolvingLock atomic.Bool checkpoints *spans.ValueSortedFull checkpointsMu sync.Mutex @@ -69,6 +78,53 @@ type CheckpointAdvancer struct { subscriberMu sync.Mutex } +// checkpoint represents the TS with specific range. +// it's only used in advancer.go. +type checkpoint struct { + StartKey []byte + EndKey []byte + TS uint64 + + // It's better to use PD timestamp in future, for now + // use local time to decide the time to resolve lock is ok. + resolveLockTime time.Time +} + +func newCheckpointWithTS(ts uint64) *checkpoint { + return &checkpoint{ + TS: ts, + resolveLockTime: time.Now(), + } +} + +func NewCheckpointWithSpan(s spans.Valued) *checkpoint { + return &checkpoint{ + StartKey: s.Key.StartKey, + EndKey: s.Key.EndKey, + TS: s.Value, + resolveLockTime: time.Now(), + } +} + +func (c *checkpoint) safeTS() uint64 { + return c.TS - 1 +} + +func (c *checkpoint) equal(o *checkpoint) bool { + return bytes.Equal(c.StartKey, o.StartKey) && + bytes.Equal(c.EndKey, o.EndKey) && c.TS == o.TS +} + +// if a checkpoint stay in a time too long(3 min) +// we should try to resolve lock for the range +// to keep the RPO in 5 min. +func (c *checkpoint) needResolveLocks() bool { + failpoint.Inject("NeedResolveLocks", func(val failpoint.Value) { + failpoint.Return(val.(bool)) + }) + return time.Since(c.resolveLockTime) > 3*time.Minute +} + // NewCheckpointAdvancer creates a checkpoint advancer with the env. func NewCheckpointAdvancer(env Env) *CheckpointAdvancer { return &CheckpointAdvancer{ @@ -92,6 +148,13 @@ func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config)) { c.UpdateConfig(cfg) } +// UpdateLastCheckpoint modify the checkpoint in ticking. +func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint) { + c.lastCheckpointMu.Lock() + c.lastCheckpoint = p + c.lastCheckpointMu.Unlock() +} + // Config returns the current config. func (c *CheckpointAdvancer) Config() config.Config { return c.cfg @@ -172,6 +235,10 @@ func tsoBefore(n time.Duration) uint64 { return oracle.ComposeTS(now.UnixMilli()-n.Milliseconds(), 0) } +func tsoAfter(ts uint64, n time.Duration) uint64 { + return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n)) +} + func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) { c.checkpointsMu.Lock() defer c.checkpointsMu.Unlock() @@ -179,8 +246,13 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) { f(c.checkpoints) } +// only used for test +func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) { + c.checkpoints = cps +} + func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, - threshold time.Duration) (uint64, error) { + threshold time.Duration) (spans.Valued, error) { var targets []spans.Valued var minValue spans.Valued c.WithCheckpoints(func(vsf *spans.ValueSortedFull) { @@ -194,13 +266,13 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, zap.Stringer("min", minValue), zap.Int("for-polling", len(targets)), zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339))) if len(targets) == 0 { - return minValue.Value, nil + return minValue, nil } err := c.tryAdvance(ctx, len(targets), func(i int) kv.KeyRange { return targets[i].Key }) if err != nil { - return 0, err + return minValue, err } - return minValue.Value, nil + return minValue, nil } func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error { @@ -293,7 +365,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error c.task = e.Info c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] }) c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0)) - c.lastCheckpoint = e.Info.StartTs + c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs) p, err := c.env.BlockGCUntil(ctx, c.task.StartTs) if err != nil { log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err)) @@ -323,33 +395,36 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error return nil } -func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool { - if cp < c.lastCheckpoint { +func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool { + cp := NewCheckpointWithSpan(s) + if cp.TS < c.lastCheckpoint.TS { log.Warn("failed to update global checkpoint: stale", - zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp)) + zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS)) return false } - if cp <= c.lastCheckpoint { + // Need resolve lock for different range and same TS + // so check the range and TS here. + if cp.equal(c.lastCheckpoint) { return false } - c.lastCheckpoint = cp - metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint)) + c.UpdateLastCheckpoint(cp) + metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS)) return true } // advanceCheckpointBy advances the checkpoint by a checkpoint getter function. func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, - getCheckpoint func(context.Context) (uint64, error)) error { + getCheckpoint func(context.Context) (spans.Valued, error)) error { start := time.Now() cp, err := getCheckpoint(ctx) if err != nil { return err } - if c.setCheckpoint(cp) { + if c.setCheckpoint(ctx, cp) { log.Info("uploading checkpoint for task", - zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)), - zap.Uint64("checkpoint", cp), + zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)), + zap.Uint64("checkpoint", cp.Value), zap.String("task", c.task.Name), zap.Stringer("take", time.Since(start))) } @@ -403,29 +478,49 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error { func (c *CheckpointAdvancer) importantTick(ctx context.Context) error { c.checkpointsMu.Lock() - c.setCheckpoint(c.checkpoints.MinValue()) + c.setCheckpoint(ctx, c.checkpoints.Min()) c.checkpointsMu.Unlock() - if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil { + if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil { return errors.Annotate(err, "failed to upload global checkpoint") } - p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint-1) + p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS()) if err != nil { return errors.Annotatef(err, "failed to update service GC safe point, current checkpoint is %d, target checkpoint is %d", - c.lastCheckpoint-1, p) + c.lastCheckpoint.safeTS(), p) } - if p <= c.lastCheckpoint-1 { + if p <= c.lastCheckpoint.safeTS() { log.Info("updated log backup GC safe point.", - zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1)) + zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS())) } - if p > c.lastCheckpoint-1 { + if p > c.lastCheckpoint.safeTS() { log.Warn("update log backup GC safe point failed: stale.", - zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1)) + zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS())) } return nil } func (c *CheckpointAdvancer) optionalTick(cx context.Context) error { + // lastCheckpoint is not increased too long enough. + // assume the cluster has expired locks for whatever reasons. + var targets []spans.Valued + if c.lastCheckpoint != nil && c.lastCheckpoint.needResolveLocks() && c.inResolvingLock.CompareAndSwap(false, true) { + c.WithCheckpoints(func(vsf *spans.ValueSortedFull) { + // when get locks here. assume these locks are not belong to same txn, + // but these locks' start ts are close to 1 minute. try resolve these locks at one time + vsf.TraverseValuesLessThan(tsoAfter(c.lastCheckpoint.TS, time.Minute), func(v spans.Valued) bool { + targets = append(targets, v) + return true + }) + }) + if len(targets) != 0 { + log.Info("Advancer starts to resolve locks", zap.Int("targets", len(targets))) + // use new context here to avoid timeout + ctx := context.Background() + c.asyncResolveLocksForRanges(ctx, targets) + } + c.inResolvingLock.Store(false) + } threshold := c.Config().GetDefaultStartPollThreshold() if err := c.subscribeTick(cx); err != nil { log.Warn("Subscriber meet error, would polling the checkpoint.", zap.String("category", "log backup advancer"), @@ -433,13 +528,9 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error { threshold = c.Config().GetSubscriberErrorStartPollThreshold() } - err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) { + return c.advanceCheckpointBy(cx, func(cx context.Context) (spans.Valued, error) { return c.CalculateGlobalCheckpointLight(cx, threshold) }) - if err != nil { - return err - } - return nil } func (c *CheckpointAdvancer) tick(ctx context.Context) error { @@ -468,3 +559,46 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error { return errs } + +func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, targets []spans.Valued) { + // run in another goroutine + // do not block main tick here + go func() { + handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + // we will scan all locks and try to resolve them by check txn status. + return tikv.ResolveLocksForRange( + ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit) + } + workerPool := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks") + var wg sync.WaitGroup + for _, r := range targets { + targetRange := r + wg.Add(1) + workerPool.Apply(func() { + defer wg.Done() + // Run resolve lock on the whole TiKV cluster. + // it will use startKey/endKey to scan region in PD. + // but regionCache already has a codecPDClient. so just use decode key here. + // and it almost only include one region here. so set concurrency to 1. + runner := rangetask.NewRangeTaskRunner("advancer-resolve-locks-runner", + c.env.GetStore(), 1, handler) + err := runner.RunOnRange(ctx, targetRange.Key.StartKey, targetRange.Key.EndKey) + if err != nil { + // wait for next tick + log.Warn("resolve locks failed, wait for next tick", zap.String("category", "advancer"), + zap.String("uuid", "log backup advancer"), + zap.Error(err)) + } + }) + } + wg.Wait() + log.Info("finish resolve locks for checkpoint", zap.String("category", "advancer"), + zap.String("uuid", "log backup advancer"), + logutil.Key("StartKey", c.lastCheckpoint.StartKey), + logutil.Key("EndKey", c.lastCheckpoint.EndKey), + zap.Int("targets", len(targets))) + c.lastCheckpointMu.Lock() + c.lastCheckpoint.resolveLockTime = time.Now() + c.lastCheckpointMu.Unlock() + }() +} diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 1527def725f9a..7707783c495f1 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -10,6 +10,8 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/util/engine" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" @@ -29,6 +31,8 @@ type Env interface { LogBackupService // StreamMeta connects to the metadata service (normally PD). StreamMeta + // GCLockResolver try to resolve locks when region checkpoint stopped. + tikv.RegionLockResolver } // PDRegionScanner is a simple wrapper over PD @@ -83,8 +87,11 @@ type clusterEnv struct { clis *utils.StoreManager *AdvancerExt PDRegionScanner + *AdvancerLockResolver } +var _ Env = &clusterEnv{} + // GetLogBackupClient gets the log backup client. func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) { var cli logbackup.LogBackupClient @@ -98,16 +105,17 @@ func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (log } // CliEnv creates the Env for CLI usage. -func CliEnv(cli *utils.StoreManager, etcdCli *clientv3.Client) Env { +func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env { return clusterEnv{ - clis: cli, - AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)}, - PDRegionScanner: PDRegionScanner{cli.PDClient()}, + clis: cli, + AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)}, + PDRegionScanner: PDRegionScanner{cli.PDClient()}, + AdvancerLockResolver: newAdvancerLockResolver(tikvStore), } } // TiDBEnv creates the Env by TiDB config. -func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error) { +func TiDBEnv(tikvStore tikv.Storage, pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error) { tconf, err := conf.GetTiKVConfig().Security.ToTLSConfig() if err != nil { return nil, err @@ -117,8 +125,9 @@ func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (En Time: time.Duration(conf.TiKVClient.GrpcKeepAliveTime) * time.Second, Timeout: time.Duration(conf.TiKVClient.GrpcKeepAliveTimeout) * time.Second, }, tconf), - AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)}, - PDRegionScanner: PDRegionScanner{Client: pdCli}, + AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)}, + PDRegionScanner: PDRegionScanner{Client: pdCli}, + AdvancerLockResolver: newAdvancerLockResolver(tikvStore), }, nil } @@ -137,3 +146,31 @@ type StreamMeta interface { // ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store. ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error } + +var _ tikv.RegionLockResolver = &AdvancerLockResolver{} + +type AdvancerLockResolver struct { + *tikv.BaseRegionLockResolver +} + +func newAdvancerLockResolver(store tikv.Storage) *AdvancerLockResolver { + return &AdvancerLockResolver{ + BaseRegionLockResolver: tikv.NewRegionLockResolver("log backup advancer", store), + } +} + +// ResolveLocksInOneRegion tries to resolve expired locks with this method. +// It will check status of the txn. Resolve the lock if txn is expired, Or do nothing. +func (l *AdvancerLockResolver) ResolveLocksInOneRegion( + bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) { + _, err := l.GetStore().GetLockResolver().ResolveLocks(bo, 0, locks) + if err != nil { + return nil, err + } + return loc, nil +} + +// If we don't implement GetStore here, it won't complie. +func (l *AdvancerLockResolver) GetStore() tikv.Storage { + return l.BaseRegionLockResolver.GetStore() +} diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index a01a13f96053e..fabf220b9ff3b 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -10,12 +10,17 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/streamhelper/config" + "github.com/pingcap/tidb/br/pkg/streamhelper/spans" "github.com/pingcap/tidb/kv" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv/txnlock" + "go.uber.org/atomic" "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -278,3 +283,74 @@ func TestBlocked(t *testing.T) { }) req.ErrorIs(errors.Cause(err), context.DeadlineExceeded) } + +func TestResolveLock(t *testing.T) { + c := createFakeCluster(t, 4, false) + defer func() { + if t.Failed() { + fmt.Println(c) + } + }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks")) + }() + + c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") + ctx := context.Background() + minCheckpoint := c.advanceCheckpoints() + env := &testEnv{fakeCluster: c, testCtx: t} + + lockRegion := c.findRegionByKey([]byte("01")) + allLocks := []*txnlock.Lock{ + { + Key: []byte{1}, + // TxnID == minCheckpoint + TxnID: minCheckpoint, + }, + { + Key: []byte{2}, + // TxnID > minCheckpoint + TxnID: minCheckpoint + 1, + }, + } + c.LockRegion(lockRegion, allLocks) + + // ensure resolve locks triggered and collect all locks from scan locks + resolveLockRef := atomic.NewBool(false) + env.resolveLocks = func(locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) { + resolveLockRef.Store(true) + require.ElementsMatch(t, locks, allLocks) + return loc, nil + } + adv := streamhelper.NewCheckpointAdvancer(env) + // make lastCheckpoint stuck at 123 + adv.UpdateLastCheckpoint(streamhelper.NewCheckpointWithSpan(spans.Valued{ + Key: kv.KeyRange{ + StartKey: kv.Key([]byte("1")), + EndKey: kv.Key([]byte("2")), + }, + Value: 123, + })) + adv.NewCheckpoints( + spans.Sorted(spans.NewFullWith([]kv.KeyRange{ + { + StartKey: kv.Key([]byte("1")), + EndKey: kv.Key([]byte("2")), + }, + }, 0)), + ) + adv.StartTaskListener(ctx) + require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil }, + time.Second, 50*time.Millisecond) + coll := streamhelper.NewClusterCollector(ctx, env) + err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll) + + require.Eventually(t, func() bool { return resolveLockRef.Load() }, + 8*time.Second, 50*time.Microsecond) + require.NoError(t, err) + r, err := coll.Finish(ctx) + require.NoError(t, err) + require.Len(t, r.FailureSubRanges, 0) + require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) +} diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 0f8ffabf19e2b..3ee3393535626 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -15,9 +15,11 @@ import ( "sync" "sync/atomic" "testing" + "time" backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -26,6 +28,10 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/codec" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/txnkv/txnlock" + pd "github.com/tikv/pd/client" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -71,6 +77,8 @@ type region struct { checkpoint atomic.Uint64 fsim flushSimulator + + locks []*txnlock.Lock } type fakeStore struct { @@ -325,6 +333,11 @@ func (f *fakeCluster) findRegionById(rid uint64) *region { return nil } +func (f *fakeCluster) LockRegion(r *region, locks []*txnlock.Lock) *region { + r.locks = locks + return r +} + func (f *fakeCluster) findRegionByKey(key []byte) *region { for _, r := range f.regions { if bytes.Compare(key, r.rng.StartKey) >= 0 && (len(r.rng.EndKey) == 0 || bytes.Compare(key, r.rng.EndKey) < 0) { @@ -582,7 +595,10 @@ type testEnv struct { ranges []kv.KeyRange taskCh chan<- streamhelper.TaskEvent + resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error) + mu sync.Mutex + pd.Client } func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error { @@ -635,3 +651,105 @@ func (t *testEnv) unregisterTask() { Name: "whole", } } + +func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) { + for _, r := range t.regions { + if len(r.locks) != 0 { + return r.locks, &tikv.KeyLocation{ + Region: tikv.NewRegionVerID(r.id, 0, 0), + }, nil + } + } + return nil, nil, nil +} + +func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) { + for _, r := range t.regions { + if loc != nil && loc.Region.GetID() == r.id { + // reset locks + r.locks = nil + return t.resolveLocks(locks, loc) + } + } + return nil, nil +} + +func (t *testEnv) Identifier() string { + return "advance test" +} + +func (t *testEnv) GetStore() tikv.Storage { + // only used for GetRegionCache once in resolve lock + return &mockTiKVStore{regionCache: tikv.NewRegionCache(&mockPDClient{fakeRegions: t.regions})} +} + +type mockKVStore struct { + kv.Storage +} + +type mockTiKVStore struct { + mockKVStore + tikv.Storage + regionCache *tikv.RegionCache +} + +func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache { + return s.regionCache +} + +func (s *mockTiKVStore) SendReq(bo *tikv.Backoffer, req *tikvrpc.Request, regionID tikv.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) { + scanResp := kvrpcpb.ScanLockResponse{ + // we don't need mock locks here, because we already have mock locks in testEnv.Scanlocks. + // this behaviour is align with gc_worker_test + Locks: nil, + RegionError: nil, + } + return &tikvrpc.Response{Resp: &scanResp}, nil +} + +type mockPDClient struct { + pd.Client + fakeRegions []*region +} + +func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) { + sort.Slice(p.fakeRegions, func(i, j int) bool { + return bytes.Compare(p.fakeRegions[i].rng.StartKey, p.fakeRegions[j].rng.StartKey) < 0 + }) + + result := make([]*pd.Region, 0, len(p.fakeRegions)) + for _, region := range p.fakeRegions { + if spans.Overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit { + regionInfo := newMockRegion(region.id, region.rng.StartKey, region.rng.EndKey) + result = append(result, regionInfo) + } else if bytes.Compare(region.rng.StartKey, key) > 0 { + break + } + } + return result, nil +} + +func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Store, error) { + return &metapb.Store{ + Id: storeID, + Address: fmt.Sprintf("127.0.0.%d", storeID), + }, nil +} + +func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region { + leader := &metapb.Peer{ + Id: regionID, + StoreId: 1, + Role: metapb.PeerRole_Voter, + } + + return &pd.Region{ + Meta: &metapb.Region{ + Id: regionID, + StartKey: startKey, + EndKey: endKey, + Peers: []*metapb.Peer{leader}, + }, + Leader: leader, + } +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 216b102d6836f..56a7c238bc013 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -877,7 +877,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre if err != nil { return err } - env := streamhelper.CliEnv(mgr.StoreManager, etcdCLI) + env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI) advancer := streamhelper.NewCheckpointAdvancer(env) advancer.UpdateConfig(cfg.AdvancerCfg) advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration) diff --git a/domain/domain.go b/domain/domain.go index 146276c916490..ee0d6774655bf 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1292,7 +1292,12 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { log.Warn("pd / etcd client not provided, won't begin Advancer.") return nil } - env, err := streamhelper.TiDBEnv(pdClient, do.etcdClient, cfg) + tikvStore, ok := do.Store().(tikv.Storage) + if !ok { + log.Warn("non tikv store, stop begin Advancer.") + return nil + } + env, err := streamhelper.TiDBEnv(tikvStore, pdClient, do.etcdClient, cfg) if err != nil { return err } diff --git a/store/gcworker/BUILD.bazel b/store/gcworker/BUILD.bazel index a95bdce85b6a4..98a5caffa3453 100644 --- a/store/gcworker/BUILD.bazel +++ b/store/gcworker/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/pingcap/tidb/store/gcworker", visibility = ["//visibility:public"], deps = [ - "//br/pkg/utils", "//ddl", "//ddl/label", "//ddl/placement", @@ -51,7 +50,7 @@ go_test( embed = [":gcworker"], flaky = True, race = "on", - shard_count = 30, + shard_count = 29, deps = [ "//ddl/placement", "//ddl/util", diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index 60e5317a1baf9..df5dc5a21a3be 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/label" "github.com/pingcap/tidb/ddl/placement" @@ -65,21 +64,16 @@ import ( // GCWorker periodically triggers GC process on tikv server. type GCWorker struct { - uuid string - desc string - store kv.Storage - tikvStore tikv.Storage - pdClient pd.Client - gcIsRunning bool - lastFinish time.Time - cancel context.CancelFunc - done chan error - testingKnobs struct { - scanLocks func(key []byte, regionID uint64, maxVersion uint64) []*txnlock.Lock - batchResolveLocks func(locks []*txnlock.Lock, regionID tikv.RegionVerID, safepoint uint64) (ok bool, err error) - resolveLocks func(locks []*txnlock.Lock, lowResolutionTS uint64) (int64, error) - } - logBackupEnabled bool // check log-backup task existed. + uuid string + desc string + store kv.Storage + tikvStore tikv.Storage + pdClient pd.Client + gcIsRunning bool + lastFinish time.Time + cancel context.CancelFunc + done chan error + regionLockResolver tikv.RegionLockResolver } // NewGCWorker creates a GCWorker instance. @@ -96,15 +90,18 @@ func NewGCWorker(store kv.Storage, pdClient pd.Client) (*GCWorker, error) { if !ok { return nil, errors.New("GC should run against TiKV storage") } + uuid := strconv.FormatUint(ver.Ver, 16) + resolverIdentifier := fmt.Sprintf("gc-worker-%s", uuid) worker := &GCWorker{ - uuid: strconv.FormatUint(ver.Ver, 16), - desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()), - store: store, - tikvStore: tikvStore, - pdClient: pdClient, - gcIsRunning: false, - lastFinish: time.Now(), - done: make(chan error), + uuid: uuid, + desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()), + store: store, + tikvStore: tikvStore, + pdClient: pdClient, + gcIsRunning: false, + lastFinish: time.Now(), + regionLockResolver: tikv.NewRegionLockResolver(resolverIdentifier, tikvStore), + done: make(chan error), } variable.RegisterStatistics(worker) return worker, nil @@ -151,11 +148,6 @@ const ( gcMinConcurrency = 1 gcMaxConcurrency = 128 - gcTryResolveLocksIntervalFromNow = time.Minute * 5 - - // We don't want gc to sweep out the cached info belong to other processes, like coprocessor. - gcScanLockLimit = txnlock.ResolvedCacheSize / 2 - gcEnableKey = "tikv_gc_enable" gcDefaultEnableValue = true @@ -415,17 +407,6 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { metrics.GCJobFailureCounter.WithLabelValues("prepare").Inc() return errors.Trace(err) } else if !ok { - // If skip gc, it still needs to resolve locks with expired TTL, in order not to block log backup. - if w.logBackupEnabled { - tryResolveLocksTS, err := w.getTryResolveLocksTS() - if err != nil { - return errors.Trace(err) - } - // Set 0 to safepoint, which means resolving locks with expired TTL only. - if err = w.legacyResolveLocks(ctx, 0, tryResolveLocksTS, concurrency); err != nil { - return errors.Trace(err) - } - } return nil } // When the worker is just started, or an old GC job has just finished, @@ -1170,25 +1151,12 @@ func (w *GCWorker) checkUsePhysicalScanLock() (bool, error) { } func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurrency int, usePhysical bool) (bool, error) { - // tryResolveLocksTS is defined as `now() - gcTryResolveLocksIntervalFromNow`, - // it used for trying resolve locks, ts of which is smaller than tryResolveLocksTS and expired. - tryResolveLocksTS, err := w.getTryResolveLocksTS() - if err != nil { - return false, err - } - - if tryResolveLocksTS < safePoint { - tryResolveLocksTS = safePoint - } else if !w.logBackupEnabled { - tryResolveLocksTS = safePoint - } - if !usePhysical { - return false, w.legacyResolveLocks(ctx, safePoint, tryResolveLocksTS, concurrency) + return false, w.legacyResolveLocks(ctx, safePoint, concurrency) } // First try resolve locks with physical scan - err = w.resolveLocksPhysical(ctx, safePoint) + err := w.resolveLocksPhysical(ctx, safePoint) if err == nil { return true, nil } @@ -1196,28 +1164,29 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren logutil.Logger(ctx).Error("resolve locks with physical scan failed, trying fallback to legacy resolve lock", zap.String("category", "gc worker"), zap.String("uuid", w.uuid), zap.Uint64("safePoint", safePoint), - zap.Uint64("try-resolve-locks-ts", tryResolveLocksTS), zap.Error(err)) - return false, w.legacyResolveLocks(ctx, safePoint, tryResolveLocksTS, concurrency) + return false, w.legacyResolveLocks(ctx, safePoint, concurrency) } func (w *GCWorker) legacyResolveLocks( ctx context.Context, safePoint uint64, - tryResolveLocksTS uint64, concurrency int, ) error { metrics.GCWorkerCounter.WithLabelValues("resolve_locks").Inc() logutil.Logger(ctx).Info("start resolve locks", zap.String("category", "gc worker"), zap.String("uuid", w.uuid), zap.Uint64("safePoint", safePoint), - zap.Uint64("try-resolve-locks-ts", tryResolveLocksTS), zap.Int("concurrency", concurrency)) startTime := time.Now() handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - return w.resolveLocksForRange(ctx, safePoint, tryResolveLocksTS, r.StartKey, r.EndKey) + scanLimit := uint32(tikv.GCScanLockLimit) + failpoint.Inject("lowScanLockLimit", func() { + scanLimit = 3 + }) + return tikv.ResolveLocksForRange(ctx, w.regionLockResolver, safePoint, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, scanLimit) } runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", w.tikvStore, concurrency, handler) @@ -1234,212 +1203,11 @@ func (w *GCWorker) legacyResolveLocks( logutil.Logger(ctx).Info("finish resolve locks", zap.String("category", "gc worker"), zap.String("uuid", w.uuid), zap.Uint64("safePoint", safePoint), - zap.Uint64("try-resolve-locks-ts", tryResolveLocksTS), zap.Int("regions", runner.CompletedRegions())) metrics.GCHistogram.WithLabelValues("resolve_locks").Observe(time.Since(startTime).Seconds()) return nil } -// getTryResolveLocksTS gets the TryResolveLocksTS -// that is defined as `now() - gcTryResolveLocksIntervalFromNow`. -func (w *GCWorker) getTryResolveLocksTS() (uint64, error) { - now, err := w.tikvStore.CurrentTimestamp(kv.GlobalTxnScope) - if err != nil { - return 0, err - } - - gcTryResolveLockTS := oracle.ComposeTS(oracle.ExtractPhysical(now)-gcTryResolveLocksIntervalFromNow.Milliseconds(), oracle.ExtractLogical(now)) - return gcTryResolveLockTS, nil -} - -// batchResolveExpiredLocks tries to resolve expired locks with batch method. -// Travesal the given locks and check that: -// 1. If the ts of lock is equal with or smaller than forceResolveLocksTS(acually equals safepoint), -// it will rollback the txn, no matter the lock is expired of not. -// 2. If the ts of lock is larger than forceResolveLocksTS, it will check status of the txn. -// Resolve the lock if txn is expired, Or do nothing. -func (w *GCWorker) batchResolveExpiredLocks( - bo *tikv.Backoffer, - locks []*txnlock.Lock, - loc tikv.RegionVerID, - forceResolveLocksTS uint64, - tryResolveLocksTS uint64, -) (bool, error) { - if len(locks) == 0 { - return true, nil - } - - forceResolveLocks := make([]*txnlock.Lock, 0, len(locks)) - tryResolveLocks := make([]*txnlock.Lock, 0, len(locks)) - for _, l := range locks { - if l.TxnID <= forceResolveLocksTS { - forceResolveLocks = append(forceResolveLocks, l) - } else { - tryResolveLocks = append(tryResolveLocks, l) - } - } - - logutil.BgLogger().Debug("batchResolveExpiredLocks", - zap.Uint64("force-resolve-locks-ts", forceResolveLocksTS), - zap.Uint64("try-resolve-locks-ts", tryResolveLocksTS), - zap.Int("force-resolve-locks-count", len(forceResolveLocks)), - zap.Int("try-resolve-locks-count", len(tryResolveLocks))) - - var ( - ok bool - err error - ) - if w.testingKnobs.batchResolveLocks != nil { - ok, err = w.testingKnobs.batchResolveLocks(forceResolveLocks, loc, forceResolveLocksTS) - } else { - ok, err = w.tikvStore.GetLockResolver().BatchResolveLocks(bo, forceResolveLocks, loc) - } - if err != nil || !ok { - return ok, err - } - - if w.testingKnobs.resolveLocks != nil { - _, err = w.testingKnobs.resolveLocks(tryResolveLocks, tryResolveLocksTS) - } else { - _, err = w.tikvStore.GetLockResolver().ResolveLocks(bo, 0, tryResolveLocks) - } - return err == nil, errors.Trace(err) -} - -func (w *GCWorker) resolveLocksForRange( - ctx context.Context, - forceResolveLocksTS uint64, - tryResolveLocksTS uint64, - startKey []byte, - endKey []byte, -) (rangetask.TaskStat, error) { - // for scan lock request, we must return all locks even if they are generated - // by the same transaction. because gc worker need to make sure all locks have been - // cleaned. - req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{ - MaxVersion: tryResolveLocksTS, - Limit: gcScanLockLimit, - }, kvrpcpb.Context{ - RequestSource: tikvutil.RequestSourceFromCtx(ctx), - }) - - failpoint.Inject("lowScanLockLimit", func() { - req.ScanLock().Limit = 3 - }) - - var stat rangetask.TaskStat - key := startKey - bo := tikv.NewGcResolveLockMaxBackoffer(ctx) - failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) { - sleep := v.(int) - // cooperate with github.com/tikv/client-go/v2/locate/invalidCacheAndRetry - //nolint: SA1029 - ctx = context.WithValue(ctx, "injectedBackoff", struct{}{}) - bo = tikv.NewBackofferWithVars(ctx, sleep, nil) - }) -retryScanAndResolve: - for { - select { - case <-ctx.Done(): - return stat, errors.New("[gc worker] gc job canceled") - default: - } - - req.ScanLock().StartKey = key - loc, err := w.tikvStore.GetRegionCache().LocateKey(bo, key) - if err != nil { - return stat, errors.Trace(err) - } - req.ScanLock().EndKey = loc.EndKey - resp, err := w.tikvStore.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium) - if err != nil { - return stat, errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return stat, errors.Trace(err) - } - if regionErr != nil { - err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) - if err != nil { - return stat, errors.Trace(err) - } - continue - } - if resp.Resp == nil { - return stat, errors.Trace(tikverr.ErrBodyMissing) - } - locksResp := resp.Resp.(*kvrpcpb.ScanLockResponse) - if locksResp.GetError() != nil { - return stat, errors.Errorf("unexpected scanlock error: %s", locksResp) - } - locksInfo := locksResp.GetLocks() - locks := make([]*txnlock.Lock, 0, len(locksInfo)) - for _, li := range locksInfo { - locks = append(locks, txnlock.NewLock(li)) - } - if w.testingKnobs.scanLocks != nil { - locks = append(locks, w.testingKnobs.scanLocks(key, loc.Region.GetID(), tryResolveLocksTS)...) - } - locForResolve := loc - for { - ok, err1 := w.batchResolveExpiredLocks(bo, locks, locForResolve.Region, forceResolveLocksTS, tryResolveLocksTS) - if err1 != nil { - return stat, errors.Trace(err1) - } - if !ok { - err = bo.Backoff(tikv.BoTxnLock(), errors.Errorf("remain locks: %d", len(locks))) - if err != nil { - return stat, errors.Trace(err) - } - stillInSame, refreshedLoc, err := w.tryRelocateLocksRegion(bo, locks) - if err != nil { - return stat, errors.Trace(err) - } - if stillInSame { - locForResolve = refreshedLoc - continue - } - continue retryScanAndResolve - } - break - } - if len(locks) < gcScanLockLimit { - stat.CompletedRegions++ - key = loc.EndKey - } else { - logutil.Logger(ctx).Info("region has more than limit locks", zap.String("category", "gc worker"), - zap.String("uuid", w.uuid), - zap.Uint64("region", locForResolve.Region.GetID()), - zap.Int("scan lock limit", gcScanLockLimit)) - metrics.GCRegionTooManyLocksCounter.Inc() - key = locks[len(locks)-1].Key - } - - if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) { - break - } - bo = tikv.NewGcResolveLockMaxBackoffer(ctx) - failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) { - sleep := v.(int) - bo = tikv.NewBackofferWithVars(ctx, sleep, nil) - }) - } - return stat, nil -} - -func (w *GCWorker) tryRelocateLocksRegion(bo *tikv.Backoffer, locks []*txnlock.Lock) (stillInSameRegion bool, refreshedLoc *tikv.KeyLocation, err error) { - if len(locks) == 0 { - return - } - refreshedLoc, err = w.tikvStore.GetRegionCache().LocateKey(bo, locks[0].Key) - if err != nil { - return - } - stillInSameRegion = refreshedLoc.Contains(locks[len(locks)-1].Key) - return -} - // resolveLocksPhysical uses TiKV's `PhysicalScanLock` to scan stale locks in the cluster and resolve them. It tries to // ensure no lock whose ts <= safePoint is left. func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) error { @@ -1929,7 +1697,6 @@ func (w *GCWorker) checkLeader(ctx context.Context) (bool, error) { se := createSession(w.store) defer se.Close() - w.logBackupEnabled = utils.IsLogBackupInUse(se) _, err := se.ExecuteInternal(ctx, "BEGIN") if err != nil { return false, errors.Trace(err) @@ -2266,11 +2033,13 @@ func getGCRules(ids []int64, rules map[string]*label.Rule) []string { } // RunGCJob sends GC command to KV. It is exported for kv api, do not use it with GCWorker at the same time. -func RunGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int) error { +// only use for test +func RunGCJob(ctx context.Context, regionLockResolver tikv.RegionLockResolver, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int) error { gcWorker := &GCWorker{ - tikvStore: s, - uuid: identifier, - pdClient: pd, + tikvStore: s, + uuid: identifier, + pdClient: pd, + regionLockResolver: regionLockResolver, } if concurrency <= 0 { @@ -2303,11 +2072,12 @@ func RunGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint6 // RunDistributedGCJob notifies TiKVs to do GC. It is exported for kv api, do not use it with GCWorker at the same time. // This function may not finish immediately because it may take some time to do resolveLocks. // Param concurrency specifies the concurrency of resolveLocks phase. -func RunDistributedGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int) error { +func RunDistributedGCJob(ctx context.Context, regionLockResolver tikv.RegionLockResolver, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int) error { gcWorker := &GCWorker{ - tikvStore: s, - uuid: identifier, - pdClient: pd, + tikvStore: s, + uuid: identifier, + pdClient: pd, + regionLockResolver: regionLockResolver, } safePoint, err := gcWorker.setGCWorkerServiceSafePoint(ctx, safePoint) @@ -2339,9 +2109,10 @@ func RunDistributedGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safe // It is exported only for test, do not use it in the production environment. func RunResolveLocks(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int, usePhysical bool) (bool, error) { gcWorker := &GCWorker{ - tikvStore: s, - uuid: identifier, - pdClient: pd, + tikvStore: s, + uuid: identifier, + pdClient: pd, + regionLockResolver: tikv.NewRegionLockResolver("test-resolver", s), } return gcWorker.resolveLocks(ctx, safePoint, concurrency, usePhysical) } @@ -2468,7 +2239,7 @@ func newMergeLockScanner(safePoint uint64, client tikv.Client, stores map[uint64 safePoint: safePoint, client: client, stores: stores, - scanLockLimit: gcScanLockLimit, + scanLockLimit: tikv.GCScanLockLimit, } failpoint.Inject("lowPhysicalScanLockLimit", func() { scanner.scanLockLimit = 3 diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index c6d7bc7ff234c..e51628520c8a2 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -49,6 +49,43 @@ import ( pd "github.com/tikv/pd/client" ) +type mockGCWorkerLockResolver struct { + tikv.RegionLockResolver + tikvStore tikv.Storage + scanLocks func([]*txnlock.Lock, []byte) ([]*txnlock.Lock, *tikv.KeyLocation) + batchResolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error) +} + +func (l *mockGCWorkerLockResolver) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) { + locks, loc, err := l.RegionLockResolver.ScanLocksInOneRegion(bo, key, maxVersion, limit) + if err != nil { + return nil, nil, err + } + if l.scanLocks != nil { + mockLocks, mockLoc := l.scanLocks(locks, key) + // append locks from mock function + locks = append(locks, mockLocks...) + // use location from mock function + loc = mockLoc + } + return locks, loc, nil +} + +func (l *mockGCWorkerLockResolver) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) { + if l.batchResolveLocks != nil { + return l.batchResolveLocks(locks, loc) + } + return l.RegionLockResolver.ResolveLocksInOneRegion(bo, locks, loc) +} + +func (l *mockGCWorkerLockResolver) GetStore() tikv.Storage { + return l.tikvStore +} + +func (l *mockGCWorkerLockResolver) Identifier() string { + return "gc worker test" +} + type mockGCWorkerClient struct { tikv.Client unsafeDestroyRangeHandler handler @@ -272,16 +309,6 @@ func TestGetOracleTime(t *testing.T) { timeEqual(t, t2, t1.Add(time.Second*10), time.Millisecond*10) } -func TestGetLowResolveTS(t *testing.T) { - s := createGCWorkerSuite(t) - - lowResolveTS, err := s.gcWorker.getTryResolveLocksTS() - require.NoError(t, err) - - lowResolveTime := oracle.GetTimeFromTS(lowResolveTS) - timeEqual(t, time.Now(), lowResolveTime.Add(gcTryResolveLocksIntervalFromNow), time.Millisecond*10) -} - func TestMinStartTS(t *testing.T) { s := createGCWorkerSuite(t) @@ -1021,13 +1048,25 @@ func TestResolveLockRangeInfine(t *testing.T) { s := createGCWorkerSuite(t) require.NoError(t, failpoint.Enable("tikvclient/invalidCacheAndRetry", "return(true)")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff", "return(1)")) defer func() { require.NoError(t, failpoint.Disable("tikvclient/invalidCacheAndRetry")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff")) }() - _, err := s.gcWorker.resolveLocksForRange(gcContext(), 1, 3, []byte{0}, []byte{1}) + mockLockResolver := &mockGCWorkerLockResolver{ + RegionLockResolver: tikv.NewRegionLockResolver("test", s.tikvStore), + tikvStore: s.tikvStore, + scanLocks: func(_ []*txnlock.Lock, key []byte) ([]*txnlock.Lock, *tikv.KeyLocation) { + return []*txnlock.Lock{}, &tikv.KeyLocation{} + }, + batchResolveLocks: func( + locks []*txnlock.Lock, + loc *tikv.KeyLocation, + ) (*tikv.KeyLocation, error) { + // mock error to test backoff + return nil, errors.New("mock error") + }, + } + _, err := tikv.ResolveLocksForRange(gcContext(), mockLockResolver, 1, []byte{0}, []byte{1}, tikv.NewNoopBackoff, 10) require.Error(t, err) } @@ -1040,11 +1079,7 @@ func TestResolveLockRangeMeetRegionCacheMiss(t *testing.T) { resolveCnt int resolveCntRef = &resolveCnt - scanLockCnt int - resolveBeforeSafepointLockCnt int - resolveAfterSafepointLockCnt int - safepointTS uint64 = 434245550444904450 - lowResolveTS uint64 = 434245550449098752 + safepointTS uint64 = 434245550444904450 ) allLocks := []*txnlock.Lock{ @@ -1074,57 +1109,32 @@ func TestResolveLockRangeMeetRegionCacheMiss(t *testing.T) { }, } - s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64, maxVersion uint64) []*txnlock.Lock { - *scanCntRef++ - - locks := make([]*txnlock.Lock, 0) - for _, l := range allLocks { - if l.TxnID <= maxVersion { - locks = append(locks, l) - scanLockCnt++ + mockLockResolver := &mockGCWorkerLockResolver{ + RegionLockResolver: tikv.NewRegionLockResolver("test", s.tikvStore), + tikvStore: s.tikvStore, + scanLocks: func(_ []*txnlock.Lock, key []byte) ([]*txnlock.Lock, *tikv.KeyLocation) { + *scanCntRef++ + return allLocks, &tikv.KeyLocation{ + Region: tikv.NewRegionVerID(s.initRegion.regionID, 0, 0), } - } - return locks - } - s.gcWorker.testingKnobs.batchResolveLocks = func( - locks []*txnlock.Lock, - regionID tikv.RegionVerID, - safepoint uint64, - ) (ok bool, err error) { - *resolveCntRef++ - if *resolveCntRef == 1 { - s.gcWorker.tikvStore.GetRegionCache().InvalidateCachedRegion(regionID) - // mock the region cache miss error - return false, nil - } - - resolveBeforeSafepointLockCnt = len(locks) - for _, l := range locks { - require.True(t, l.TxnID <= safepoint) - } - return true, nil - } - - s.gcWorker.testingKnobs.resolveLocks = func( - locks []*txnlock.Lock, - lowResolutionTS uint64, - ) (int64, error) { - for _, l := range locks { - expiredTS := oracle.ComposeTS(oracle.ExtractPhysical(l.TxnID)+int64(l.TTL), oracle.ExtractLogical(l.TxnID)) - if expiredTS <= lowResolutionTS { - resolveAfterSafepointLockCnt++ + }, + batchResolveLocks: func( + locks []*txnlock.Lock, + loc *tikv.KeyLocation, + ) (*tikv.KeyLocation, error) { + *resolveCntRef++ + if *resolveCntRef == 1 { + s.gcWorker.tikvStore.GetRegionCache().InvalidateCachedRegion(loc.Region) + // mock the region cache miss error + return nil, nil } - } - return 0, nil + return loc, nil + }, } - - _, err := s.gcWorker.resolveLocksForRange(gcContext(), safepointTS, lowResolveTS, []byte{0}, []byte{10}) + _, err := tikv.ResolveLocksForRange(gcContext(), mockLockResolver, safepointTS, []byte{0}, []byte{10}, tikv.NewNoopBackoff, 10) require.NoError(t, err) require.Equal(t, 2, resolveCnt) - require.Equal(t, 1, scanCnt) - require.Equal(t, 3, scanLockCnt) - require.Equal(t, 1, resolveBeforeSafepointLockCnt) - require.Equal(t, 1, resolveAfterSafepointLockCnt) + require.Equal(t, 2, scanCnt) } func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { @@ -1148,23 +1158,41 @@ func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { newPeers := []uint64{s.cluster.AllocID(), s.cluster.AllocID(), s.cluster.AllocID()} s.cluster.Split(s.initRegion.regionID, region2, []byte("m"), newPeers, newPeers[0]) - // init a, b lock in region1 and o, p locks in region2 - s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64, maxVersion uint64) []*txnlock.Lock { - if regionID == s.initRegion.regionID { - return []*txnlock.Lock{{Key: []byte("a")}, {Key: []byte("b")}} - } - if regionID == region2 { - return []*txnlock.Lock{{Key: []byte("o")}, {Key: []byte("p")}} - } - return []*txnlock.Lock{} + mockGCLockResolver := &mockGCWorkerLockResolver{ + RegionLockResolver: tikv.NewRegionLockResolver("test", s.tikvStore), + tikvStore: s.tikvStore, + scanLocks: func(_ []*txnlock.Lock, key []byte) ([]*txnlock.Lock, *tikv.KeyLocation) { + // first time scan locks + region, _, _ := s.cluster.GetRegionByKey(key) + if region.GetId() == s.initRegion.regionID { + return []*txnlock.Lock{{Key: []byte("a")}, {Key: []byte("b")}}, + &tikv.KeyLocation{ + Region: tikv.NewRegionVerID( + region.GetId(), + region.GetRegionEpoch().ConfVer, + region.GetRegionEpoch().Version, + ), + } + } + // second time scan locks + if region.GetId() == region2 { + return []*txnlock.Lock{{Key: []byte("o")}, {Key: []byte("p")}}, + &tikv.KeyLocation{ + Region: tikv.NewRegionVerID( + region.GetId(), + region.GetRegionEpoch().ConfVer, + region.GetRegionEpoch().Version, + ), + } + } + return []*txnlock.Lock{}, nil + }, } - - s.gcWorker.testingKnobs.batchResolveLocks = func( + mockGCLockResolver.batchResolveLocks = func( locks []*txnlock.Lock, - regionID tikv.RegionVerID, - safepoint uint64, - ) (ok bool, err error) { - if regionID.GetID() == s.initRegion.regionID && *firstAccessRef { + loc *tikv.KeyLocation, + ) (*tikv.KeyLocation, error) { + if loc.Region.GetID() == s.initRegion.regionID && *firstAccessRef { *firstAccessRef = false // merge region2 into region1 and return EpochNotMatch error. mCluster := s.cluster.(*testutils.MockCluster) @@ -1172,12 +1200,12 @@ func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID) _, err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch( tikv.NewNoopBackoff(context.Background()), - &tikv.RPCContext{Region: regionID, Store: &tikv.Store{}}, + &tikv.RPCContext{Region: loc.Region, Store: &tikv.Store{}}, []*metapb.Region{regionMeta}) require.NoError(t, err) // also let region1 contains all 4 locks - s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64, maxVersion uint64) []*txnlock.Lock { - if regionID == s.initRegion.regionID { + mockGCLockResolver.scanLocks = func(_ []*txnlock.Lock, key []byte) ([]*txnlock.Lock, *tikv.KeyLocation) { + if bytes.Equal(key, []byte("")) { locks := []*txnlock.Lock{ {Key: []byte("a")}, {Key: []byte("b")}, @@ -1186,27 +1214,23 @@ func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { } for i, lock := range locks { if bytes.Compare(key, lock.Key) <= 0 { - return locks[i:] + return locks[i:], &tikv.KeyLocation{Region: tikv.NewRegionVerID( + regionMeta.GetId(), + regionMeta.GetRegionEpoch().ConfVer, + regionMeta.GetRegionEpoch().Version)} } } } - return []*txnlock.Lock{} + return []*txnlock.Lock{}, nil } - return false, nil + return nil, nil } for _, lock := range locks { resolvedLock = append(resolvedLock, lock.Key) } - return true, nil - } - s.gcWorker.testingKnobs.resolveLocks = func( - locks []*txnlock.Lock, - lowResolutionTS uint64, - ) (int64, error) { - return 0, nil + return loc, nil } - - _, err := s.gcWorker.resolveLocksForRange(gcContext(), 1, 3, []byte(""), []byte("z")) + _, err := tikv.ResolveLocksForRange(gcContext(), mockGCLockResolver, 1, []byte(""), []byte("z"), tikv.NewGcResolveLockMaxBackoffer, 10) require.NoError(t, err) require.Len(t, resolvedLock, 4) expects := [][]byte{[]byte("a"), []byte("b"), []byte("o"), []byte("p")} @@ -1300,12 +1324,26 @@ func TestSetServiceSafePoint(t *testing.T) { func TestRunGCJobAPI(t *testing.T) { s := createGCWorkerSuite(t) + mockLockResolver := &mockGCWorkerLockResolver{ + RegionLockResolver: tikv.NewRegionLockResolver("test", s.tikvStore), + tikvStore: s.tikvStore, + scanLocks: func(_ []*txnlock.Lock, key []byte) ([]*txnlock.Lock, *tikv.KeyLocation) { + return []*txnlock.Lock{}, &tikv.KeyLocation{} + }, + batchResolveLocks: func( + locks []*txnlock.Lock, + loc *tikv.KeyLocation, + ) (*tikv.KeyLocation, error) { + // no locks + return loc, nil + }, + } gcSafePointCacheInterval = 0 p := s.createGCProbe(t, "k1") safePoint := s.mustAllocTs(t) - err := RunGCJob(gcContext(), s.tikvStore, s.pdClient, safePoint, "mock", 1) + err := RunGCJob(gcContext(), mockLockResolver, s.tikvStore, s.pdClient, safePoint, "mock", 1) require.NoError(t, err) s.checkCollected(t, p) etcdSafePoint := s.loadEtcdSafePoint(t) @@ -1317,9 +1355,23 @@ func TestRunDistGCJobAPI(t *testing.T) { s := createGCWorkerSuite(t) gcSafePointCacheInterval = 0 + mockLockResolver := &mockGCWorkerLockResolver{ + RegionLockResolver: tikv.NewRegionLockResolver("test", s.tikvStore), + tikvStore: s.tikvStore, + scanLocks: func(_ []*txnlock.Lock, key []byte) ([]*txnlock.Lock, *tikv.KeyLocation) { + return []*txnlock.Lock{}, &tikv.KeyLocation{} + }, + batchResolveLocks: func( + locks []*txnlock.Lock, + loc *tikv.KeyLocation, + ) (*tikv.KeyLocation, error) { + // no locks + return loc, nil + }, + } safePoint := s.mustAllocTs(t) - err := RunDistributedGCJob(gcContext(), s.tikvStore, s.pdClient, safePoint, "mock", 1) + err := RunDistributedGCJob(gcContext(), mockLockResolver, s.tikvStore, s.pdClient, safePoint, "mock", 1) require.NoError(t, err) pdSafePoint := s.mustGetSafePointFromPd(t) require.Equal(t, safePoint, pdSafePoint) @@ -1912,8 +1964,6 @@ func TestGCLabelRules(t *testing.T) { func TestGCWithPendingTxn(t *testing.T) { s := createGCWorkerSuite(t) - // set to false gc worker won't resolve locks after safepoint. - s.gcWorker.logBackupEnabled = false ctx := gcContext() gcSafePointCacheInterval = 0 @@ -1965,8 +2015,6 @@ func TestGCWithPendingTxn(t *testing.T) { func TestGCWithPendingTxn2(t *testing.T) { s := createGCWorkerSuite(t) - // only when log backup enabled will scan locks after safepoint. - s.gcWorker.logBackupEnabled = true ctx := gcContext() gcSafePointCacheInterval = 0 @@ -2030,15 +2078,13 @@ func TestGCWithPendingTxn2(t *testing.T) { require.NoError(t, err) err = txn.Commit(ctx) - require.Error(t, err) + require.NoError(t, err) err = txn2.Commit(ctx) require.NoError(t, err) } func TestSkipGCAndOnlyResolveLock(t *testing.T) { s := createGCWorkerSuite(t) - // only when log backup enabled will scan locks after safepoint. - s.gcWorker.logBackupEnabled = true ctx := gcContext() gcSafePointCacheInterval = 0 @@ -2083,9 +2129,9 @@ func TestSkipGCAndOnlyResolveLock(t *testing.T) { err = s.gcWorker.leaderTick(ctx) require.NoError(t, err) - // check the lock has been resolved. + // check the lock has not been resolved. err = txn.Commit(ctx) - require.Error(t, err) + require.NoError(t, err) // check gc is skipped last, err := s.gcWorker.loadTime(gcLastRunTimeKey)