Skip to content

Commit

Permalink
br: add more options for br sql client (#53039)
Browse files Browse the repository at this point in the history
close #53040
  • Loading branch information
RidRisR authored May 28, 2024
1 parent bdb0545 commit 04cb62d
Show file tree
Hide file tree
Showing 11 changed files with 11,091 additions and 10,756 deletions.
6 changes: 3 additions & 3 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,12 @@ func parseCipherType(t string) (encryptionpb.EncryptionMethod, error) {
func checkCipherKey(cipherKey, cipherKeyFile string) error {
if (len(cipherKey) == 0) == (len(cipherKeyFile) == 0) {
return errors.Annotate(berrors.ErrInvalidArgument,
"exactly one of --crypter.key or --crypter.key-file should be provided")
"exactly one of cipher key or keyfile path should be provided")
}
return nil
}

func getCipherKeyContent(cipherKey, cipherKeyFile string) ([]byte, error) {
func GetCipherKeyContent(cipherKey, cipherKeyFile string) ([]byte, error) {
if err := checkCipherKey(cipherKey, cipherKeyFile); err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func (cfg *Config) parseCipherInfo(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

cfg.CipherInfo.CipherKey, err = getCipherKeyContent(key, keyFilePath)
cfg.CipherInfo.CipherKey, err = GetCipherKeyContent(key, keyFilePath)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_fn//:fn",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/diagnosticspb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
Expand Down
50 changes: 49 additions & 1 deletion pkg/executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
default:
}

failpoint.Inject("modifyStore", func(v failpoint.Value) {
tidbCfg.Store = v.(string)
})
if tidbCfg.Store != "tikv" {
b.err = errors.Errorf("%s requires tikv store, not %s", s.Kind, tidbCfg.Store)
return nil
Expand All @@ -330,6 +333,28 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
cfg.Checksum = opt.UintValue != 0
case ast.BRIEOptionSendCreds:
cfg.SendCreds = opt.UintValue != 0
case ast.BRIEOptionChecksumConcurrency:
cfg.ChecksumConcurrency = uint(opt.UintValue)
case ast.BRIEOptionEncryptionKeyFile:
cfg.CipherInfo.CipherKey, err = task.GetCipherKeyContent("", opt.StrValue)
if err != nil {
b.err = err
return nil
}
case ast.BRIEOptionEncryptionMethod:
switch opt.StrValue {
case "aes128-ctr":
cfg.CipherInfo.CipherType = encryptionpb.EncryptionMethod_AES128_CTR
case "aes192-ctr":
cfg.CipherInfo.CipherType = encryptionpb.EncryptionMethod_AES192_CTR
case "aes256-ctr":
cfg.CipherInfo.CipherType = encryptionpb.EncryptionMethod_AES256_CTR
case "plaintext":
cfg.CipherInfo.CipherType = encryptionpb.EncryptionMethod_PLAINTEXT
default:
b.err = errors.Errorf("unsupported encryption method: %s", opt.StrValue)
return nil
}
}
}

Expand Down Expand Up @@ -383,6 +408,22 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
return nil
}
e.backupCfg.BackupTS = tso
case ast.BRIEOptionCompression:
switch opt.StrValue {
case "zstd":
e.backupCfg.CompressionConfig.CompressionType = backuppb.CompressionType_ZSTD
case "snappy":
e.backupCfg.CompressionConfig.CompressionType = backuppb.CompressionType_SNAPPY
case "lz4":
e.backupCfg.CompressionConfig.CompressionType = backuppb.CompressionType_LZ4
default:
b.err = errors.Errorf("unsupported compression type: %s", opt.StrValue)
return nil
}
case ast.BRIEOptionCompressionLevel:
e.backupCfg.CompressionConfig.CompressionLevel = int32(opt.UintValue)
case ast.BRIEOptionIgnoreStats:
e.backupCfg.IgnoreStats = opt.UintValue != 0
}
}

