ddl: add global-state-syncer for handling data to etcd. #43113

Merged 5 commits on Apr 19, 2023.
11 changes: 11 additions & 0 deletions ddl/ddl.go
@@ -241,6 +241,8 @@ type DDL interface {
RegisterStatsHandle(*handle.Handle)
// SchemaSyncer gets the schema syncer.
SchemaSyncer() syncer.SchemaSyncer
// StateSyncer gets the cluster state syncer.
StateSyncer() syncer.StateSyncer
// OwnerManager gets the owner manager.
OwnerManager() owner.Manager
// GetID gets the ddl ID.
@@ -324,6 +326,7 @@ type ddlCtx struct {
store kv.Storage
ownerManager owner.Manager
schemaSyncer syncer.SchemaSyncer
stateSyncer syncer.StateSyncer
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
@@ -617,6 +620,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
id := uuid.New().String()
var manager owner.Manager
var schemaSyncer syncer.SchemaSyncer
var stateSyncer syncer.StateSyncer
var deadLockCkr util.DeadTableLockChecker
if etcdCli := opt.EtcdCli; etcdCli == nil {
// The etcdCli is nil if the store is localstore which is only used for testing.
@@ -626,6 +630,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
} else {
manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey)
schemaSyncer = syncer.NewSchemaSyncer(etcdCli, id)
stateSyncer = syncer.NewStateSyncer(etcdCli, util.ServerGlobalState)
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
}

@@ -645,6 +650,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: schemaSyncer,
stateSyncer: stateSyncer,
binlogCli: binloginfo.GetPumpsClient(),
infoCache: opt.InfoCache,
tableLockCkr: deadLockCkr,
@@ -906,6 +912,11 @@ func (d *ddl) SchemaSyncer() syncer.SchemaSyncer {
return d.schemaSyncer
}

// StateSyncer implements DDL.StateSyncer interface.
func (d *ddl) StateSyncer() syncer.StateSyncer {
return d.stateSyncer
}

// OwnerManager implements DDL.OwnerManager interface.
func (d *ddl) OwnerManager() owner.Manager {
return d.ownerManager
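
The new accessor mirrors SchemaSyncer(). As a minimal sketch of how a caller might consume it (the helper below is hypothetical, not part of this PR), behavior can be gated on the upgrade state; the nil check matters because SchemaTracker, further down, returns a nil syncer:

package example

import "github.com/pingcap/tidb/ddl"

// clusterUpgrading reports whether the cluster's global state is "upgrading".
// The nil check guards against DDL implementations, such as SchemaTracker,
// that return a nil StateSyncer.
func clusterUpgrading(d ddl.DDL) bool {
	s := d.StateSyncer()
	if s == nil {
		return false
	}
	return s.IsUpgradingState()
}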
5 changes: 5 additions & 0 deletions ddl/schematracker/checker.go
@@ -517,6 +517,11 @@ func (d Checker) SchemaSyncer() syncer.SchemaSyncer {
return d.realDDL.SchemaSyncer()
}

// StateSyncer implements the DDL interface.
func (d Checker) StateSyncer() syncer.StateSyncer {
return d.realDDL.StateSyncer()
}

// OwnerManager implements the DDL interface.
func (d Checker) OwnerManager() owner.Manager {
return d.realDDL.OwnerManager()
5 changes: 5 additions & 0 deletions ddl/schematracker/dm_tracker.go
@@ -1231,6 +1231,11 @@ func (SchemaTracker) SchemaSyncer() syncer.SchemaSyncer {
return nil
}

// StateSyncer implements the DDL interface, it's no-op in DM's case.
func (SchemaTracker) StateSyncer() syncer.StateSyncer {
return nil
}

// OwnerManager implements the DDL interface, it's no-op in DM's case.
func (SchemaTracker) OwnerManager() owner.Manager {
return nil
11 changes: 9 additions & 2 deletions ddl/syncer/BUILD.bazel
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "syncer",
srcs = ["syncer.go"],
srcs = [
"state_syncer.go",
"syncer.go",
],
importpath = "github.com/pingcap/tidb/ddl/syncer",
visibility = ["//visibility:public"],
deps = [
@@ -14,6 +17,7 @@ go_library(
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@io_etcd_go_etcd_api_v3//mvccpb",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_uber_go_zap//:zap",
@@ -23,7 +27,10 @@ go_library(
go_test(
name = "syncer_test",
timeout = "short",
srcs = ["syncer_test.go"],
srcs = [
"state_syncer_test.go",
"syncer_test.go",
],
flaky = True,
deps = [
":syncer",
198 changes: 198 additions & 0 deletions ddl/syncer/state_syncer.go
@@ -0,0 +1,198 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncer

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/metrics"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
)

const (
// keyOpDefaultTimeout is the default timeout for etcd store operations.
keyOpDefaultTimeout = 1 * time.Second
statePrompt = "global-state-syncer"
// StateUpgrading represents that the cluster's global state is upgrading. It is exported for testing.
StateUpgrading = "upgrading"
// StateNormalRunning represents that the cluster's global state is running normally. It is exported for testing.
StateNormalRunning = ""
)

// StateSyncer is used to synchronize the cluster's global state between the DDL owner and followers through etcd.
type StateSyncer interface {
// Init loads and caches the current cluster state from etcd,
// then watches the global state path.
Init(ctx context.Context) error
// UpdateGlobalState writes the latest state to the global path on etcd, retrying until the update succeeds or the ctx is done.
UpdateGlobalState(ctx context.Context, stateInfo *StateInfo) error
// GetGlobalState gets the global state from etcd.
GetGlobalState(ctx context.Context) (*StateInfo, error)
// IsUpgradingState returns whether the cluster state is upgrading.
IsUpgradingState() bool
// WatchChan returns the chan for watching the global state path on etcd.
WatchChan() clientv3.WatchChan
// Rewatch re-establishes the watch on the global state path, e.g. after
// the previous watch chan is closed.
Rewatch(ctx context.Context)
}

// StateInfo is the TiDB cluster state.
// It will not be updated while the TiDB cluster is upgrading.
type StateInfo struct {
State string `json:"state"`
Version string `json:"version"`
}

// Marshal marshals StateInfo into bytes.
func (info *StateInfo) Marshal() ([]byte, error) {
infoBuf, err := json.Marshal(info)
if err != nil {
return nil, errors.Trace(err)
}
return infoBuf, nil
}

// Unmarshal unmarshals StateInfo from bytes.
func (info *StateInfo) Unmarshal(v []byte) error {
return json.Unmarshal(v, info)
}
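
// illustrateStateInfoRoundTrip is an editor's sketch, not part of this PR:
// the wire format is plain JSON, and the version value below is an assumed
// example.
func illustrateStateInfoRoundTrip() {
	info := &StateInfo{State: StateUpgrading, Version: "v7.1.0"}
	buf, err := info.Marshal() // produces {"state":"upgrading","version":"v7.1.0"}
	if err != nil {
		panic(err)
	}
	var got StateInfo
	if err := got.Unmarshal(buf); err != nil {
		panic(err)
	}
	// got now holds the same state and version as *info.
}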

// serverStateSyncer is the etcd-backed implementation of StateSyncer.
type serverStateSyncer struct {
etcdPath string
prompt string
etcdCli *clientv3.Client
session *concurrency.Session
clusterState *StateInfo
globalStateWatcher watcher
}

// NewStateSyncer creates a new StateSyncer.
func NewStateSyncer(etcdCli *clientv3.Client, etcdPath string) StateSyncer {
return &serverStateSyncer{
etcdCli: etcdCli,
etcdPath: etcdPath,
prompt: statePrompt,
}
}

// Init implements StateSyncer.Init interface.
func (s *serverStateSyncer) Init(ctx context.Context) error {
startTime := time.Now()
var err error
defer func() {
metrics.DeploySyncerHistogram.WithLabelValues(metrics.StateSyncerInit, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

logPrefix := fmt.Sprintf("[%s] %s", s.prompt, s.etcdPath)
s.session, err = tidbutil.NewSession(ctx, logPrefix, s.etcdCli, tidbutil.NewSessionDefaultRetryCnt, util.SessionTTL)
if err != nil {
return errors.Trace(err)
}

s.clusterState, err = s.GetGlobalState(ctx)
if err != nil {
return errors.Trace(err)
}
s.globalStateWatcher.Watch(ctx, s.etcdCli, s.etcdPath)

return errors.Trace(err)
}

// WatchChan implements StateSyncer.WatchChan interface.
func (s *serverStateSyncer) WatchChan() clientv3.WatchChan {
return s.globalStateWatcher.WatchChan()
}

// Rewatch implements StateSyncer.Rewatch interface.
func (s *serverStateSyncer) Rewatch(ctx context.Context) {
s.globalStateWatcher.Rewatch(ctx, s.etcdCli, s.etcdPath)
}
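
// watchLoopSketch is an editor's sketch, not part of this PR: a typical
// consumer of WatchChan and Rewatch. When etcd closes the watch chan
// (e.g. after compaction), re-establish the watch and reload the state.
func watchLoopSketch(ctx context.Context, s StateSyncer) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case _, ok := <-s.WatchChan():
			if !ok {
				s.Rewatch(ctx)
				continue
			}
			// Refresh the cached state after any change event.
			if _, err := s.GetGlobalState(ctx); err != nil {
				return err
			}
		}
	}
}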

// IsUpgradingState implements StateSyncer.IsUpgradingState interface.
func (s *serverStateSyncer) IsUpgradingState() bool {
return s.clusterState.State == StateUpgrading
}

// getKeyValue reads the given key from etcd, retrying up to retryCnt times
// with a per-attempt timeout. It returns nil if the key is absent.
func (s *serverStateSyncer) getKeyValue(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt int, timeout time.Duration, opts ...clientv3.OpOption) ([]*mvccpb.KeyValue, error) {
var err error
var resp *clientv3.GetResponse
for i := 0; i < retryCnt; i++ {
select {
case <-ctx.Done():
err = errors.Trace(ctx.Err())
return nil, err
default:
}

childCtx, cancel := context.WithTimeout(ctx, timeout)
resp, err = etcdCli.Get(childCtx, key, opts...)
cancel()
if err != nil {
logutil.BgLogger().Info("get key failed", zap.String("key", key), zap.Error(err))
time.Sleep(200 * time.Millisecond)
continue
}
if len(resp.Kvs) == 0 {
return nil, nil
}
return resp.Kvs, nil
}
return nil, errors.Trace(err)
}

// GetGlobalState implements StateSyncer.GetGlobalState interface.
func (s *serverStateSyncer) GetGlobalState(ctx context.Context) (*StateInfo, error) {
startTime := time.Now()
kvs, err := s.getKeyValue(ctx, s.etcdCli, s.etcdPath, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
return nil, errors.Trace(err)
}
state := &StateInfo{}
if len(kvs) != 1 {
if len(kvs) > 0 {
return nil, errors.Errorf("unexpected key value count: %d", len(kvs))
}
s.clusterState = state
return state, nil
}
err = state.Unmarshal(kvs[0].Value)
if err != nil {
logutil.BgLogger().Warn("get global state failed", zap.String("key", s.etcdPath), zap.ByteString("value", kvs[0].Value), zap.Error(err))
return nil, errors.Trace(err)
}
s.clusterState = state
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.UpdateGlobalState, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return state, nil
}

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *serverStateSyncer) UpdateGlobalState(ctx context.Context, stateInfo *StateInfo) error {
startTime := time.Now()
stateStr, err := stateInfo.Marshal()
if err != nil {
return err
}
err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, s.etcdPath, string(stateStr))
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.UpdateGlobalState, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}
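
End to end, the syncer can also be driven directly. Below is a minimal sketch, assuming a local etcd endpoint at 127.0.0.1:2379 and an illustrative version string "v7.1.0"; neither comes from this PR:

package main

import (
	"context"
	"log"
	"time"

	"github.com/pingcap/tidb/ddl/syncer"
	"github.com/pingcap/tidb/ddl/util"
	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Assumed local etcd endpoint for the sketch.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()
	s := syncer.NewStateSyncer(cli, util.ServerGlobalState)
	if err := s.Init(ctx); err != nil {
		log.Fatal(err)
	}

	// Mark the cluster as upgrading, then restore normal running.
	up := &syncer.StateInfo{State: syncer.StateUpgrading, Version: "v7.1.0"}
	if err := s.UpdateGlobalState(ctx, up); err != nil {
		log.Fatal(err)
	}
	if err := s.UpdateGlobalState(ctx, &syncer.StateInfo{State: syncer.StateNormalRunning}); err != nil {
		log.Fatal(err)
	}
}

Because UpdateGlobalState goes through util.PutKVToEtcd with keyOpDefaultRetryCnt, transient etcd errors are retried before surfacing to the caller.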