Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import into: precheck and register to pd #44313

Merged
merged 8 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,12 @@ br_compatibility_test:
mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

# mock interface for lightning and IMPORT INTO
mock_lightning:
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
1 change: 1 addition & 0 deletions br/pkg/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"importer.go",
"mock_cluster.go",
"s3iface.go",
"task_register.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/mock",
visibility = ["//visibility:public"],
Expand Down
77 changes: 77 additions & 0 deletions br/pkg/mock/task_register.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 29,
shard_count = 30,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
77 changes: 64 additions & 13 deletions br/pkg/utils/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type RegisterTaskType int
const (
RegisterRestore RegisterTaskType = iota
RegisterLightning
RegisterImportInto
)

func (tp RegisterTaskType) String() string {
Expand All @@ -42,20 +43,40 @@ func (tp RegisterTaskType) String() string {
return "restore"
case RegisterLightning:
return "lightning"
case RegisterImportInto:
return "import-into"
}
return "default"
}

// The key format should be {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
const (
// RegisterImportTaskPrefix is the prefix of the key for task register
// todo: remove "/import" suffix, it's confusing to have a key like "/tidb/brie/import/restore/restore-xxx"
RegisterImportTaskPrefix = "/tidb/brie/import"

RegisterRetryInternal = 10 * time.Second
defaultTaskRegisterTTL = 3 * time.Minute // 3 minutes
)