Expand All @@ -391,8 +432,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
rcfg.Config = cfg
e.restoreCfg = &rcfg
for _, opt := range s.Options {
if opt.Tp == ast.BRIEOptionOnline {
switch opt.Tp {
case ast.BRIEOptionOnline:
e.restoreCfg.Online = opt.UintValue != 0
case ast.BRIEOptionWaitTiflashReady:
e.restoreCfg.WaitTiflashReady = opt.UintValue != 0
case ast.BRIEOptionWithSysTable:
e.restoreCfg.WithSysTable = opt.UintValue != 0
case ast.BRIEOptionLoadStats:
e.restoreCfg.LoadStats = opt.UintValue != 0
}
}

Expand Down
83 changes: 83 additions & 0 deletions pkg/executor/brie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ package executor
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser"
Expand Down Expand Up @@ -140,3 +144,82 @@ func TestFetchShowBRIE(t *testing.T) {
globalBRIEQueue.clearTask(e.Ctx().GetSessionVars().StmtCtx)
require.Equal(t, info2Res, fetchShowBRIEResult(t, e, brieColTypes))
}

func TestBRIEBuilderOPtions(t *testing.T) {
sctx := mock.NewContext()
sctx.GetSessionVars().User = &auth.UserIdentity{Username: "test"}
is := infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable()})
ResetGlobalBRIEQueueForTest()
builder := NewMockExecutorBuilderForTest(sctx, is)
ctx := context.Background()
p := parser.New()
p.SetParserConfig(parser.ParserConfig{EnableWindowFunction: true, EnableStrictDoubleTypeCheck: true})
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/modifyStore", `return("tikv")`)
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/modifyStore")
err := os.WriteFile("/tmp/keyfile", []byte(strings.Repeat("A", 128)), 0644)

require.NoError(t, err)
stmt, err := p.ParseOneStmt("BACKUP TABLE `a` TO 'noop://' CHECKSUM_CONCURRENCY = 4 IGNORE_STATS = 1 COMPRESSION_LEVEL = 4 COMPRESSION_TYPE = 'lz4' ENCRYPTION_METHOD = 'aes256-ctr' ENCRYPTION_KEYFILE = '/tmp/keyfile'", "", "")
require.NoError(t, err)
plan, err := core.BuildLogicalPlanForTest(ctx, sctx, stmt, infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable(), core.MockView()}))
require.NoError(t, err)
s, ok := stmt.(*ast.BRIEStmt)
require.True(t, ok)
require.True(t, s.Kind == ast.BRIEKindBackup)
for _, opt := range s.Options {
switch opt.Tp {
case ast.BRIEOptionChecksumConcurrency:
require.Equal(t, uint64(4), opt.UintValue)
case ast.BRIEOptionCompressionLevel:
require.Equal(t, uint64(4), opt.UintValue)
case ast.BRIEOptionIgnoreStats:
require.Equal(t, uint64(1), opt.UintValue)
case ast.BRIEOptionCompression:
require.Equal(t, "lz4", opt.StrValue)
case ast.BRIEOptionEncryptionMethod:
require.Equal(t, "aes256-ctr", opt.StrValue)
case ast.BRIEOptionEncryptionKeyFile:
require.Equal(t, "/tmp/keyfile", opt.StrValue)
}
}
schema := plan.Schema()
exec := builder.buildBRIE(s, schema)
require.NoError(t, builder.err)
e, ok := exec.(*BRIEExec)
require.True(t, ok)
require.Equal(t, uint(4), e.backupCfg.ChecksumConcurrency)
require.Equal(t, int32(4), e.backupCfg.CompressionLevel)
require.Equal(t, true, e.backupCfg.IgnoreStats)
require.Equal(t, backuppb.CompressionType_LZ4, e.backupCfg.CompressionConfig.CompressionType)
require.Equal(t, encryptionpb.EncryptionMethod_AES256_CTR, e.backupCfg.CipherInfo.CipherType)
require.Greater(t, len(e.backupCfg.CipherInfo.CipherKey), 0)

