Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: add setting/getting the owner value info (#43353) #43514

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
WatcherClosed = "watcher_closed"
Cancelled = "cancelled"
Deleted = "deleted"
PutValue = "put_value"
SessionDone = "session_done"
CtxDone = "context_done"
WatchOwnerCounter *prometheus.CounterVec
Expand Down
5 changes: 4 additions & 1 deletion owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ go_library(
importpath = "github.com/pingcap/tidb/owner",
visibility = ["//visibility:public"],
deps = [
"//ddl/util",
"//metrics",
"//parser/terror",
"//util",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@io_etcd_go_etcd_api_v3//mvccpb",
"@io_etcd_go_etcd_api_v3//v3rpc/rpctypes",
"@io_etcd_go_etcd_client_v3//:client",
Expand All @@ -32,10 +34,11 @@ go_test(
],
embed = [":owner"],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//ddl",
"//infoschema",
"//kv",
"//parser/terror",
"//store/mockstore",
"//testkit/testsetup",
Expand Down
122 changes: 108 additions & 14 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package owner

import (
"bytes"
"context"
"fmt"
"os"
Expand All @@ -25,6 +26,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/terror"
util2 "github.com/pingcap/tidb/util"
Expand All @@ -46,6 +48,8 @@ type Manager interface {
RetireOwner()
// GetOwnerID gets the owner ID.
GetOwnerID(ctx context.Context) (string, error)
// SetOwnerOpValue updates the owner op value.
SetOwnerOpValue(ctx context.Context, op OpType) error
// CampaignOwner campaigns the owner.
CampaignOwner() error
// ResignOwner lets the owner start a new election.
Expand All @@ -65,6 +69,25 @@ const (
keyOpDefaultTimeout = 5 * time.Second
)

// OpType is the operation type carried in the owner key's value.
type OpType byte

// Known owner operation types.
const (
	// OpNone is the zero value of OpType.
	OpNone = OpType(0)
	// OpGetUpgradingState is rendered by String as "get upgrading state".
	OpGetUpgradingState = OpType(1)
)

// String implements the fmt.Stringer interface.
// Every value other than OpGetUpgradingState is rendered as "none".
func (op OpType) String() string {
	if op == OpGetUpgradingState {
		return "get upgrading state"
	}
	return "none"
}

// DDLOwnerChecker is used to check whether tidb is owner.
type DDLOwnerChecker interface {
// IsOwner returns whether the ownerManager is the owner.
Expand Down Expand Up @@ -225,6 +248,12 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
return
}
case <-campaignContext.Done():
failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
if v.(string) == "delOwnerKeyAndNotOwner" {
logutil.Logger(logCtx).Info("mock break campaign and don't clear related info")
return
}
})
logutil.Logger(logCtx).Info("break campaign loop, context is done")
m.revokeSession(logPrefix, etcdSession.Lease())
return
Expand All @@ -248,7 +277,7 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
continue
}

ownerKey, err := GetOwnerInfo(campaignContext, logCtx, elec, m.id)
ownerKey, err := GetOwnerKey(campaignContext, logCtx, m.etcdCli, m.key, m.id)
if err != nil {
continue
}
Expand All @@ -274,32 +303,97 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) {

// GetOwnerID implements Manager.GetOwnerID interface.
// It returns the owner ID that getOwnerInfo extracts from the value stored
// under m.key, converted to a string, together with the traced error.
// NOTE(review): this text comes from a diff view, so the span appears to show
// the pre-change lookup next to its replacement — confirm against the real file.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
// NOTE(review): presumably the removed pre-change line; resp is not used below.
resp, err := m.etcdCli.Get(ctx, m.key, clientv3.WithFirstCreate()...)
// Only the owner ID and the error are kept; key, op and revision are discarded.
_, ownerID, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
return string(ownerID), errors.Trace(err)
}

// getOwnerInfo reads ownerPath from etcd with the clientv3.WithFirstCreate
// options and returns, in order: the key of the first returned entry, the
// owner ID and op type obtained by passing that entry's value to
// splitOwnerValues, the entry's ModRevision, and an error.
// It returns concurrency.ErrElectionNoLeader when the response has no entries.
// NOTE(review): this text comes from a diff view; the two-value return
// statements look like removed pre-change lines shown next to their
// five-value replacements — confirm against the real file.
func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, error) {
// op keeps its zero value on every error path.
var op OpType
resp, err := etcdCli.Get(ctx, ownerPath, clientv3.WithFirstCreate()...)
if err != nil {
return "", errors.Trace(err)
logutil.Logger(logCtx).Info("failed to get leader", zap.Error(err))
return "", nil, op, 0, errors.Trace(err)
}
// An empty result is reported as "no leader".
if len(resp.Kvs) == 0 {
return "", concurrency.ErrElectionNoLeader
return "", nil, op, 0, concurrency.ErrElectionNoLeader
}
return string(resp.Kvs[0].Value), nil

// The stored value is split into the owner ID and the op.
var ownerID []byte
ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key),
zap.ByteString("ownerID", ownerID), zap.Stringer("op", op))
return string(resp.Kvs[0].Key), ownerID, op, resp.Kvs[0].ModRevision, nil
}

