diff --git a/DEPS.bzl b/DEPS.bzl index 7b9d7307da651..2b18bac5e6dff 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4194,8 +4194,8 @@ def go_deps(): name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sum = "h1:dO0YtekJ9IKiO9dXMZz0rNHPBiGCo+c37RUsnshoLwo=", - version = "v0.0.0-20230519121736-d15a686a670e", + sum = "h1:pKIvLrqEmTMw+J8wwdv4r2QTp+cjW9giiOIyaRy9mLM=", + version = "v0.0.0-20230905092614-113cdedbebb6", ) go_repository( name = "com_github_timakin_bodyclose", @@ -6193,8 +6193,8 @@ def go_deps(): name = "org_uber_go_automaxprocs", build_file_proto_mode = "disable_global", importpath = "go.uber.org/automaxprocs", - sum = "h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=", - version = "v1.5.3", + sum = "h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=", + version = "v1.4.0", ) go_repository( name = "org_uber_go_dig", diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index d70d9425e0653..5eca340f1e622 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -49,7 +49,7 @@ func main() { NewBackupCommand(), NewRestoreCommand(), NewStreamCommand(), - newOpeartorCommand(), + newOperatorCommand(), ) // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index 57ed59b224d06..2a6d80aa12ffa 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -11,7 +11,7 @@ import ( "github.com/spf13/cobra" ) -func newOpeartorCommand() *cobra.Command { +func newOperatorCommand() *cobra.Command { cmd := &cobra.Command{ Use: "operator ", Short: "utilities for operators like tidb-operator.", @@ -26,14 +26,19 @@ func newOpeartorCommand() *cobra.Command { }, Hidden: true, } - cmd.AddCommand(newPauseGcCommand()) + cmd.AddCommand(newPrepareForSnapshotBackupCommand( + "pause-gc-and-schedulers", + "(Will be replaced with `prepare-for-snapshot-backup`) pause gc, schedulers and importing until the program exits.")) + cmd.AddCommand(newPrepareForSnapshotBackupCommand( + "prepare-for-snapshot-backup", + "pause gc, schedulers and importing until the program exits, for snapshot backup.")) return cmd } -func newPauseGcCommand() *cobra.Command { +func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command { cmd := &cobra.Command{ - Use: "pause-gc-and-schedulers", - Short: "pause gc and schedulers to the ts until the program exits.", + Use: use, + Short: short, Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { cfg := operator.PauseGcConfig{} @@ -41,9 +46,9 @@ func newPauseGcCommand() *cobra.Command { return err } ctx := GetDefaultContext() - return operator.PauseGCAndScheduler(ctx, &cfg) + return operator.AdaptEnvForSnapshotBackup(ctx, &cfg) }, } - operator.DefineFlagsForPauseGcConfig(cmd.Flags()) + operator.DefineFlagsForPrepareSnapBackup(cmd.Flags()) return cmd } diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index bf9478a2a3e9c..1c18f0b45fe6e 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -111,4 +111,6 @@ var ( ErrKVDownloadFailed = errors.Normalize("download sst failed", errors.RFCCodeText("BR:KV:ErrKVDownloadFailed")) // ErrKVIngestFailed indicates a generic, retryable ingest error. 
 	ErrKVIngestFailed = errors.Normalize("ingest sst failed", errors.RFCCodeText("BR:KV:ErrKVIngestFailed"))
+
+	ErrPossibleInconsistency = errors.Normalize("the cluster state might be inconsistent", errors.RFCCodeText("BR:KV:ErrPossibleInconsistency"))
 )
diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go
index e84fb66188e49..e459cfcb60bff 100644
--- a/br/pkg/task/backup_ebs.go
+++ b/br/pkg/task/backup_ebs.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/conn"
 	"github.com/pingcap/tidb/br/pkg/conn/util"
 	"github.com/pingcap/tidb/br/pkg/glue"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/metautil"
 	"github.com/pingcap/tidb/br/pkg/pdutil"
 	"github.com/pingcap/tidb/br/pkg/storage"
@@ -144,6 +145,16 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
 		if e != nil {
 			return errors.Trace(err)
 		}
+		denyLightning := utils.NewSuspendImporting("backup_ebs_command", mgr.StoreManager)
+		_, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL)
+		if err != nil {
+			return errors.Annotate(err, "failed to deny lightning from running")
+		}
+		go func() {
+			if err := denyLightning.Keeper(ctx, utils.DefaultBRGCSafePointTTL); err != nil {
+				log.Warn("cannot keep denying importing, the backup archive may not be usable if there were importing tasks.", logutil.ShortError(err))
+			}
+		}()
 		defer func() {
 			if ctx.Err() != nil {
 				log.Warn("context canceled, doing clean work with background context")
@@ -155,6 +166,13 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
 			if restoreE := restoreFunc(ctx); restoreE != nil {
 				log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
 			}
+			res, err := denyLightning.AllowAllStores(ctx)
+			if err != nil {
+				log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL))
+			}
+			if err := denyLightning.ConsistentWithPrev(res); err != nil {
+				log.Warn("lightning hasn't been denied, the backup archive may not be usable.", logutil.ShortError(err))
+			}
 		}()
 	}
diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel
index a291d68df5b12..5ce85cbd1313f 100644
--- a/br/pkg/task/operator/BUILD.bazel
+++ b/br/pkg/task/operator/BUILD.bazel
@@ -13,8 +13,10 @@ go_library(
         "//br/pkg/pdutil",
         "//br/pkg/task",
         "//br/pkg/utils",
+        "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_log//:log",
         "@com_github_spf13_pflag//:pflag",
+        "@org_golang_google_grpc//keepalive",
         "@org_golang_x_sync//errgroup",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go
index 95c922b1c19cf..909d18911c8d0 100644
--- a/br/pkg/task/operator/cmd.go
+++ b/br/pkg/task/operator/cmd.go
@@ -6,8 +6,10 @@ import (
 	"context"
 	"crypto/tls"
 	"strings"
+	"sync"
 	"time"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/pdutil"
@@ -15,6 +17,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/keepalive"
 )
 
 func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) {
@@ -34,48 +37,129 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
 	return mgr, nil
 }
 
-func cleanUpWith(f func(ctx context.Context)) {
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
+	_ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
+}
+
+func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
+	ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
 	defer cancel()
-	f(ctx)
+	return f(ctx)
 }
 
-// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
+type AdaptEnvForSnapshotBackupContext struct {
+	context.Context
+
+	pdMgr *pdutil.PdController
+	kvMgr *utils.StoreManager
+	cfg   PauseGcConfig
+
+	rdGrp  sync.WaitGroup
+	runGrp *errgroup.Group
+}
+
+func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
+	logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
+	cx.rdGrp.Done()
+}
+
+func hintAllReady() {
+	// Hacking: some versions of the operator use the following two logs to check whether we are ready...
+	log.Info("Schedulers are paused.")
+	log.Info("GC is paused.")
+	log.Info("All ready.")
+}
+
+// AdaptEnvForSnapshotBackup blocks the current goroutine, pauses the GC safepoint and removes the schedulers according to the config.
 // This function will block until the context being canceled.
-func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error {
+func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
 	mgr, err := dialPD(ctx, &cfg.Config)
 	if err != nil {
-		return err
+		return errors.Annotate(err, "failed to dial PD")
 	}
-
+	var tconf *tls.Config
+	if cfg.TLS.IsEnabled() {
+		tconf, err = cfg.TLS.ToTLSConfig()
+		if err != nil {
+			return errors.Annotate(err, "invalid tls config")
+		}
+	}
+	kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{
+		Time:    cfg.Config.GRPCKeepaliveTime,
+		Timeout: cfg.Config.GRPCKeepaliveTimeout,
+	}, tconf)
 	eg, ectx := errgroup.WithContext(ctx)
+	cx := &AdaptEnvForSnapshotBackupContext{
+		Context: logutil.ContextWithField(ectx, zap.String("tag", "br_operator")),
+		pdMgr:   mgr,
+		kvMgr:   kvMgr,
+		cfg:     *cfg,
+		rdGrp:   sync.WaitGroup{},
+		runGrp:  eg,
+	}
+	cx.rdGrp.Add(3)
 
-	eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) })
-	eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) })
+	eg.Go(func() error { return pauseGCKeeper(cx) })
+	eg.Go(func() error { return pauseSchedulerKeeper(cx) })
+	eg.Go(func() error { return pauseImporting(cx) })
+	go func() {
+		cx.rdGrp.Wait()
+		hintAllReady()
+	}()
 
 	return eg.Wait()
 }
 
-func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error {
+func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
+	denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
+	if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
+		return errors.Trace(err)
+	}
+	cx.ReadyL("pause_lightning")
+	cx.runGrp.Go(func() error {
+		err := denyLightning.Keeper(cx, cx.cfg.TTL)
+		if errors.Cause(err) != context.Canceled {
+			logutil.CL(cx).Warn("keeper encountered an error.", logutil.ShortError(err))
+		}
+		return cx.cleanUpWithErr(func(ctx context.Context) error {
+			for {
+				if ctx.Err() != nil {
+					return errors.Annotate(ctx.Err(), "cleaning up timed out")
+				}
+				res, err := denyLightning.AllowAllStores(ctx)
+				if err != nil {
+					logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
+					// Retry roughly 10 times before the cleanup context times out.
+ time.Sleep(cx.cfg.TTL / 10) + continue + } + return denyLightning.ConsistentWithPrev(res) + } + }) + }) + return nil +} + +func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { // Note: should we remove the service safepoint as soon as this exits? sp := utils.BRServiceSafePoint{ ID: utils.MakeSafePointID(), - TTL: int64(cfg.TTL.Seconds()), - BackupTS: cfg.SafePoint, + TTL: int64(ctx.cfg.TTL.Seconds()), + BackupTS: ctx.cfg.SafePoint, } if sp.BackupTS == 0 { - rts, err := ctl.GetMinResolvedTS(ctx) + rts, err := ctx.pdMgr.GetMinResolvedTS(ctx) if err != nil { return err } - log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) + logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) sp.BackupTS = rts } - err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp) + err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp) if err != nil { return err } - log.Info("GC is paused.", zap.Object("safepoint", sp)) + ctx.ReadyL("pause_gc", zap.Object("safepoint", sp)) // Note: in fact we can directly return here. // But the name `keeper` implies once the function exits, // the GC should be resume, so let's block here. @@ -83,10 +167,10 @@ func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdContro return nil } -func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error { - undo, err := ctl.RemoveAllPDSchedulers(ctx) +func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { + undo, err := ctx.pdMgr.RemoveAllPDSchedulers(ctx) if undo != nil { - defer cleanUpWith(func(ctx context.Context) { + defer ctx.cleanUpWith(func(ctx context.Context) { if err := undo(ctx); err != nil { log.Warn("failed to restore pd scheduler.", logutil.ShortError(err)) } @@ -95,7 +179,7 @@ func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error { if err != nil { return err } - log.Info("Schedulers are paused.") + ctx.ReadyL("pause_scheduler") // Wait until the context canceled. // So we can properly do the clean up work. 
<-ctx.Done() diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index eb7e12a49af56..998fdc64d961e 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -16,7 +16,7 @@ type PauseGcConfig struct { TTL time.Duration `json:"ttl" yaml:"ttl"` } -func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { +func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) { _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.") _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") } diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 3ca5b6b83554a..ad17b725129b7 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "schema.go", "sensitive.go", "store_manager.go", + "suspend_importing.go", "worker.go", ], importpath = "github.com/pingcap/tidb/br/pkg/utils", @@ -38,6 +39,7 @@ go_library( "//parser/types", "//sessionctx", "//util", + "//util/engine", "//util/sqlexec", "@com_github_cheggaaa_pb_v3//:pb", "@com_github_cznic_mathutil//:mathutil", @@ -46,6 +48,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", @@ -87,10 +90,11 @@ go_test( "safe_point_test.go", "schema_test.go", "sensitive_test.go", + "suspend_importing_test.go", ], embed = [":utils"], flaky = True, - shard_count = 29, + shard_count = 33, deps = [ "//br/pkg/errors", "//br/pkg/metautil", @@ -111,13 +115,17 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_tests_v3//integration", + "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", + "@org_golang_x_sync//errgroup", "@org_uber_go_goleak//:goleak", "@org_uber_go_multierr//:multierr", ], diff --git a/br/pkg/utils/store_manager.go b/br/pkg/utils/store_manager.go index 8a89e49022806..430d1394b0037 100644 --- a/br/pkg/utils/store_manager.go +++ b/br/pkg/utils/store_manager.go @@ -148,6 +148,7 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64) if addr == "" { addr = store.GetAddress() } + log.Info("StoreManager: dialing to store.", zap.String("address", addr), zap.Uint64("store-id", storeID)) conn, err := grpc.DialContext( ctx, addr, diff --git a/br/pkg/utils/suspend_importing.go b/br/pkg/utils/suspend_importing.go new file mode 100644 index 0000000000000..c2df70229c525 --- /dev/null +++ b/br/pkg/utils/suspend_importing.go @@ -0,0 +1,144 @@ +package utils + +import ( + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/util/engine" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const ( + DenyLightningUpdateFrequency = 5 +) + +func (mgr *StoreManager) GetAllStores(ctx context.Context) 
([]*metapb.Store, error) {
+	return mgr.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
+}
+
+func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendImportingClient, error) {
+	var cli import_sstpb.ImportSSTClient
+	err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) {
+		cli = import_sstpb.NewImportSSTClient(cc)
+	})
+	if err != nil {
+		return nil, err
+	}
+	return cli, nil
+}
+
+type SuspendImportingEnv interface {
+	GetAllStores(ctx context.Context) ([]*metapb.Store, error)
+	GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendImportingClient, error)
+}
+
+type SuspendImportingClient interface {
+	// Temporarily disable ingest / download / write, for data listeners that don't support catching import data.
+	SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error)
+}
+
+type SuspendImporting struct {
+	env  SuspendImportingEnv
+	name string
+}
+
+func NewSuspendImporting(name string, env SuspendImportingEnv) *SuspendImporting {
+	return &SuspendImporting{
+		env:  env,
+		name: name,
+	}
+}
+
+// DenyAllStores tries to deny all current stores' lightning execution for the given period of time.
+// Returns a map from store ID to whether importing was already denied on that store.
+func (d *SuspendImporting) DenyAllStores(ctx context.Context, dur time.Duration) (map[uint64]bool, error) {
+	return d.forEachStores(ctx, func() *import_sstpb.SuspendImportRPCRequest {
+		return &import_sstpb.SuspendImportRPCRequest{
+			ShouldSuspendImports: true,
+			DurationInSecs:       uint64(dur.Seconds()),
+			Caller:               d.name,
+		}
+	})
+}
+
+func (d *SuspendImporting) AllowAllStores(ctx context.Context) (map[uint64]bool, error) {
+	return d.forEachStores(ctx, func() *import_sstpb.SuspendImportRPCRequest {
+		return &import_sstpb.SuspendImportRPCRequest{
+			ShouldSuspendImports: false,
+			Caller:               d.name,
+		}
+	})
+}
+
+// forEachStores sends the request to each reachable store.
+// Returns a map from store ID to whether importing was already denied on that store.
+func (d *SuspendImporting) forEachStores(ctx context.Context, makeReq func() *import_sstpb.SuspendImportRPCRequest) (map[uint64]bool, error) {
+	stores, err := d.env.GetAllStores(ctx)
+	if err != nil {
+		return nil, errors.Annotate(err, "failed to get all stores")
+	}
+
+	result := map[uint64]bool{}
+	for _, store := range stores {
+		logutil.CL(ctx).Info("Handling store.", zap.Stringer("store", store))
+		if engine.IsTiFlash(store) {
+			logutil.CL(ctx).Info("Store is tiflash, skipping.", zap.Stringer("store", store))
+			continue
+		}
+		cli, err := d.env.GetDenyLightningClient(ctx, store.Id)
+		if err != nil {
+			return nil, errors.Annotatef(err, "failed to get client for store %d", store.Id)
+		}
+		req := makeReq()
+		resp, err := cli.SuspendImportRPC(ctx, req)
+		if err != nil {
+			return nil, errors.Annotatef(err, "failed to deny lightning rpc for store %d", store.Id)
+		}
+		result[store.Id] = resp.AlreadySuspended
+	}
+	return result, nil
+}
+
+// ConsistentWithPrev checks whether a result returned by `DenyAllStores` is consistent with the last request,
+// i.e. whether none of the stores left a gap where the import requests were not paused.
+func (d *SuspendImporting) ConsistentWithPrev(result map[uint64]bool) error {
+	for storeId, denied := range result {
+		if !denied {
+			return errors.Annotatef(berrors.ErrPossibleInconsistency, "failed to keep importing denied on store %d, the state might be inconsistent", storeId)
+		}
+	}
+	return nil
+}
+
+func (d *SuspendImporting) Keeper(ctx context.Context, ttl time.Duration) error {
+	lastSuccess := time.Now()
+	t := time.NewTicker(ttl / DenyLightningUpdateFrequency)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-t.C:
+			res, err := d.DenyAllStores(ctx, ttl)
+			if err != nil {
+				if time.Since(lastSuccess) < ttl {
+					logutil.CL(ctx).Warn("Failed to send the deny request to one of the stores.", logutil.ShortError(err))
+					continue
+				}
+				return err
+			}
+			if err := d.ConsistentWithPrev(res); err != nil {
+				return err
+			}
+
+			lastSuccess = time.Now()
+		}
+	}
+}
diff --git a/br/pkg/utils/suspend_importing_test.go b/br/pkg/utils/suspend_importing_test.go
new file mode 100644
index 0000000000000..8ee04af072048
--- /dev/null
+++ b/br/pkg/utils/suspend_importing_test.go
@@ -0,0 +1,209 @@
+package utils_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/import_sstpb"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+type ImportTargetStore struct {
+	mu                  sync.Mutex
+	Id                  uint64
+	LastSuccessDenyCall time.Time
+	SuspendImportFor    time.Duration
+	SuspendedImport     bool
+
+	ErrGen func() error
+}
+
+type ImportTargetStores struct {
+	mu    sync.Mutex
+	items map[uint64]*ImportTargetStore
+}
+
+func initWithIDs(ids []int) *ImportTargetStores {
+	ss := &ImportTargetStores{
+		items: map[uint64]*ImportTargetStore{},
+	}
+	for _, id := range ids {
+		store := new(ImportTargetStore)
+		store.Id = uint64(id)
+		ss.items[uint64(id)] = store
+	}
+	return ss
+}
+
+func (s *ImportTargetStores) GetAllStores(ctx context.Context) ([]*metapb.Store, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	stores := make([]*metapb.Store, 0, len(s.items))
+	for _, store := range s.items {
+		stores = append(stores, &metapb.Store{Id: store.Id})
+	}
+	return stores, nil
+}
+
+func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.SuspendImportingClient, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	store, ok := s.items[storeID]
+	if !ok {
+		return nil, errors.Trace(fmt.Errorf("store %d not found", storeID))
+	}
+
+	return store, nil
+}
+
+// Temporarily disable ingest / download / write, for data listeners that don't support catching import data.
+func (s *ImportTargetStore) SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.ErrGen != nil { + if err := s.ErrGen(); err != nil { + return nil, s.ErrGen() + } + } + + suspended := s.SuspendedImport + if in.ShouldSuspendImports { + s.SuspendedImport = true + s.SuspendImportFor = time.Duration(in.DurationInSecs) * time.Second + s.LastSuccessDenyCall = time.Now() + } else { + s.SuspendedImport = false + } + return &import_sstpb.SuspendImportRPCResponse{ + AlreadySuspended: suspended, + }, nil +} + +func (s *ImportTargetStores) assertAllStoresDenied(t *testing.T) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, store := range s.items { + func() { + store.mu.Lock() + defer store.mu.Unlock() + + require.True(t, store.SuspendedImport, "ID = %d", store.Id) + require.Less(t, time.Since(store.LastSuccessDenyCall), store.SuspendImportFor, "ID = %d", store.Id) + }() + } +} + +func TestBasic(t *testing.T) { + req := require.New(t) + + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewSuspendImporting(t.Name(), ss) + + ctx := context.Background() + res, err := deny.DenyAllStores(ctx, 10*time.Second) + req.NoError(err) + req.Error(deny.ConsistentWithPrev(res)) + for id, inner := range ss.items { + req.True(inner.SuspendedImport, "at %d", id) + req.Equal(inner.SuspendImportFor, 10*time.Second, "at %d", id) + } + + res, err = deny.DenyAllStores(ctx, 10*time.Second) + req.NoError(err) + req.NoError(deny.ConsistentWithPrev(res)) + + res, err = deny.AllowAllStores(ctx) + req.NoError(err) + req.NoError(deny.ConsistentWithPrev(res)) +} + +func TestKeeperError(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewSuspendImporting(t.Name(), ss) + ttl := time.Second + + now := time.Now() + triggeredErr := uint32(0) + _, err := deny.DenyAllStores(ctx, ttl) + req.NoError(err) + + ss.items[4].ErrGen = func() error { + if time.Since(now) > 600*time.Millisecond { + return nil + } + triggeredErr += 1 + return status.Error(codes.Unavailable, "the store is slacking.") + } + + cx, cancel := context.WithCancel(ctx) + + wg := new(errgroup.Group) + wg.Go(func() error { return deny.Keeper(cx, ttl) }) + time.Sleep(ttl) + cancel() + req.ErrorIs(wg.Wait(), context.Canceled) + req.Positive(triggeredErr) +} + +func TestKeeperErrorExit(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewSuspendImporting(t.Name(), ss) + ttl := time.Second + + triggeredErr := uint32(0) + _, err := deny.DenyAllStores(ctx, ttl) + req.NoError(err) + + ss.items[4].ErrGen = func() error { + triggeredErr += 1 + return status.Error(codes.Unavailable, "the store is slacking.") + } + + wg := new(errgroup.Group) + wg.Go(func() error { return deny.Keeper(ctx, ttl) }) + time.Sleep(ttl) + req.Error(wg.Wait()) + req.Positive(triggeredErr) +} + +func TestKeeperCalled(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewSuspendImporting(t.Name(), ss) + ttl := 1 * time.Second + + _, err := deny.DenyAllStores(ctx, ttl) + req.NoError(err) + + cx, cancel := context.WithCancel(ctx) + wg := new(errgroup.Group) + wg.Go(func() error { return deny.Keeper(cx, ttl) }) + for i := 0; i < 20; i++ { + ss.assertAllStoresDenied(t) + time.Sleep(ttl / 10) + } + cancel() + req.ErrorIs(wg.Wait(), context.Canceled) +} 
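The tests above exercise SuspendImporting only through a mock environment; for orientation, here is a minimal, hypothetical sketch of how the helper is meant to be wired together in real code. It mirrors pauseImporting in br/pkg/task/operator/cmd.go; the function name suspendImportsWhile and the kvMgr/ttl/work parameters are illustrative assumptions, not part of this patch.

package example // illustrative sketch, not part of the patch

import (
	"context"
	"time"

	"github.com/pingcap/tidb/br/pkg/utils"
)

// suspendImportsWhile pauses importing on all TiKV stores while `work` runs.
func suspendImportsWhile(ctx context.Context, kvMgr *utils.StoreManager, ttl time.Duration, work func(context.Context) error) error {
	deny := utils.NewSuspendImporting("example_caller", kvMgr)
	// Ask every (non-TiFlash) store to reject import RPCs for `ttl`.
	if _, err := deny.DenyAllStores(ctx, ttl); err != nil {
		return err
	}
	defer func() {
		// Lift the suspension afterwards and verify no store silently resumed
		// importing in the meantime.
		if res, err := deny.AllowAllStores(context.Background()); err == nil {
			_ = deny.ConsistentWithPrev(res)
		}
	}()
	kctx, cancel := context.WithCancel(ctx)
	defer cancel() // stop the keeper before the suspension is lifted
	// The keeper re-sends the deny request every ttl/DenyLightningUpdateFrequency,
	// so the pause outlives a single TTL window.
	go func() { _ = deny.Keeper(kctx, ttl) }()
	return work(ctx)
}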
diff --git a/errors.toml b/errors.toml index 3955ab97ec4a0..46ac796446aa1 100644 --- a/errors.toml +++ b/errors.toml @@ -146,6 +146,11 @@ error = ''' storage is not tikv ''' +["BR:KV:ErrPossibleInconsistency"] +error = ''' +the cluster state might be inconsistent +''' + ["BR:PD:ErrPDBatchScanRegion"] error = ''' batch scan region diff --git a/go.mod b/go.mod index dc96ae38a1e0c..be9d75b38ac50 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.8-0.20231012084106-14934ce6283e - github.com/tikv/pd/client v0.0.0-20230519121736-d15a686a670e + github.com/tikv/pd/client v0.0.0-20230905092614-113cdedbebb6 github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible @@ -110,7 +110,7 @@ require ( go.etcd.io/etcd/tests/v3 v3.5.2 go.opencensus.io v0.24.0 go.uber.org/atomic v1.11.0 - go.uber.org/automaxprocs v1.5.3 + go.uber.org/automaxprocs v1.4.0 go.uber.org/goleak v1.2.1 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 diff --git a/go.sum b/go.sum index 26d12ea8b01b8..026b76f48c8e2 100644 --- a/go.sum +++ b/go.sum @@ -816,7 +816,6 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= @@ -966,8 +965,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tikv/client-go/v2 v2.0.8-0.20231012084106-14934ce6283e h1:rJGhMdMHG8EjdPhpNB6UejQwuE8rtSqBxBVvbiQHyrY= github.com/tikv/client-go/v2 v2.0.8-0.20231012084106-14934ce6283e/go.mod h1:tXip3uaG8miN+BWvD1os8aczFM++vGQx2DerDD4zrvQ= -github.com/tikv/pd/client v0.0.0-20230519121736-d15a686a670e h1:dO0YtekJ9IKiO9dXMZz0rNHPBiGCo+c37RUsnshoLwo= -github.com/tikv/pd/client v0.0.0-20230519121736-d15a686a670e/go.mod h1:5vgcvO020ZCdMZkTrRdS/wFZQUab82BSfKE38T61ro0= +github.com/tikv/pd/client v0.0.0-20230905092614-113cdedbebb6 h1:pKIvLrqEmTMw+J8wwdv4r2QTp+cjW9giiOIyaRy9mLM= +github.com/tikv/pd/client v0.0.0-20230905092614-113cdedbebb6/go.mod h1:5vgcvO020ZCdMZkTrRdS/wFZQUab82BSfKE38T61ro0= github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo= github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= @@ -1076,8 +1075,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= -go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= +go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0= +go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=