stmt, err = p.ParseOneStmt("RESTORE TABLE `a` FROM 'noop://' CHECKSUM_CONCURRENCY = 4 WAIT_TIFLASH_READY = 1 WITH_SYS_TABLE = 1 LOAD_STATS = 1", "", "")
require.NoError(t, err)
plan, err = core.BuildLogicalPlanForTest(ctx, sctx, stmt, infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable(), core.MockView()}))
require.NoError(t, err)
s, ok = stmt.(*ast.BRIEStmt)
require.True(t, ok)
require.True(t, s.Kind == ast.BRIEKindRestore)
for _, opt := range s.Options {
switch opt.Tp {
case ast.BRIEOptionChecksumConcurrency:
require.Equal(t, uint64(4), opt.UintValue)
case ast.BRIEOptionWaitTiflashReady:
require.Equal(t, uint64(1), opt.UintValue)
case ast.BRIEOptionWithSysTable:
require.Equal(t, uint64(1), opt.UintValue)
case ast.BRIEOptionLoadStats:
require.Equal(t, uint64(1), opt.UintValue)
}
}
schema = plan.Schema()
exec = builder.buildBRIE(s, schema)
require.NoError(t, builder.err)
e, ok = exec.(*BRIEExec)
require.True(t, ok)
require.Equal(t, uint(4), e.restoreCfg.ChecksumConcurrency)
require.True(t, e.restoreCfg.WaitTiflashReady)
require.True(t, e.restoreCfg.WithSysTable)
require.True(t, e.restoreCfg.LoadStats)
}
29 changes: 28 additions & 1 deletion pkg/parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3278,17 +3278,26 @@ const (
BRIEOptionCheckpoint
BRIEOptionStartTS
BRIEOptionUntilTS
BRIEOptionChecksumConcurrency
BRIEOptionEncryptionMethod
BRIEOptionEncryptionKeyFile
// backup options
BRIEOptionBackupTimeAgo
BRIEOptionBackupTS
BRIEOptionBackupTSO
BRIEOptionLastBackupTS
BRIEOptionLastBackupTSO
BRIEOptionGCTTL
BRIEOptionCompressionLevel
BRIEOptionCompression
BRIEOptionIgnoreStats
BRIEOptionLoadStats
// restore options
BRIEOptionOnline
BRIEOptionFullBackupStorage
BRIEOptionRestoredTS
BRIEOptionWaitTiflashReady
BRIEOptionWithSysTable
// import options
BRIEOptionAnalyze
BRIEOptionBackend
Expand Down Expand Up @@ -3408,6 +3417,24 @@ func (kind BRIEOptionType) String() string {
return "UNTIL_TS"
case BRIEOptionGCTTL:
return "GC_TTL"
case BRIEOptionWaitTiflashReady:
return "WAIT_TIFLASH_READY"
case BRIEOptionWithSysTable:
return "WITH_SYS_TABLE"
case BRIEOptionIgnoreStats:
return "IGNORE_STATS"
case BRIEOptionLoadStats:
return "LOAD_STATS"
case BRIEOptionChecksumConcurrency:
return "CHECKSUM_CONCURRENCY"
case BRIEOptionCompressionLevel:
return "COMPRESSION_LEVEL"
case BRIEOptionCompression:
return "COMPRESSION_TYPE"
case BRIEOptionEncryptionMethod:
return "ENCRYPTION_METHOD"
case BRIEOptionEncryptionKeyFile:
return "ENCRYPTION_KEY_FILE"
default:
return ""
}
Expand Down Expand Up @@ -3436,7 +3463,7 @@ func (opt *BRIEOption) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord(opt.Tp.String())
ctx.WritePlain(" = ")
switch opt.Tp {
case BRIEOptionBackupTS, BRIEOptionLastBackupTS, BRIEOptionBackend, BRIEOptionOnDuplicate, BRIEOptionTiKVImporter, BRIEOptionCSVDelimiter, BRIEOptionCSVNull, BRIEOptionCSVSeparator, BRIEOptionFullBackupStorage, BRIEOptionRestoredTS, BRIEOptionStartTS, BRIEOptionUntilTS, BRIEOptionGCTTL:
case BRIEOptionBackupTS, BRIEOptionLastBackupTS, BRIEOptionBackend, BRIEOptionOnDuplicate, BRIEOptionTiKVImporter, BRIEOptionCSVDelimiter, BRIEOptionCSVNull, BRIEOptionCSVSeparator, BRIEOptionFullBackupStorage, BRIEOptionRestoredTS, BRIEOptionStartTS, BRIEOptionUntilTS, BRIEOptionGCTTL, BRIEOptionCompression, BRIEOptionEncryptionMethod, BRIEOptionEncryptionKeyFile:
ctx.WriteString(opt.StrValue)
case BRIEOptionBackupTimeAgo:
ctx.WritePlainf("%d ", opt.UintValue/1000)
Expand Down
9 changes: 9 additions & 0 deletions pkg/parser/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ var Keywords = []KeywordsType{
{"CHARSET", false, "unreserved"},
{"CHECKPOINT", false, "unreserved"},
{"CHECKSUM", false, "unreserved"},
{"CHECKSUM_CONCURRENCY", false, "unreserved"},
{"CIPHER", false, "unreserved"},
{"CLEANUP", false, "unreserved"},
{"CLIENT", false, "unreserved"},
Expand All @@ -317,6 +318,8 @@ var Keywords = []KeywordsType{
{"COMPACT", false, "unreserved"},
{"COMPRESSED", false, "unreserved"},
{"COMPRESSION", false, "unreserved"},
{"COMPRESSION_LEVEL", false, "unreserved"},
{"COMPRESSION_TYPE", false, "unreserved"},
{"CONCURRENCY", false, "unreserved"},
{"CONFIG", false, "unreserved"},
{"CONNECTION", false, "unreserved"},
Expand Down Expand Up @@ -353,6 +356,8 @@ var Keywords = []KeywordsType{
{"ENABLE", false, "unreserved"},
{"ENABLED", false, "unreserved"},
{"ENCRYPTION", false, "unreserved"},
{"ENCRYPTION_KEYFILE", false, "unreserved"},
{"ENCRYPTION_METHOD", false, "unreserved"},
{"END", false, "unreserved"},
{"ENFORCED", false, "unreserved"},
{"ENGINE", false, "unreserved"},
Expand Down Expand Up @@ -394,6 +399,7 @@ var Keywords = []KeywordsType{
{"HOUR", false, "unreserved"},
{"HYPO", false, "unreserved"},
{"IDENTIFIED", false, "unreserved"},
{"IGNORE_STATS", false, "unreserved"},
{"IMPORT", false, "unreserved"},
{"IMPORTS", false, "unreserved"},
{"INCREMENT", false, "unreserved"},
Expand All @@ -417,6 +423,7 @@ var Keywords = []KeywordsType{
{"LESS", false, "unreserved"},
{"LEVEL", false, "unreserved"},
{"LIST", false, "unreserved"},
{"LOAD_STATS", false, "unreserved"},
{"LOCAL", false, "unreserved"},
{"LOCATION", false, "unreserved"},
{"LOCKED", false, "unreserved"},
Expand Down Expand Up @@ -619,10 +626,12 @@ var Keywords = []KeywordsType{
{"VIEW", false, "unreserved"},
{"VISIBLE", false, "unreserved"},
{"WAIT", false, "unreserved"},
{"WAIT_TIFLASH_READY", false, "unreserved"},
{"WARNINGS", false, "unreserved"},
{"WEEK", false, "unreserved"},
{"WEIGHT_STRING", false, "unreserved"},
{"WITHOUT", false, "unreserved"},
{"WITH_SYS_TABLE", false, "unreserved"},
{"WORKLOAD", false, "unreserved"},
{"X509", false, "unreserved"},
{"YEAR", false, "unreserved"},
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/keywords_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestKeywords(t *testing.T) {
}

func TestKeywordsLength(t *testing.T) {
require.Equal(t, 644, len(parser.Keywords))
require.Equal(t, 653, len(parser.Keywords))

reservedNr := 0
for _, kw := range parser.Keywords {
Expand Down
9 changes: 9 additions & 0 deletions pkg/parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,15 @@ var tokenMap = map[string]int{
"CSV_NULL": csvNull,
"CSV_SEPARATOR": csvSeparator,
"CSV_TRIM_LAST_SEPARATORS": csvTrimLastSeparators,
"WAIT_TIFLASH_READY": waitTiflashReady,
"WITH_SYS_TABLE": withSysTable,
"IGNORE_STATS": ignoreStats,
"LOAD_STATS": loadStats,
"CHECKSUM_CONCURRENCY": checksumConcurrency,
"COMPRESSION_LEVEL": compressionLevel,
"COMPRESSION_TYPE": compressionType,
"ENCRYPTION_METHOD": encryptionMethod,
"ENCRYPTION_KEYFILE": encryptionKeyFile,
"CURDATE": curDate,
"CURRENT_DATE": currentDate,
"CURRENT_ROLE": currentRole,
Expand Down
Loading

0 comments on commit 04cb62d

Please sign in to comment.