Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#48025
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
YuJuncen authored and ti-chi-bot committed Nov 8, 2023
1 parent 5a6c8c8 commit bf7ce3d
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 16 deletions.
12 changes: 12 additions & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ go_test(
],
embed = [":task"],
flaky = True,
<<<<<<< HEAD
=======
shard_count = 21,
>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025))
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand All @@ -101,10 +105,18 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
<<<<<<< HEAD
"//config",
"//parser/model",
"//statistics/handle",
"//tablecodec",
=======
"//pkg/config",
"//pkg/parser/model",
"//pkg/statistics/handle/util",
"//pkg/tablecodec",
"//pkg/util/table-filter",
>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025))
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
<<<<<<< HEAD
=======
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/multierr"
>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025))
"go.uber.org/zap"
)

Expand Down Expand Up @@ -711,6 +716,21 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) {
return oracle.GoTimeToTS(t1), nil
}

func DefaultBackupConfig() BackupConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineBackupFlags(fs)
cfg := BackupConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
if err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
}
return cfg
}

func parseCompressionType(s string) (backuppb.CompressionType, error) {
var ct backuppb.CompressionType
switch s {
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ func HiddenFlagsForStream(flags *pflag.FlagSet) {
storage.HiddenFlagsForStream(flags)
}

func DefaultConfig() Config {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
cfg := Config{}
if err := cfg.ParseFromFlags(fs); err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
}
return cfg
}

// DefineDatabaseFlags defines the required --db flag for `db` subcommand.
func DefineDatabaseFlags(command *cobra.Command) {
command.Flags().String(flagDatabase, "", "database name")
Expand Down
84 changes: 84 additions & 0 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import (

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
<<<<<<< HEAD
"github.com/pingcap/tidb/config"
=======
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025))
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -157,3 +164,80 @@ func TestCheckCipherKey(t *testing.T) {
}
}
}

func must[T any](t T, err error) T {
if err != nil {
panic(err)
}
return t
}

func expectedDefaultConfig() Config {
return Config{
BackendOptions: storage.BackendOptions{S3: storage.S3BackendOptions{ForcePathStyle: true}},
PD: []string{"127.0.0.1:2379"},
ChecksumConcurrency: 4,
Checksum: true,
SendCreds: true,
CheckRequirements: true,
FilterStr: []string(nil),
TableFilter: filter.CaseInsensitive(must(filter.Parse([]string{"*.*"}))),
Schemas: map[string]struct{}{},
Tables: map[string]struct{}{},
SwitchModeInterval: 300000000000,
GRPCKeepaliveTime: 10000000000,
GRPCKeepaliveTimeout: 3000000000,
CipherInfo: backup.CipherInfo{CipherType: 1},
MetadataDownloadBatchSize: 0x80,
}
}

func expectedDefaultBackupConfig() BackupConfig {
return BackupConfig{
Config: expectedDefaultConfig(),
GCTTL: utils.DefaultBRGCSafePointTTL,
CompressionConfig: CompressionConfig{
CompressionType: backup.CompressionType_ZSTD,
},
IgnoreStats: true,
UseBackupMetaV2: true,
UseCheckpoint: true,
}
}

func expectedDefaultRestoreConfig() RestoreConfig {
defaultConfig := expectedDefaultConfig()
defaultConfig.Concurrency = defaultRestoreConcurrency
return RestoreConfig{
Config: defaultConfig,
RestoreCommonConfig: RestoreCommonConfig{Online: false,
MergeSmallRegionSizeBytes: 0x6000000,
MergeSmallRegionKeyCount: 0xea600,
WithSysTable: false,
ResetSysUsers: []string{"cloud_admin", "root"}},
NoSchema: false,
PDConcurrency: 0x1,
BatchFlushInterval: 16000000000,
DdlBatchSize: 0x80,
WithPlacementPolicy: "STRICT",
UseCheckpoint: true,
}
}

func TestDefault(t *testing.T) {
def := DefaultConfig()
defaultConfig := expectedDefaultConfig()
require.Equal(t, defaultConfig, def)
}

func TestDefaultBackup(t *testing.T) {
def := DefaultBackupConfig()
defaultConfig := expectedDefaultBackupConfig()
require.Equal(t, defaultConfig, def)
}

func TestDefaultRestore(t *testing.T) {
def := DefaultRestoreConfig()
defaultConfig := expectedDefaultRestoreConfig()
require.Equal(t, defaultConfig, def)
}
42 changes: 42 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,48 @@ func IsStreamRestore(cmdName string) bool {
return cmdName == PointRestoreCmd
}

<<<<<<< HEAD
=======
func registerTaskToPD(ctx context.Context, etcdCLI *clientv3.Client) (closeF func(context.Context) error, err error) {
register := utils.NewTaskRegister(etcdCLI, utils.RegisterRestore, fmt.Sprintf("restore-%s", uuid.New()))
err = register.RegisterTask(ctx)
return register.Close, errors.Trace(err)
}

func removeCheckpointDataForSnapshotRestore(ctx context.Context, storageName string, taskName string, config *Config) error {
_, s, err := GetStorage(ctx, storageName, config)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(checkpoint.RemoveCheckpointDataForRestore(ctx, s, taskName))
}

func removeCheckpointDataForLogRestore(ctx context.Context, storageName string, taskName string, clusterID uint64, config *Config) error {
_, s, err := GetStorage(ctx, storageName, config)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(checkpoint.RemoveCheckpointDataForLogRestore(ctx, s, taskName, clusterID))
}

func DefaultRestoreConfig() RestoreConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineRestoreFlags(fs)
cfg := RestoreConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.RestoreCommonConfig.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
if err != nil {
log.Panic("infallible failed.", zap.Error(err))
}

return cfg
}

>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025))
// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
if err := checkTaskExists(c, cfg); err != nil {
Expand Down
30 changes: 14 additions & 16 deletions executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
}

tidbCfg := config.GetGlobalConfig()
cfg := task.Config{
TLS: task.TLSConfig{
CA: tidbCfg.Security.ClusterSSLCA,
Cert: tidbCfg.Security.ClusterSSLCert,
Key: tidbCfg.Security.ClusterSSLKey,
},
PD: strings.Split(tidbCfg.Path, ","),
Concurrency: 4,
Checksum: true,
SendCreds: true,
LogProgress: true,
CipherInfo: backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
tlsCfg := task.TLSConfig{
CA: tidbCfg.Security.ClusterSSLCA,
Cert: tidbCfg.Security.ClusterSSLCert,
Key: tidbCfg.Security.ClusterSSLKey,
}
pds := strings.Split(tidbCfg.Path, ",")
cfg := task.DefaultConfig()
cfg.PD = pds
cfg.TLS = tlsCfg

storageURL, err := storage.ParseRawURL(s.Storage)
if err != nil {
Expand Down Expand Up @@ -301,7 +295,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)

switch s.Kind {
case ast.BRIEKindBackup:
e.backupCfg = &task.BackupConfig{Config: cfg}
bcfg := task.DefaultBackupConfig()
bcfg.Config = cfg
e.backupCfg = &bcfg

for _, opt := range s.Options {
switch opt.Tp {
Expand Down Expand Up @@ -329,7 +325,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
}

case ast.BRIEKindRestore:
e.restoreCfg = &task.RestoreConfig{Config: cfg}
rcfg := task.DefaultRestoreConfig()
rcfg.Config = cfg
e.restoreCfg = &rcfg
for _, opt := range s.Options {
switch opt.Tp {
case ast.BRIEOptionOnline:
Expand Down

0 comments on commit bf7ce3d

Please sign in to comment.