Skip to content

Commit

Permalink
Add binlog_status for http api and TIDB_SERVERS_INFO table (#13025)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored and sre-bot committed Nov 6, 2019
1 parent aaaf254 commit 903cd1f
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 31 deletions.
36 changes: 19 additions & 17 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,29 +301,31 @@ timezone.*
```shell
$curl http://127.0.0.1:10080/info/all
{
"servers_num": 2,
"owner_id": "29a65ec0-d931-4f9e-a212-338eaeffab96",
"is_all_server_version_consistent": true,
"all_servers_info": {
"275a19ae-d248-4dc0-b78c-6613a7509423": {
"ddl_id": "275a19ae-d248-4dc0-b78c-6613a7509423",
"git_hash": "f572e33854e1c0f942f031e9656d0004f99995c6",
"ip": "192.168.197.206",
"29a65ec0-d931-4f9e-a212-338eaeffab96": {
"version": "5.7.25-TiDB-v4.0.0-alpha-669-g8f2a09a52-dirty",
"git_hash": "8f2a09a52fdcaf9d9bfd775d2c6023f363dc121e",
"ddl_id": "29a65ec0-d931-4f9e-a212-338eaeffab96",
"ip": "",
"listening_port": 4000,
"status_port": 10080,
"lease": "45s",
"binlog_status": "Off"
},
"cd13c9eb-c3ee-4887-af9b-e64f3162d92c": {
"version": "5.7.25-TiDB-v4.0.0-alpha-669-g8f2a09a52-dirty",
"git_hash": "8f2a09a52fdcaf9d9bfd775d2c6023f363dc121e",
"ddl_id": "cd13c9eb-c3ee-4887-af9b-e64f3162d92c",
"ip": "",
"listening_port": 4001,
"status_port": 10081,
"version": "5.7.25-TiDB-v2.1.0-rc.3-355-gf572e3385-dirty"
},
"f7e73ed5-63b4-4cb4-ba7c-42b32dc74e77": {
"ddl_id": "f7e73ed5-63b4-4cb4-ba7c-42b32dc74e77",
"git_hash": "f572e33854e1c0f942f031e9656d0004f99995c6",
"ip": "192.168.197.206",
"lease": "45s",
"listening_port": 4000,
"status_port": 10080,
"version": "5.7.25-TiDB-v2.1.0-rc.3-355-gf572e3385-dirty"
"binlog_status": "Off"
}
},
"is_all_server_version_consistent": true,
"owner_id": "f7e73ed5-63b4-4cb4-ba7c-42b32dc74e77",
"servers_num": 2
}
}
```

Expand Down
33 changes: 21 additions & 12 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
util2 "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -67,11 +68,12 @@ type InfoSyncer struct {
// It will not be updated when tidb-server running. So please only put static information in ServerInfo struct.
type ServerInfo struct {
ServerVersionInfo
ID string `json:"ddl_id"`
IP string `json:"ip"`
Port uint `json:"listening_port"`
StatusPort uint `json:"status_port"`
Lease string `json:"lease"`
ID string `json:"ddl_id"`
IP string `json:"ip"`
Port uint `json:"listening_port"`
StatusPort uint `json:"status_port"`
Lease string `json:"lease"`
BinlogStatus string `json:"binlog_status"`
}

// ServerVersionInfo is the server version and git_hash.
Expand Down Expand Up @@ -263,7 +265,11 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt
return err
}
is.session = session

