Skip to content

Commit

Permalink
Add replication metrics (#10073)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeler authored Apr 22, 2021
1 parent 8f3223a commit ecbccdc
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
25 changes: 22 additions & 3 deletions agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,10 +806,19 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error {
}

if err != nil {
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
0,
)
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
} else {
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
1,
)
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
float32(index),
)
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
Expand Down Expand Up @@ -867,23 +876,23 @@ type replicateFunc func(ctx context.Context, logger hclog.Logger, lastRemoteInde
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun())
policyLogger.Info("started ACL Policy replication")
return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies)
return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies, "acl-policies")
}

// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun())
roleLogger.Info("started ACL Role replication")
return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles)
return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles, "acl-roles")
}

// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun())
tokenLogger.Info("started ACL Token replication")
return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens)
return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens, "acl-tokens")
}

// This function is only intended to be run as a managed go routine, it will block until
Expand All @@ -893,6 +902,7 @@ func (s *Server) runACLReplicator(
logger hclog.Logger,
replicationType structs.ACLReplicationType,
replicateFunc replicateFunc,
metricName string,
) error {
var failedAttempts uint
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
Expand All @@ -913,6 +923,9 @@ func (s *Server) runACLReplicator(
}

if err != nil {
metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
0,
)
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
logger.Warn("ACL replication error (will retry if still leader)",
Expand All @@ -929,6 +942,12 @@ func (s *Server) runACLReplicator(
// do nothing
}
} else {
metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
1,
)
metrics.SetGauge([]string{"leader", "replication", metricName, "index"},
float32(index),
)
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(replicationType, index)
logger.Debug("ACL replication completed through remote index",
Expand Down
20 changes: 20 additions & 0 deletions agent/consul/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (

type ReplicatorDelegate interface {
Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (index uint64, exit bool, err error)
MetricName() string
}

type ReplicatorConfig struct {
Expand Down Expand Up @@ -100,6 +101,9 @@ func (r *Replicator) Run(ctx context.Context) error {
return nil
}
if err != nil {
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"},
0,
)
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
// the next round of replication
atomic.StoreUint64(&r.lastRemoteIndex, 0)
Expand All @@ -114,6 +118,13 @@ func (r *Replicator) Run(ctx context.Context) error {
continue
}

metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"},
1,
)
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "index"},
float32(index),
)

atomic.StoreUint64(&r.lastRemoteIndex, index)
r.logger.Debug("replication completed through remote index", "index", index)
r.waiter.Reset()
Expand All @@ -128,6 +139,11 @@ type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64, logger hcl

type FunctionReplicator struct {
ReplicateFn ReplicatorFunc
Name string
}

func (r *FunctionReplicator) MetricName() string {
return r.Name
}

func (r *FunctionReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
Expand Down Expand Up @@ -171,6 +187,10 @@ type IndexReplicator struct {
Logger hclog.Logger
}

func (r *IndexReplicator) MetricName() string {
return r.Delegate.MetricName()
}

func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, _ hclog.Logger) (uint64, bool, error) {
fetchStart := time.Now()
lenRemote, remote, remoteIndex, err := r.Delegate.FetchRemote(lastRemoteIndex)
Expand Down
1 change: 1 addition & 0 deletions agent/consul/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestReplicationRestart(t *testing.T) {
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
return 1, false, nil
},
Name: "foo",
},

Rate: 1,
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {

configReplicatorConfig := ReplicatorConfig{
Name: logging.ConfigEntry,
Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig},
Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig, Name: "config-entries"},
Rate: s.config.ConfigReplicationRate,
Burst: s.config.ConfigReplicationBurst,
Logger: s.logger,
Expand Down

0 comments on commit ecbccdc

Please sign in to comment.