// TaskRegister can register the task to PD with a lease, and keepalive it in the background
type TaskRegister struct {
// TaskRegister can register the task to PD with a lease.
type TaskRegister interface {
// Close closes the background task if using RegisterTask
// and revoke the lease.
// NOTE: we don't close the etcd client here, call should do it.
Close(ctx context.Context) (err error)
// RegisterTask firstly put its key to PD with a lease,
// and start to keepalive the lease in the background.
// DO NOT mix calls to RegisterTask and RegisterTaskOnce.
RegisterTask(c context.Context) error
// RegisterTaskOnce put its key to PD with a lease if the key does not exist,
// else we refresh the lease.
// you have to call this method periodically to keep the lease alive.
// DO NOT mix calls to RegisterTask and RegisterTaskOnce.
RegisterTaskOnce(ctx context.Context) error
}

type taskRegister struct {
client *clientv3.Client
ttl time.Duration
secondTTL int64
Expand All @@ -68,8 +89,8 @@ type TaskRegister struct {
}

// NewTaskRegisterWithTTL build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) *TaskRegister {
return &TaskRegister{
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) TaskRegister {
return &taskRegister{
client: client,
ttl: ttl,
secondTTL: int64(ttl / time.Second),
Expand All @@ -80,13 +101,16 @@ func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp Regis
}

// NewTaskRegister build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) *TaskRegister {
func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) TaskRegister {
return NewTaskRegisterWithTTL(client, defaultTaskRegisterTTL, tp, taskName)
}

// Close closes the background task of taskRegister
func (tr *TaskRegister) Close(ctx context.Context) (err error) {
tr.cancel()
// Close implements the TaskRegister interface
func (tr *taskRegister) Close(ctx context.Context) (err error) {
// not needed if using RegisterTaskOnce
if tr.cancel != nil {
tr.cancel()
}
tr.wg.Wait()
if tr.curLeaseID != clientv3.NoLease {
_, err = tr.client.Lease.Revoke(ctx, tr.curLeaseID)
Expand All @@ -97,7 +121,7 @@ func (tr *TaskRegister) Close(ctx context.Context) (err error) {
return err
}

func (tr *TaskRegister) grant(ctx context.Context) (*clientv3.LeaseGrantResponse, error) {
func (tr *taskRegister) grant(ctx context.Context) (*clientv3.LeaseGrantResponse, error) {
lease, err := tr.client.Lease.Grant(ctx, tr.secondTTL)
if err != nil {
return nil, err
Expand All @@ -108,9 +132,36 @@ func (tr *TaskRegister) grant(ctx context.Context) (*clientv3.LeaseGrantResponse
return lease, nil
}

// RegisterTask firstly put its key to PD with a lease,
// and start to keepalive the lease in the background.
func (tr *TaskRegister) RegisterTask(c context.Context) error {
// RegisterTaskOnce implements the TaskRegister interface
func (tr *taskRegister) RegisterTaskOnce(ctx context.Context) error {
resp, err := tr.client.Get(ctx, tr.key)
if err != nil {
return errors.Trace(err)
}
if len(resp.Kvs) == 0 {
lease, err2 := tr.grant(ctx)
if err2 != nil {
return errors.Annotatef(err2, "failed grant a lease")
}
tr.curLeaseID = lease.ID
_, err2 = tr.client.KV.Put(ctx, tr.key, "", clientv3.WithLease(lease.ID))
if err2 != nil {
return errors.Trace(err2)
}
} else {
// if the task is run distributively, like IMPORT INTO, we should refresh the lease ID,
// in case the owner changed during the registration, and the new owner create the key.
tr.curLeaseID = clientv3.LeaseID(resp.Kvs[0].Lease)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it want to prevent that, the new leader overwrites the key, but some peers are still keepalive for the overwritten key?

Copy link
Contributor Author

@D3Hunter D3Hunter Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, other tidb might keeps a stale lease id, although this case seems ok to report err

_, err2 := tr.client.Lease.KeepAliveOnce(ctx, tr.curLeaseID)
if err2 != nil {
return errors.Trace(err2)
}
}
return nil
}

// RegisterTask implements the TaskRegister interface
func (tr *taskRegister) RegisterTask(c context.Context) error {
cctx, cancel := context.WithCancel(c)
tr.cancel = cancel
lease, err := tr.grant(cctx)
Expand All @@ -133,7 +184,7 @@ func (tr *TaskRegister) RegisterTask(c context.Context) error {
return nil
}

func (tr *TaskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) {
func (tr *taskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) {
defer tr.wg.Done()
const minTimeLeftThreshold time.Duration = 20 * time.Second
var (
Expand Down
35 changes: 35 additions & 0 deletions br/pkg/utils/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,41 @@ func TestTaskRegister(t *testing.T) {
require.NoError(t, register.Close(ctx))
}

func TestTaskRegisterOnce(t *testing.T) {
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(t)

// should not close the client manually, the test will fail, since Terminate will close it too.
client := testEtcdCluster.RandClient()

ctx := context.Background()
register := NewTaskRegisterWithTTL(client, 10*time.Second, RegisterImportInto, "test")
defer register.Close(ctx)
err := register.RegisterTaskOnce(ctx)
require.NoError(t, err)

// sleep 3 seconds to make sure the lease TTL is smaller.
time.Sleep(3 * time.Second)
list, err := GetImportTasksFrom(ctx, client)
require.NoError(t, err)
require.Len(t, list.Tasks, 1)
currTask := list.Tasks[0]
t.Log(currTask.MessageToUser())
require.Equal(t, "/tidb/brie/import/import-into/test", currTask.Key)

// then register again, this time will only refresh the lease, and left TTL will be larger.
err = register.RegisterTaskOnce(ctx)
require.NoError(t, err)
list, err = GetImportTasksFrom(ctx, client)
require.NoError(t, err)
require.Len(t, list.Tasks, 1)
thisTask := list.Tasks[0]
require.Equal(t, currTask.Key, thisTask.Key)
require.Equal(t, currTask.LeaseID, thisTask.LeaseID)
require.Greater(t, thisTask.TTL, currTask.TTL)
}

func TestTaskRegisterFailedGrant(t *testing.T) {
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, GRPCKeepAliveInterval: time.Second, GRPCKeepAliveTimeout: 10 * time.Second})
Expand Down
2 changes: 2 additions & 0 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//br/pkg/utils",
"//disttask/framework/dispatcher",
"//disttask/framework/handle",
"//disttask/framework/proto",
Expand All @@ -32,6 +33,7 @@ go_library(
"//parser/mysql",
"//sessionctx",
"//table/tables",
"//util/etcd",
"//util/logutil",
"//util/sqlexec",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down
Loading