Skip to content

Commit

Permalink
Merge branch 'master' into fix40923
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Feb 8, 2023
2 parents a4ebdee + c392b62 commit 43f5837
Show file tree
Hide file tree
Showing 22 changed files with 10,091 additions and 9,889 deletions.
3 changes: 2 additions & 1 deletion br/pkg/storage/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ func TestCreateStorage(t *testing.T) {
require.Equal(t, "TestKey", s3.SseKmsKeyId)

// special character in access keys
s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw`, nil)
s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw&session-token=FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=`, nil)
require.NoError(t, err)
s3 = s.GetS3()
require.NotNil(t, s3)
require.Equal(t, "bucket4", s3.Bucket)
require.Equal(t, "prefix/path", s3.Prefix)
require.Equal(t, "NXN7IPIOSAAKDEEOLMAF", s3.AccessKey)
require.Equal(t, "nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw", s3.SecretAccessKey)
require.Equal(t, "FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=", s3.SessionToken)
require.True(t, s3.ForcePathStyle)

// parse role ARN and external ID
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type S3BackendOptions struct {
ACL string `json:"acl" toml:"acl"`
AccessKey string `json:"access-key" toml:"access-key"`
SecretAccessKey string `json:"secret-access-key" toml:"secret-access-key"`
SessionToken string `json:"session-token" toml:"session-token"`
Provider string `json:"provider" toml:"provider"`
ForcePathStyle bool `json:"force-path-style" toml:"force-path-style"`
UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"`
Expand Down Expand Up @@ -184,6 +185,7 @@ func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error {
s3.Acl = options.ACL
s3.AccessKey = options.AccessKey
s3.SecretAccessKey = options.SecretAccessKey
s3.SessionToken = options.SessionToken
s3.ForcePathStyle = options.ForcePathStyle
s3.RoleArn = options.RoleARN
s3.ExternalId = options.ExternalID
Expand Down Expand Up @@ -262,7 +264,7 @@ func NewS3StorageForTest(svc s3iface.S3API, options *backuppb.S3) *S3Storage {
// auto access without ak / sk.
func autoNewCred(qs *backuppb.S3) (cred *credentials.Credentials, err error) {
if qs.AccessKey != "" && qs.SecretAccessKey != "" {
return credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, ""), nil
return credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, qs.SessionToken), nil
}
endpoint := qs.Endpoint
// if endpoint is empty,return no error and run default(aws) follow.
Expand Down Expand Up @@ -330,6 +332,7 @@ func NewS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St
// Clear the credentials if exists so that they will not be sent to TiKV
backend.AccessKey = ""
backend.SecretAccessKey = ""
backend.SessionToken = ""
} else if ses.Config.Credentials != nil {
if qs.AccessKey == "" || qs.SecretAccessKey == "" {
v, cerr := ses.Config.Credentials.Get()
Expand All @@ -338,6 +341,7 @@ func NewS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St
}
backend.AccessKey = v.AccessKeyID
backend.SecretAccessKey = v.SecretAccessKey
backend.SessionToken = v.SessionToken
}
}