binloginfo.RegisterStatusListener(func(status binloginfo.BinlogStatus) error {
is.info.BinlogStatus = status.String()
err := is.storeServerInfo(ctx)
return errors.Trace(err)
})
err = is.storeServerInfo(ctx)
return err
}
Expand All @@ -289,7 +295,9 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt
continue
}
for _, kv := range resp.Kvs {
info := &ServerInfo{}
info := &ServerInfo{
BinlogStatus: binloginfo.BinlogStatusUnknown.String(),
}
err = json.Unmarshal(kv.Value, info)
if err != nil {
logutil.BgLogger().Info("get key failed", zap.String("key", string(kv.Key)), zap.ByteString("value", kv.Value),
Expand All @@ -307,11 +315,12 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt
func getServerInfo(id string) *ServerInfo {
cfg := config.GetGlobalConfig()
info := &ServerInfo{
ID: id,
IP: cfg.AdvertiseAddress,
Port: cfg.Port,
StatusPort: cfg.Status.StatusPort,
Lease: cfg.Lease,
ID: id,
IP: cfg.AdvertiseAddress,
Port: cfg.Port,
StatusPort: cfg.Status.StatusPort,
Lease: cfg.Lease,
BinlogStatus: binloginfo.GetStatus().String(),
}
info.Version = mysql.ServerVersion
info.GitHash = printer.TiDBGitHash
Expand Down
2 changes: 2 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ var tableTiDBServersInfoCols = []columnInfo{
{"LEASE", mysql.TypeVarchar, 64, 0, nil, nil},
{"VERSION", mysql.TypeVarchar, 64, 0, nil, nil},
{"GIT_HASH", mysql.TypeVarchar, 64, 0, nil, nil},
{"BINLOG_STATUS", mysql.TypeVarchar, 64, 0, nil, nil},
}

func dataForTiKVRegionStatus(ctx sessionctx.Context) (records [][]types.Datum, err error) {
Expand Down Expand Up @@ -1837,6 +1838,7 @@ func dataForServersInfo() ([][]types.Datum, error) {
info.Lease, // LEASE
info.Version, // VERSION
info.GitHash, // GIT_HASH
info.BinlogStatus, // BINLOG_STATUS
)
rows = append(rows, row)
}
Expand Down
62 changes: 60 additions & 2 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -51,6 +52,33 @@ type BinlogInfo struct {
Client *pumpcli.PumpsClient
}

// BinlogStatus is the status of binlog
type BinlogStatus int

const (
//BinlogStatusUnknown stands for unknown binlog status
BinlogStatusUnknown BinlogStatus = iota
//BinlogStatusOn stands for the binlog is enabled
BinlogStatusOn
//BinlogStatusOff stands for the binlog is disabled
BinlogStatusOff
//BinlogStatusSkipping stands for the binlog status
BinlogStatusSkipping
)

// String implements String function in fmt.Stringer
func (s BinlogStatus) String() string {
switch s {
case BinlogStatusOn:
return "On"
case BinlogStatusOff:
return "Off"
case BinlogStatusSkipping:
return "Skipping"
}
return "Unknown"
}

// GetPumpsClient gets the pumps client instance.
func GetPumpsClient() *pumpcli.PumpsClient {
pumpsClientLock.RLock()
Expand Down Expand Up @@ -80,6 +108,9 @@ func GetPrewriteValue(ctx sessionctx.Context, createIfNotExists bool) *binlog.Pr

var skipBinlog uint32
var ignoreError uint32
var statusListener = func(_ BinlogStatus) error {
return nil
}

// DisableSkipBinlogFlag disable the skipBinlog flag.
func DisableSkipBinlogFlag() {
Expand All @@ -97,6 +128,24 @@ func SetIgnoreError(on bool) {
}
}

// GetStatus gets the status of binlog
func GetStatus() BinlogStatus {
conf := config.GetGlobalConfig()
if !conf.Binlog.Enable {
return BinlogStatusOff
}
skip := atomic.LoadUint32(&skipBinlog)
if skip > 0 {
return BinlogStatusSkipping
}
return BinlogStatusOn
}

// RegisterStatusListener registers a listener function to watch binlog status
func RegisterStatusListener(listener func(BinlogStatus) error) {
statusListener = listener
}

// WriteBinlog writes a binlog to Pump.
func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
skip := atomic.LoadUint32(&skipBinlog)
Expand All @@ -112,12 +161,21 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
// it will retry in PumpsClient if write binlog fail.
err := info.Client.WriteBinlog(info.Data)
if err != nil {
logutil.BgLogger().Error("write binlog failed", zap.Error(err))
logutil.BgLogger().Error("write binlog failed",
zap.String("binlog_type", info.Data.Tp.String()),
zap.Uint64("binlog_start_ts", uint64(info.Data.StartTs)),
zap.Uint64("binlog_commit_ts", uint64(info.Data.CommitTs)),
zap.Error(err))
if atomic.LoadUint32(&ignoreError) == 1 {
logutil.BgLogger().Error("write binlog fail but error ignored")
metrics.CriticalErrorCounter.Add(1)
// If error happens once, we'll stop writing binlog.
atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1)
swapped := atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1)
if swapped && skip == 0 {
if err := statusListener(BinlogStatusSkipping); err != nil {
logutil.BgLogger().Warn("update binlog status failed", zap.Error(err))
}
}
return nil
}

Expand Down

0 comments on commit 903cd1f

Please sign in to comment.