Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: refine package server error log format in RFC #2713

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,19 @@ var (
ErrLoadValue = errors.Normalize("load value from etcd failed", errors.RFCCodeText("PD:etcdutil:ErrLoadValue"))
ErrGetCluster = errors.Normalize("get cluster from remote peer failed", errors.RFCCodeText("PD:etcdutil:ErrGetCluster"))
)

// server errors
var (
ErrHeartBeatLeaderNil = errors.Normalize("heartbeat nil leader found", errors.RFCCodeText("PD:server:ErrHeartBeatLeaderNil"))
ErrSchedulerOperation = errors.Normalize("scheduler operation failed", errors.RFCCodeText("PD:server:ErrSchedulerOperation"))
ErrStorageClose = errors.Normalize("close storage failed", errors.RFCCodeText("PD:server:ErrStorageClose"))
ErrDecodeConfig = errors.Normalize("decode config failed", errors.RFCCodeText("PD:server:ErrDecodeConfig"))
ErrSavePersistOptions = errors.Normalize("save persist options failed", errors.RFCCodeText("PD:server:ErrLoadPersistOptions"))
ErrLoadPersistOptions = errors.Normalize("load persist options failed", errors.RFCCodeText("PD:server:ErrSavePersistOptions"))
ErrCampaignLeader = errors.Normalize("campaign leader failed", errors.RFCCodeText("PD:server:ErrCampaignLeader"))
ErrUpdateTso = errors.Normalize("update timestamp to tso failed", errors.RFCCodeText("PD:server:ErrSyncTso"))
ErrStoreNotFound = errors.Normalize("store specified not found", errors.RFCCodeText("PD:server:ErrStoreNotFound"))
ErrCreateCluster = errors.Normalize("create cluster error", errors.RFCCodeText("PD:server:ErrCreateCluster"))
ErrGRPCHeartbeat = errors.Normalize("grpc heartbeat error", errors.RFCCodeText("PD:server:ErrGRPCHeartbeat"))
ErrSystemTime = errors.Normalize("system time error", errors.RFCCodeText("PD:server:ErrSystemTime"))
)
11 changes: 8 additions & 3 deletions pkg/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ import (
"go.uber.org/zap/zapcore"
)