// NOTE(review): this text comes from a diff view; it appears to interleave the
// pre-change GetOwnerInfo (which used elec.Leader and resp) with the new
// GetOwnerKey (which uses getOwnerInfo) — confirm against the real file.
// GetOwnerInfo gets the owner information.
func GetOwnerInfo(ctx, logCtx context.Context, elec *concurrency.Election, id string) (string, error) {
resp, err := elec.Leader(ctx)
// GetOwnerKey gets the owner key information.
// It looks the owner up through getOwnerInfo and returns the owner key only
// when the stored owner ID equals id; otherwise it logs a warning and returns
// an "ownerInfoNotMatch" error. A getOwnerInfo error is returned traced.
func GetOwnerKey(ctx, logCtx context.Context, etcdCli *clientv3.Client, etcdKey, id string) (string, error) {
ownerKey, ownerID, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
if err != nil {
// If no leader elected currently, it returns ErrElectionNoLeader.
logutil.Logger(logCtx).Info("failed to get leader", zap.Error(err))
return "", errors.Trace(err)
}
ownerID := string(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.String("ownerID", ownerID))
if ownerID != id {
// The caller is the owner only when the stored ID matches id.
if string(ownerID) != id {
logutil.Logger(logCtx).Warn("is not the owner")
return "", errors.New("ownerInfoNotMatch")
}

return string(resp.Kvs[0].Key), nil
return ownerKey, nil
}

// splitOwnerValues splits an owner key value of the form "<ownerID>_<op>"
// into the owner ID and the op type.
//
// The returned ID is always the bytes before the first "_". The op is the
// first byte after the separator, and is only extracted when the value splits
// into exactly two parts and the second part is non-empty; in every other
// case the zero OpType is returned.
func splitOwnerValues(val []byte) ([]byte, OpType) {
	vals := bytes.Split(val, []byte("_"))
	var op OpType
	// A value with a trailing separator ("id_") yields an empty second part;
	// indexing it would panic, so it is treated as carrying no op.
	if len(vals) == 2 && len(vals[1]) > 0 {
		op = OpType(vals[1][0])
	}
	return vals[0], op
}

// joinOwnerValues concatenates the given parts into a single owner key value,
// placing "_" between consecutive parts.
func joinOwnerValues(vals ...[]byte) []byte {
	separator := []byte("_")
	joined := bytes.Join(vals, separator)
	return joined
}

// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
//
// It reads the current owner key through getOwnerInfo and, when the stored op
// differs from op, rewrites the value as "<ownerID>_<op>" in a transaction
// that only commits if the key's ModRevision is unchanged since the read.
//
// It returns nil without writing when the stored op already equals op, an
// "ownerInfoNotMatch" error when the stored owner ID is not m.id, the commit
// error when the transaction fails, and a "put owner key failed, cmp is false"
// error when the revision comparison does not hold.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
	// Read the current owner value; only the op part is changed below.
	ownerKey, ownerID, currOp, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
	if err != nil {
		return errors.Trace(err)
	}
	if currOp == op {
		logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op))
		return nil
	}
	if string(ownerID) != m.id {
		return errors.New("ownerInfoNotMatch")
	}
	newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)})

	failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
		if valStr, ok := v.(string); ok {
			if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil {
				failpoint.Return(err)
			}
		}
	})

	// NOTE(review): this put carries no lease option; per the etcd API a put
	// without one stores the key with no lease. Confirm whether the owner key
	// should keep its current lease (e.g. via clientv3.WithIgnoreLease).
	resp, err := m.etcdCli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)).
		Then(clientv3.OpPut(ownerKey, string(newOwnerVal))).
		Commit()
	// resp is nil when Commit fails, so it must only be read after checking err.
	succeeded := err == nil && resp.Succeeded
	logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID),
		zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Bool("isSuc", succeeded), zap.Error(err))
	// Keep the commit error if there is one; otherwise report a failed comparison.
	if err == nil && !succeeded {
		err = errors.New("put owner key failed, cmp is false")
	}
	metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc()
	return errors.Trace(err)
}

// GetOwnerOpValue gets the owner op value.
// It reads ownerPath through getOwnerInfo, logging under a background context
// tagged with "owner info"=logPrefix, and returns the op together with the
// traced error.
func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) {
	_, _, ownerOp, _, getErr := getOwnerInfo(
		ctx,
		logutil.WithKeyValue(context.Background(), "owner info", logPrefix),
		etcdCli,
		ownerPath,
	)
	tracedErr := errors.Trace(getErr)
	return ownerOp, tracedErr
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
Expand Down
Loading