From 724379d5446d6b3c2512d55eba04580172548e76 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 26 Nov 2024 14:52:43 +0800 Subject: [PATCH 1/2] change --- pkg/owner/BUILD.bazel | 4 +- pkg/owner/manager.go | 161 ++++++++++++++++-------------------- pkg/owner/manager_test.go | 103 +++++++++++++++++++---- pkg/session/sync_upgrade.go | 2 +- 4 files changed, 164 insertions(+), 106 deletions(-) diff --git a/pkg/owner/BUILD.bazel b/pkg/owner/BUILD.bazel index e494e0f54666f..ad74fc9e74c1d 100644 --- a/pkg/owner/BUILD.bazel +++ b/pkg/owner/BUILD.bazel @@ -37,12 +37,12 @@ go_test( ], embed = [":owner"], flaky = True, - shard_count = 10, + shard_count = 11, deps = [ "//pkg/parser/terror", + "//pkg/testkit/testfailpoint", "//pkg/testkit/testsetup", "//pkg/util", - "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index d71538f0df0ba..99d28380a64ec 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -90,12 +90,11 @@ type Manager interface { ForceToBeOwner(ctx context.Context) error } -const ( - keyOpDefaultTimeout = 5 * time.Second +const keyOpDefaultTimeout = 5 * time.Second - // WaitTimeOnForceOwner is the time to wait before or after force to be owner. - WaitTimeOnForceOwner = 5 * time.Second -) +// WaitTimeOnForceOwner is the time to wait before or after force to be owner. +// make it a var for test. +var WaitTimeOnForceOwner = 5 * time.Second // OpType is the owner key value operation type. type OpType byte @@ -133,8 +132,7 @@ type ownerManager struct { key string ctx context.Context prompt string - logPrefix string - logCtx context.Context + logger *zap.Logger etcdCli *clientv3.Client elec atomic.Pointer[concurrency.Election] sessionLease *atomicutil.Int64 @@ -147,15 +145,13 @@ type ownerManager struct { // NewOwnerManager creates a new Manager. func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) Manager { - logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", prompt, key, id) return &ownerManager{ etcdCli: etcdCli, id: id, key: key, ctx: ctx, prompt: prompt, - logPrefix: logPrefix, - logCtx: logutil.WithKeyValue(context.Background(), "owner info", logPrefix), + logger: logutil.BgLogger().With(zap.String("key", key), zap.String("id", id)), sessionLease: atomicutil.NewInt64(0), } } @@ -181,8 +177,7 @@ func (m *ownerManager) SetListener(listener Listener) { } func (m *ownerManager) ForceToBeOwner(context.Context) error { - logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) - logutil.BgLogger().Info("force to be owner", zap.String("ownerInfo", logPrefix)) + m.logger.Info("force to be owner") if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ManagerSessionTTL); err != nil { return errors.Trace(err) } @@ -206,7 +201,7 @@ func (m *ownerManager) ForceToBeOwner(context.Context) error { // immediately after we delete their key. time.Sleep(WaitTimeOnForceOwner) if err := m.tryToBeOwnerOnce(); err != nil { - logutil.Logger(m.logCtx).Warn("failed to retire owner on older version", zap.Error(err)) + m.logger.Warn("failed to retire owner on older version", zap.Error(err)) continue } break @@ -290,15 +285,13 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { if len(withTTL) == 1 { ttl = withTTL[0] } - logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) if m.etcdSes == nil { - logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) + m.logger.Info("start campaign owner") if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ttl); err != nil { return errors.Trace(err) } } else { - logutil.BgLogger().Info("start campaign owner with existing session", - zap.String("ownerInfo", logPrefix), + m.logger.Info("start campaign owner with existing session", zap.String("lease", util2.FormatLeaseID(m.etcdSes.Lease()))) } m.wg.Add(1) @@ -322,13 +315,13 @@ func (m *ownerManager) ResignOwner(ctx context.Context) error { return errors.Trace(err) } - logutil.Logger(m.logCtx).Warn("resign owner success") + m.logger.Warn("resign owner success") return nil } func (m *ownerManager) toBeOwner(elec *concurrency.Election) { m.elec.Store(elec) - logutil.Logger(m.logCtx).Info("become owner") + m.logger.Info("become owner") if m.listener != nil { m.listener.OnBecomeOwner() } @@ -337,7 +330,7 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) { // RetireOwner make the manager to be a not owner. func (m *ownerManager) RetireOwner() { m.elec.Store(nil) - logutil.Logger(m.logCtx).Info("retire owner") + m.logger.Info("retire owner") if m.listener != nil { m.listener.OnRetireOwner() } @@ -360,80 +353,82 @@ func (m *ownerManager) campaignLoop(campaignContext context.Context) { defer func() { m.campaignCancel() if r := recover(); r != nil { - logutil.BgLogger().Error("recover panic", zap.String("prompt", m.prompt), zap.Any("error", r), zap.Stack("buffer")) + m.logger.Error("recover panic", zap.String("prompt", m.prompt), zap.Any("error", r), zap.Stack("buffer")) metrics.PanicCounter.WithLabelValues(metrics.LabelDDLOwner).Inc() } m.wg.Done() }() - logCtx := m.logCtx - var err error leaseNotFoundCh := make(chan struct{}) for { - if err != nil { - metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc() - } - select { case <-m.etcdSes.Done(): - logutil.Logger(logCtx).Info("etcd session done, refresh it") + m.logger.Info("etcd session done, refresh it") if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { - logutil.Logger(logCtx).Info("break campaign loop, refresh session failed", zap.Error(err2)) + m.logger.Info("break campaign loop, refresh session failed", zap.Error(err2)) return } case <-leaseNotFoundCh: - logutil.Logger(logCtx).Info("meet lease not found error, refresh session") + m.logger.Info("meet lease not found error, refresh session") if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { - logutil.Logger(logCtx).Info("break campaign loop, refresh session failed", zap.Error(err2)) + m.logger.Info("break campaign loop, refresh session failed", zap.Error(err2)) return } leaseNotFoundCh = make(chan struct{}) case <-campaignContext.Done(): failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { if v.(string) == "delOwnerKeyAndNotOwner" { - logutil.Logger(logCtx).Info("mock break campaign and don't clear related info") + m.logger.Info("mock break campaign and don't clear related info") return } }) - logutil.Logger(logCtx).Info("break campaign loop, context is done") + m.logger.Info("break campaign loop, context is done") return default: } - // If the etcd server turns clocks forward,the following case may occur. - // The etcd server deletes this session's lease ID, but etcd session doesn't find it. - // In this time if we do the campaign operation, the etcd server will return ErrLeaseNotFound. - if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) { - close(leaseNotFoundCh) - err = nil - continue - } - elec := concurrency.NewElection(m.etcdSes, m.key) - err = elec.Campaign(campaignContext, m.id) - if err != nil { - logutil.Logger(logCtx).Info("failed to campaign", zap.Error(err)) - continue - } + if err := m.campaignAndWatch(campaignContext); err != nil { + metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc() - ownerKey, currRev, err := GetOwnerKeyInfo(campaignContext, logCtx, m.etcdCli, m.key, m.id) - if err != nil { - continue + // If the etcd server turns clocks forward,the following case may occur. + // The etcd server deletes this session's lease ID, but etcd session doesn't find it. + // In this time if we do the campaign operation, the etcd server will return ErrLeaseNotFound. + if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) { + close(leaseNotFoundCh) + } + m.logger.Info("campaign and watch failed", zap.Error(err)) } + } +} - m.toBeOwner(elec) - err = m.watchOwner(campaignContext, m.etcdSes, ownerKey, currRev) - logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err)) - m.RetireOwner() +func (m *ownerManager) campaignAndWatch(ctx context.Context) error { + elec := concurrency.NewElection(m.etcdSes, m.key) + failpoint.InjectCall("beforeElectionCampaign", m.etcdSes) + err := elec.Campaign(ctx, m.id) + if err != nil { + return err + } - metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc() - logutil.Logger(logCtx).Info("is not the owner") + ownerKey, currRev, err := GetOwnerKeyInfo(ctx, m.etcdCli, m.key, m.id) + if err != nil { + return err } + + m.toBeOwner(elec) + + err = m.watchOwner(ctx, m.etcdSes, ownerKey, currRev) + m.logger.Info("watch owner finished", zap.Error(err)) + m.RetireOwner() + + metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc() + m.logger.Info("is not the owner") + return err } func (m *ownerManager) closeSession() { if m.etcdSes != nil { if err := m.etcdSes.Close(); err != nil { - logutil.Logger(m.logCtx).Info("etcd session close failed", zap.Error(err)) + m.logger.Info("etcd session close failed", zap.Error(err)) } m.etcdSes = nil } @@ -447,7 +442,8 @@ func (m *ownerManager) refreshSession(retryCnt, ttl int) error { // One drawback is that when you want to break the campaign loop, and the campaign // loop is refreshing the session, it might wait for a long time to return, it // should be fine as long as network is ok, and acceptable to wait when not. - sess, err2 := util2.NewSession(m.ctx, m.logPrefix, m.etcdCli, retryCnt, ttl) + logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", m.prompt, m.key, m.id) + sess, err2 := util2.NewSession(m.ctx, logPrefix, m.etcdCli, retryCnt, ttl) if err2 != nil { return errors.Trace(err2) } @@ -456,26 +452,17 @@ func (m *ownerManager) refreshSession(retryCnt, ttl int) error { return nil } -func (m *ownerManager) revokeSession(leaseID clientv3.LeaseID) { - // Revoke the session lease. - // If revoke takes longer than the ttl, lease is expired anyway. - cancelCtx, cancel := context.WithTimeout(context.Background(), - time.Duration(ManagerSessionTTL)*time.Second) - _, err := m.etcdCli.Revoke(cancelCtx, leaseID) - cancel() - logutil.Logger(m.logCtx).Info("revoke session", zap.Error(err)) -} - // GetOwnerID implements Manager.GetOwnerID interface. func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) { - _, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) + _, ownerID, _, _, _, err := getOwnerInfo(ctx, m.etcdCli, m.key) return string(ownerID), errors.Trace(err) } -func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) { +func getOwnerInfo(ctx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) { var op OpType var resp *clientv3.GetResponse var err error + logger := logutil.BgLogger().With(zap.String("key", ownerPath)) for i := 0; i < 3; i++ { if err = ctx.Err(); err != nil { return "", nil, op, 0, 0, errors.Trace(err) @@ -487,11 +474,11 @@ func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPa if err == nil { break } - logutil.Logger(logCtx).Info("etcd-cli get owner info failed", zap.String("key", ownerPath), zap.Int("retryCnt", i), zap.Error(err)) + logger.Info("etcd-cli get owner info failed", zap.Int("retryCnt", i), zap.Error(err)) time.Sleep(util.KeyOpRetryInterval) } if err != nil { - logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err)) + logger.Warn("etcd-cli get owner info failed", zap.Error(err)) return "", nil, op, 0, 0, errors.Trace(err) } if len(resp.Kvs) == 0 { @@ -500,23 +487,23 @@ func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPa var ownerID []byte ownerID, op = splitOwnerValues(resp.Kvs[0].Value) - logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key), + logger.Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key), zap.ByteString("ownerID", ownerID), zap.Stringer("op", op)) return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil } // GetOwnerKeyInfo gets the owner key and current revision. func GetOwnerKeyInfo( - ctx, logCtx context.Context, + ctx context.Context, etcdCli *clientv3.Client, etcdKey, id string, ) (string, int64, error) { - ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey) + ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, etcdCli, etcdKey) if err != nil { return "", 0, errors.Trace(err) } if string(ownerID) != id { - logutil.Logger(logCtx).Warn("is not the owner") + logutil.BgLogger().Warn("is not the owner", zap.String("id", id), zap.String("ownerID", string(ownerID))) return "", 0, errors.New("ownerInfoNotMatch") } @@ -539,12 +526,12 @@ func joinOwnerValues(vals ...[]byte) []byte { // SetOwnerOpValue implements Manager.SetOwnerOpValue interface. func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error { // owner don't change. - ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) + ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.etcdCli, m.key) if err != nil { return errors.Trace(err) } if currOp == op { - logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op)) + m.logger.Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op)) return nil } if string(ownerID) != m.id { @@ -568,21 +555,20 @@ func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error { if err == nil && !resp.Succeeded { err = errors.New("put owner key failed, cmp is false") } - logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID), + m.logger.Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID), zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Error(err)) metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc() return errors.Trace(err) } // GetOwnerOpValue gets the owner op value. -func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) { +func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath string) (OpType, error) { // It's using for testing. if etcdCli == nil { return *mockOwnerOpValue.Load(), nil } - logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - _, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath) + _, _, op, _, _, err := getOwnerInfo(ctx, etcdCli, ownerPath) return op, errors.Trace(err) } @@ -596,9 +582,8 @@ func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency. } func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error { - logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key) - logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - logutil.BgLogger().Debug(logPrefix) + logger := m.logger.With(zap.String("ownerKey", key), zap.Int64("currRev", currRev)) + logger.Info("watching owner key") // we need to watch the ownerKey since currRev + 1. watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1)) for { @@ -606,19 +591,19 @@ func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency. case resp, ok := <-watchCh: if !ok { metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc() - logutil.Logger(logCtx).Info("watcher is closed, no owner") + logger.Info("watcher is closed, no owner") return errors.Errorf("watcher is closed, key: %v", key) } if resp.Canceled { metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc() - logutil.Logger(logCtx).Info("watch canceled, no owner") + logger.Info("watch canceled, no owner") return errors.Errorf("watch canceled, key: %v", key) } for _, ev := range resp.Events { if ev.Type == mvccpb.DELETE { metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc() - logutil.Logger(logCtx).Info("watch failed, owner is deleted") + logger.Info("watch failed, owner is deleted") return nil } } diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go index 61b5be6fceadf..b1dd667b37c56 100644 --- a/pkg/owner/manager_test.go +++ b/pkg/owner/manager_test.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/owner" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -65,6 +65,45 @@ func (l *listener) OnRetireOwner() { l.val.Store(false) } +func TestForceToBeOwner(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + client := tInfo.client + defer tInfo.Close(t) + + // put a key with same prefix to mock another node + ctx := context.Background() + testKey := "/owner/key/a" + _, err := client.Put(ctx, testKey, "a") + require.NoError(t, err) + resp, err := client.Get(ctx, testKey) + require.NoError(t, err) + require.Len(t, resp.Kvs, 1) + + bak := owner.WaitTimeOnForceOwner + t.Cleanup(func() { + owner.WaitTimeOnForceOwner = bak + }) + owner.WaitTimeOnForceOwner = time.Millisecond + ownerMgr := owner.NewOwnerManager(ctx, client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() + lis := &listener{} + ownerMgr.SetListener(lis) + require.NoError(t, ownerMgr.ForceToBeOwner(ctx)) + // key of other node is deleted + resp, err = client.Get(ctx, testKey) + require.NoError(t, err) + require.Empty(t, resp.Kvs) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) + require.True(t, isOwner) + require.True(t, lis.val.Load()) +} + func TestSingle(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") @@ -74,6 +113,42 @@ func TestSingle(t *testing.T) { tInfo := newTestInfo(t) client := tInfo.client defer tInfo.Close(t) + + t.Run("retry on session closed before election", func(t *testing.T) { + ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() + var counter atomic.Int32 + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/owner/beforeElectionCampaign", + func(se *concurrency.Session) { + if counter.Add(1) <= 1 { + require.NoError(t, se.Close()) + } + }, + ) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) + require.True(t, isOwner) + require.EqualValues(t, 2, counter.Load()) + }) + + t.Run("retry on lease revoked before election", func(t *testing.T) { + ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() + var counter atomic.Int32 + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/owner/beforeElectionCampaign", + func(se *concurrency.Session) { + if counter.Add(1) <= 2 { + _, err := client.Revoke(context.Background(), se.Lease()) + require.NoError(t, err) + } + }, + ) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) + require.True(t, isOwner) + require.EqualValues(t, 3, counter.Load()) + }) + ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") lis := &listener{} ownerMgr.SetListener(lis) @@ -107,7 +182,7 @@ func TestSingle(t *testing.T) { // err is ok to be not nil since we canceled the manager. ownerID, _ := ownerMgr2.GetOwnerID(ctx) require.Equal(t, "", ownerID) - op, _ := owner.GetOwnerOpValue(ctx, client, "/owner/key", "log prefix") + op, _ := owner.GetOwnerOpValue(ctx, client, "/owner/key") require.Equal(t, op, owner.OpNone) } @@ -130,20 +205,20 @@ func TestSetAndGetOwnerOpValue(t *testing.T) { ownerID, err := ownerMgr.GetOwnerID(context.Background()) require.NoError(t, err) require.Equal(t, ownerMgr.ID(), ownerID) - op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") + op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key") require.NoError(t, err) require.Equal(t, op, owner.OpNone) require.False(t, op.IsSyncedUpgradingState()) err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key") require.NoError(t, err) require.Equal(t, op, owner.OpSyncUpgradingState) require.True(t, op.IsSyncedUpgradingState()) // update the same as the original value err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key") require.NoError(t, err) require.Equal(t, op, owner.OpSyncUpgradingState) require.True(t, op.IsSyncedUpgradingState()) @@ -151,7 +226,7 @@ func TestSetAndGetOwnerOpValue(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("delOwnerKeyAndNotOwner")`)) err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpNone) require.Error(t, err, "put owner key failed, cmp is false") - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key") require.NotNil(t, err) require.Equal(t, concurrency.ErrElectionNoLeader.Error(), err.Error()) require.Equal(t, op, owner.OpNone) @@ -169,7 +244,7 @@ func TestSetAndGetOwnerOpValue(t *testing.T) { require.Error(t, err, "put owner key failed, cmp is false") isOwner = checkOwner(ownerMgr, true) require.True(t, isOwner) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key") require.NoError(t, err) require.Equal(t, op, owner.OpNone) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) @@ -192,13 +267,13 @@ func TestGetOwnerOpValueBeforeSet(t *testing.T) { ownerID, err := ownerMgr.GetOwnerID(context.Background()) require.NoError(t, err) require.Equal(t, ownerMgr.ID(), ownerID) - op, err := owner.GetOwnerOpValue(context.Background(), nil, "/owner/key", "log prefix") + op, err := owner.GetOwnerOpValue(context.Background(), nil, "/owner/key") require.NoError(t, err) require.Equal(t, op, owner.OpNone) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp")) err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), nil, "/owner/key", "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), nil, "/owner/key") require.NoError(t, err) require.Equal(t, op, owner.OpSyncUpgradingState) } @@ -248,11 +323,9 @@ func TestCluster(t *testing.T) { // Cancel the owner context, there is no owner. ownerMgr2.Close() - logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", "/owner/key", "useless id") - logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - _, _, err = owner.GetOwnerKeyInfo(context.Background(), logCtx, tInfo.client, "/owner/key", "useless id") + _, _, err = owner.GetOwnerKeyInfo(context.Background(), tInfo.client, "/owner/key", "useless id") require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) - op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", logPrefix) + op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key") require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) require.Equal(t, op, owner.OpNone) } @@ -284,7 +357,7 @@ func TestWatchOwner(t *testing.T) { require.NoError(t, err) // test the GetOwnerKeyInfo() - ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, "/owner/key", id) + ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, client, "/owner/key", id) require.NoError(t, err) // watch the ownerKey. @@ -346,7 +419,7 @@ func TestWatchOwnerAfterDeleteOwnerKey(t *testing.T) { require.NoError(t, err) // get the ownkey informations. - ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, "/owner/key", id) + ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, client, "/owner/key", id) require.NoError(t, err) // delete the ownerkey diff --git a/pkg/session/sync_upgrade.go b/pkg/session/sync_upgrade.go index 6f7f7fcfa497a..2430e5385013d 100644 --- a/pkg/session/sync_upgrade.go +++ b/pkg/session/sync_upgrade.go @@ -63,7 +63,7 @@ func SyncUpgradeState(s sessionctx.Context, timeout time.Duration) error { var op owner.OpType childCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - op, err = owner.GetOwnerOpValue(childCtx, dom.EtcdClient(), ddl.DDLOwnerKey, "upgrade bootstrap") + op, err = owner.GetOwnerOpValue(childCtx, dom.EtcdClient(), ddl.DDLOwnerKey) cancel() if err == nil && op.IsSyncedUpgradingState() { break From d29b19ed1201c0b1116b9d66118db083ab9e13cc Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 26 Nov 2024 15:01:10 +0800 Subject: [PATCH 2/2] change --- pkg/owner/manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 99d28380a64ec..c11ef66cf9bd1 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -503,7 +503,8 @@ func GetOwnerKeyInfo( return "", 0, errors.Trace(err) } if string(ownerID) != id { - logutil.BgLogger().Warn("is not the owner", zap.String("id", id), zap.String("ownerID", string(ownerID))) + logutil.BgLogger().Warn("is not the owner", zap.String("key", etcdKey), + zap.String("id", id), zap.String("ownerID", string(ownerID))) return "", 0, errors.New("ownerInfoNotMatch") }