// ZapError is used to make the log output eaiser.
func ZapError(err *errors.Error, causeError error) zap.Field {
e := err.Wrap(causeError).FastGenWithCause()
// ZapError is used to make the log output easier.
func ZapError(err *errors.Error, causeError ...error) zap.Field {
var e interface{}
if len(causeError) == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the len > 1?

e = err.Wrap(causeError[0]).FastGenWithCause()
} else {
e = err.FastGenByArgs()
}
return zap.Field{Key: "error", Type: zapcore.ErrorType, Interface: e}
}
2 changes: 1 addition & 1 deletion pkg/errs/errs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *testErrorSuite) TestError(c *C) {
log.ReplaceGlobals(lg.Logger, nil)

rfc := `[error="[PD:tso:ErrInvalidTimestamp] invalid timestamp"]`
log.Error("test", zap.Error(ErrInvalidTimestamp.FastGenByArgs()))
log.Error("test", ZapError(ErrInvalidTimestamp))
c.Assert(strings.Contains(lg.Message(), rfc), IsTrue)
err := errors.New("test error")
log.Error("test", ZapError(ErrInvalidTimestamp, err))
Expand Down
3 changes: 2 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {

region := core.RegionFromHeartbeat(request)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("reqeust", request))
log.Error("invalid request, the leader is nil", zap.Reflect("reqeust", request), errs.ZapError(errs.ErrHeartBeatLeaderNil))
continue
}
if region.GetID() == 0 {
Expand Down
11 changes: 6 additions & 5 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
Expand Down Expand Up @@ -209,9 +210,9 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
if err = c.AddScheduler(s, args...); err != nil {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the scheduler-name?

log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(errs.ErrSchedulerOperation, err))
} else if err = h.opt.Persist(c.GetStorage()); err != nil {
log.Error("can not persist scheduler config", zap.Error(err))
log.Error("can not persist scheduler config", errs.ZapError(errs.ErrSavePersistOptions, err))
}
return err
}
Expand All @@ -223,7 +224,7 @@ func (h *Handler) RemoveScheduler(name string) error {
return err
}
if err = c.RemoveScheduler(name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), zap.Error(err))
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(errs.ErrSchedulerOperation, err))
}
return err
}
Expand All @@ -238,9 +239,9 @@ func (h *Handler) PauseOrResumeScheduler(name string, t int64) error {
}
if err = c.PauseOrResumeScheduler(name, t); err != nil {
if t == 0 {
log.Error("can not resume scheduler", zap.String("scheduler-name", name), zap.Error(err))
log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(errs.ErrSchedulerOperation, err))
} else {
log.Error("can not pause scheduler", zap.String("scheduler-name", name), zap.Error(err))
log.Error("can not pause scheduler", zap.String("scheduler-name", name), errs.ZapError(errs.ErrSchedulerOperation, err))
}
}
return err
Expand Down
8 changes: 5 additions & 3 deletions server/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
Expand Down Expand Up @@ -87,15 +88,16 @@ func (s *heartbeatStreams) run() {
if store == nil {
log.Error("failed to get store",
zap.Uint64("region-id", msg.RegionId),
zap.Uint64("store-id", storeID))
zap.Uint64("store-id", storeID),
errs.ZapError(errs.ErrStoreNotFound))
delete(s.streams, storeID)
continue
}
storeAddress := store.GetAddress()
if stream, ok := s.streams[storeID]; ok {
if err := stream.Send(msg); err != nil {
log.Error("send heartbeat message fail",
zap.Uint64("region-id", msg.RegionId), zap.Error(err))
zap.Uint64("region-id", msg.RegionId), errs.ZapError(errs.ErrGRPCHeartbeat, err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "err").Inc()
} else {
Expand All @@ -111,7 +113,7 @@ func (s *heartbeatStreams) run() {
for storeID, stream := range s.streams {
store := s.cluster.GetStore(storeID)
if store == nil {
log.Error("failed to get store", zap.Uint64("store-id", storeID))
log.Error("failed to get store", zap.Uint64("store-id", storeID), errs.ZapError(errs.ErrStoreNotFound))
delete(s.streams, storeID)
continue
}
Expand Down
37 changes: 19 additions & 18 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/logutil"
Expand Down Expand Up @@ -419,7 +420,7 @@ func (s *Server) Close() {
s.hbStreams.Close()
}
if err := s.storage.Close(); err != nil {
log.Error("close storage meet error", zap.Error(err))
log.Error("close storage meet error", errs.ZapError(errs.ErrStorageClose, err))
}

// Run callbacks
Expand All @@ -438,7 +439,7 @@ func (s *Server) IsClosed() bool {
// Run runs the pd server.
func (s *Server) Run() error {
go StartMonitor(s.ctx, time.Now, func() {
log.Error("system time jumps backward")
log.Error("system time jumps backward", errs.ZapError(errs.ErrSystemTime))
timeJumpBackCounter.Inc()
})
if err := s.startEtcd(s.ctx); err != nil {
Expand Down Expand Up @@ -717,7 +718,7 @@ func (s *Server) GetConfig() *config.Config {
log.Error("failed to decode scheduler config",
zap.String("config", configs[i]),
zap.String("scheduler", sche),
zap.Error(err))
errs.ZapError(errs.ErrDecodeConfig, err))
continue
}
payload[sche] = config
Expand Down Expand Up @@ -749,7 +750,7 @@ func (s *Server) SetScheduleConfig(cfg config.ScheduleConfig) error {
log.Error("failed to update schedule config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}
log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand Down Expand Up @@ -795,7 +796,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
log.Error("failed to update replication config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}
log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand Down Expand Up @@ -833,7 +834,7 @@ func (s *Server) SetPDServerConfig(cfg config.PDServerConfig) error {
log.Error("failed to update PDServer config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}
log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand All @@ -849,7 +850,7 @@ func (s *Server) SetLabelPropertyConfig(cfg config.LabelPropertyConfig) error {
log.Error("failed to update label property config",
zap.Reflect("new", cfg),
zap.Reflect("old", &old),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}
log.Info("label property config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand All @@ -867,7 +868,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error {
zap.String("labelKey", labelKey),
zap.String("labelValue", labelValue),
zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}

Expand All @@ -886,7 +887,7 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error {
zap.String("labelKey", labelKey),
zap.String("labelValue", labelValue),
zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}

Expand All @@ -913,7 +914,7 @@ func (s *Server) SetClusterVersion(v string) error {
log.Error("failed to update cluster version",
zap.String("old-version", old.String()),
zap.String("new-version", v),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}
log.Info("cluster version is updated", zap.String("new-version", v))
Expand Down Expand Up @@ -1020,7 +1021,7 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro
log.Error("failed to update replication mode config",
zap.Reflect("new", cfg),
zap.Reflect("old", &old),
zap.Error(err))
errs.ZapError(errs.ErrSavePersistOptions, err))
return err
}
log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand All @@ -1038,7 +1039,7 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro
s.persistOptions.SetReplicationModeConfig(old)
revertErr := s.persistOptions.Persist(s.storage)
if revertErr != nil {
log.Error("failed to revert replication mode persistent config", zap.Error(err))
log.Error("failed to revert replication mode persistent config", errs.ZapError(errs.ErrSavePersistOptions, err))
}
}
return err
Expand All @@ -1064,7 +1065,7 @@ func (s *Server) leaderLoop() {
if leader != nil {
err := s.reloadConfigFromKV()
if err != nil {
log.Error("reload config failed", zap.Error(err))
log.Error("reload config failed", errs.ZapError(errs.ErrLoadPersistOptions, err))
continue
}
syncer := s.cluster.GetRegionSyncer()
Expand Down Expand Up @@ -1094,7 +1095,7 @@ func (s *Server) campaignLeader() {
zap.String("campaign-pd-leader-name", s.Name()),
zap.String("purpose", s.member.Leadership.Purpose))
if err := s.member.Leadership.Campaign(s.cfg.LeaderLease, s.member.GetLeaderPath(), s.member.MemberValue()); err != nil {
log.Error("campaign pd leader meet error", zap.Error(err))
log.Error("campaign pd leader meet error", errs.ZapError(errs.ErrCampaignLeader, err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this is already a RFC error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It is a local error caused by lease or etcd.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we need to wrap the third error to RFC error in member package rather than in the server package. if the error is not returned by a third-party package, just print the log directly or return the error directly.

return
}

Expand All @@ -1112,18 +1113,18 @@ func (s *Server) campaignLeader() {

log.Info("initialize the global TSO allocator")
if err := s.tsoAllocator.Initialize(); err != nil {
log.Error("failed to initialize the global TSO allocator", zap.Error(err))
log.Error("failed to initialize the global TSO allocator", errs.ZapError(errs.ErrCreateTSOStream, err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz directly use RFC error.

return
}
defer s.tsoAllocator.Reset()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", zap.Error(err))
log.Error("failed to reload configuration", errs.ZapError(errs.ErrLoadPersistOptions, err))
return
}
// Try to create raft cluster.
if err := s.createRaftCluster(); err != nil {
log.Error("failed to create raft cluster", zap.Error(err))
log.Error("failed to create raft cluster", errs.ZapError(errs.ErrCreateCluster, err))
return
}
defer s.stopRaftCluster()
Expand Down Expand Up @@ -1153,7 +1154,7 @@ func (s *Server) campaignLeader() {
}
case <-tsTicker.C:
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Error("failed to update timestamp", zap.Error(err))
log.Error("failed to update timestamp", errs.ZapError(errs.ErrUpdateTso, err))
return
}
case <-ctx.Done():
Expand Down
Empty file modified server/tso/global_allocator.go
100644 → 100755
Empty file.
Empty file modified server/tso/tso.go
100644 → 100755
Empty file.