Expand Down
13 changes: 12 additions & 1 deletion br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func TestApplyUpdate(t *testing.T) {
if test.setEnv {
require.NoError(t, os.Setenv("AWS_ACCESS_KEY_ID", "ab"))
require.NoError(t, os.Setenv("AWS_SECRET_ACCESS_KEY", "cd"))
require.NoError(t, os.Setenv("AWS_SESSION_TOKEN", "ef"))
}
u, err := ParseBackend("s3://bucket/prefix/", &BackendOptions{S3: test.options})
require.NoError(t, err)
Expand Down Expand Up @@ -260,11 +261,13 @@ func TestApplyUpdate(t *testing.T) {
Region: "us-west-2",
AccessKey: "ab",
SecretAccessKey: "cd",
SessionToken: "ef",
},
s3: &backuppb.S3{
Region: "us-west-2",
AccessKey: "ab",
SecretAccessKey: "cd",
SessionToken: "ef",
Bucket: "bucket",
Prefix: "prefix",
},
Expand Down Expand Up @@ -354,6 +357,7 @@ func TestS3Storage(t *testing.T) {
Endpoint: s.URL,
AccessKey: "ab",
SecretAccessKey: "cd",
SessionToken: "ef",
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
Expand Down Expand Up @@ -1112,10 +1116,12 @@ func TestWalkDirWithEmptyPrefix(t *testing.T) {
func TestSendCreds(t *testing.T) {
accessKey := "ab"
secretAccessKey := "cd"
sessionToken := "ef"
backendOpt := BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
},
}
backend, err := ParseBackend("s3://bucket/prefix/", &backendOpt)
Expand All @@ -1128,12 +1134,15 @@ func TestSendCreds(t *testing.T) {
sentAccessKey := backend.GetS3().AccessKey
require.Equal(t, accessKey, sentAccessKey)
sentSecretAccessKey := backend.GetS3().SecretAccessKey
require.Equal(t, sentSecretAccessKey, sentSecretAccessKey)
require.Equal(t, secretAccessKey, sentSecretAccessKey)
sentSessionToken := backend.GetS3().SessionToken
require.Equal(t, sessionToken, sentSessionToken)

backendOpt = BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
},
}
backend, err = ParseBackend("s3://bucket/prefix/", &backendOpt)
Expand All @@ -1147,6 +1156,8 @@ func TestSendCreds(t *testing.T) {
require.Equal(t, "", sentAccessKey)
sentSecretAccessKey = backend.GetS3().SecretAccessKey
require.Equal(t, "", sentSecretAccessKey)
sentSessionToken = backend.GetS3().SessionToken
require.Equal(t, "", sentSessionToken)
}

func TestObjectLock(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"@com_github_golang_protobuf//proto",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down Expand Up @@ -78,6 +79,7 @@ go_test(
"//tablecodec",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
return
case e, ok := <-ch:
if !ok {
log.Info("[log backup advancer] Task watcher exits due to stream ends.")
return
}
log.Info("meet task event", zap.Stringer("event", &e))
log.Info("[log backup advancer] Meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(ctx, e); err != nil {
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) })
}
log.Info("[log backup advancer] Task watcher exits due to some error.", logutil.ShortError(err))
return
}
}
Expand Down
46 changes: 32 additions & 14 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"strings"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -94,6 +97,9 @@ func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (Ta

func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) {
result := make([]TaskEvent, 0, len(resp.Events))
if err := resp.Err(); err != nil {
return nil, err
}
for _, event := range resp.Events {
te, err := t.toTaskEvent(ctx, event)
if err != nil {
Expand All @@ -110,6 +116,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
log.Warn("[log backup advancer] Meet error during receiving the task event.", logutil.ShortError(err))
ch <- errorEvent(err)
return false
}
Expand All @@ -118,33 +125,44 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
}
return true
}
collectRemaining := func() {
log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c)))
defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
return
}
}
}

go func() {
defer close(ch)
for {
select {
case resp, ok := <-c:
failpoint.Inject("advancer_close_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
})
if !ok {
ch <- errorEvent(io.EOF)
return
}
if !handleResponse(resp) {
return
}
case <-ctx.Done():
// drain the remain event from channel.
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
return
}
}
collectRemaining()
ch <- errorEvent(ctx.Err())
return
}
}
}()
Expand Down
44 changes: 42 additions & 2 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"net/url"
"path"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) {
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
Expand Down Expand Up @@ -295,6 +298,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
taskInfo2 := simpleTask(taskName2, 4)
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
Expand All @@ -310,8 +314,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
require.Equal(t, forth.Type, streamhelper.EventDel)
require.Equal(t, forth.Name, taskName2)
cancel()
_, ok := <-ch
require.False(t, ok)
fifth, ok := <-ch
require.True(t, ok)
require.Equal(t, fifth.Type, streamhelper.EventErr)
require.Error(t, fifth.Err, context.Canceled)
item, ok := <-ch
require.False(t, ok, "%v", item)
}

func testStreamClose(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
taskName := "close_simple"
taskInfo := simpleTask(taskName, 4)

require.NoError(t, metaCli.PutTask(ctx, taskInfo))
ch := make(chan streamhelper.TaskEvent, 1024)
require.NoError(t, metaCli.Begin(ctx, ch))
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
require.ElementsMatch(t, first.Ranges, simpleRanges(4))
second := <-ch
require.Equal(t, second.Type, streamhelper.EventDel, "%s", second)
require.Equal(t, second.Name, taskName, "%s", second)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel", "return"))
defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel")
// We need to make the channel file some events hence we can simulate the closed channel.
taskName2 := "close_simple2"
taskInfo2 := simpleTask(taskName2, 4)
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

third := <-ch
require.Equal(t, third.Type, streamhelper.EventErr)
require.Error(t, third.Err, io.EOF)
item, ok := <-ch
require.False(t, ok, "%#v", item)
}

func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3026,6 +3026,8 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement
placementSettings.FollowerConstraints = stringVal
case ast.PlacementOptionVoterConstraints:
placementSettings.VoterConstraints = stringVal
case ast.PlacementOptionSurvivalPreferences:
placementSettings.SurvivalPreferences = stringVal
default:
return errors.Trace(errors.New("unknown placement policy option"))
}
Expand Down
Loading

0 comments on commit 43f5837

Please sign in to comment.