Skip to content

Commit

Permalink
Merge pull request #8603 from hashicorp/feature/usage-metrics
Browse files Browse the repository at this point in the history
Track node and service counts in the state store and emit them periodically as metrics
  • Loading branch information
crhino authored Sep 2, 2020
2 parents d0f74cd + 40cbd5a commit 28f163c
Show file tree
Hide file tree
Showing 16 changed files with 849 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .changelog/8603.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
telemetry: track node and service counts and emit them as metrics
```
19 changes: 14 additions & 5 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ type Config struct {
// dead servers.
AutopilotInterval time.Duration

// MetricsReportingInterval is the frequency with which the server will
// report usage metrics to the configured go-metrics Sinks.
MetricsReportingInterval time.Duration

// ConnectEnabled is whether to enable Connect features such as the CA.
ConnectEnabled bool

Expand Down Expand Up @@ -589,11 +593,16 @@ func DefaultConfig() *Config {
},
},

ServerHealthInterval: 2 * time.Second,
AutopilotInterval: 10 * time.Second,
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,
EnterpriseConfig: DefaultEnterpriseConfig(),
// Stay under the 10 second aggregation interval of
// go-metrics. This ensures we always report the
// usage metrics in each cycle.
MetricsReportingInterval: 9 * time.Second,
ServerHealthInterval: 2 * time.Second,
AutopilotInterval: 10 * time.Second,
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,

EnterpriseConfig: DefaultEnterpriseConfig(),
}

// Increase our reap interval to 3 days instead of 24h.
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/fsm/snapshot_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, fedState2, fedStateLoaded2)

// Verify usage data is correctly updated
idx, nodeCount, err := fsm2.state.NodeCount()
require.NoError(t, err)
require.Equal(t, len(nodes), nodeCount)
require.NotZero(t, idx)

// Snapshot
snap, err = fsm2.Snapshot()
require.NoError(t, err)
Expand Down
14 changes: 14 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
Expand Down Expand Up @@ -589,6 +590,19 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
return nil, err
}

reporter, err := usagemetrics.NewUsageMetricsReporter(
new(usagemetrics.Config).
WithStateProvider(s.fsm).
WithLogger(s.logger).
WithDatacenter(s.config.Datacenter).
WithReportingInterval(s.config.MetricsReportingInterval),
)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start usage metrics reporter: %v", err)
}
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
Expand Down
45 changes: 32 additions & 13 deletions agent/consul/state/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type ReadTxn interface {
Abort()
}

// WriteTxn is implemented by memdb.Txn to perform write operations.
// It embeds ReadTxn, so a WriteTxn can be passed to any helper that only
// needs a ReadTxn.
// NOTE(review): Commit here returns an error, which matches the Commit method
// of the txn wrapper in this package; confirm whether a bare memdb.Txn
// satisfies this signature or only the wrapper does.
type WriteTxn interface {
// ReadTxn supplies the read-side methods of the transaction.
ReadTxn
// Insert stores obj in the named table and returns an error on failure.
// Presumably this carries memdb.Txn.Insert semantics (add or update) — verify.
Insert(table string, obj interface{}) error
// Commit finalizes the transaction; a non-nil error reports that the commit
// failed.
Commit() error
}

// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
Expand All @@ -24,8 +31,9 @@ type Changes struct {
}

// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
// all write transactions. When the transaction is committed the changes are:
// 1. Used to update our internal usage tracking
// 2. Sent to the eventPublisher which will create and emit change events
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
Expand Down Expand Up @@ -89,17 +97,21 @@ func (c *changeTrackerDB) publish(changes Changes) error {
return nil
}

// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously
// written across many indexes.
// WriteTxnRestore returns a wrapped RW transaction that should only be used in
// Restore where we need to replace the entire contents of the Store.
// WriteTxnRestore uses a zero index since the whole restore doesn't really
// occur at one index - the effect is to write many values that were previously
// written across many indexes. WriteTxnRestore also does not publish any
// change events to subscribers.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
t := &txn{
Txn: c.db.Txn(true),
Index: 0,
}

// We enable change tracking so that usage data is correctly populated.
t.Txn.TrackChanges()
return t
}

// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
Expand All @@ -125,14 +137,21 @@ type txn struct {
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}

if len(changes.Changes) > 0 {
if err := updateUsage(tx, changes); err != nil {
return err
}
}

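// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// Read-only transactions should have no changes. WriteTxnRestore transactions
// do track changes, but only so usage data can be updated above; they
// intentionally publish no events to subscribers.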
if tx.publish != nil {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if err := tx.publish(changes); err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions agent/consul/state/operations_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,30 @@ import (
"github.com/hashicorp/go-memdb"
)

func firstWithTxn(tx *txn,
func firstWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) {

return tx.First(table, index, idxVal)
}

func firstWatchWithTxn(tx *txn,
func firstWatchWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {

return tx.FirstWatch(table, index, idxVal)
}

func firstWatchCompoundWithTxn(tx *txn,
func firstWatchCompoundWithTxn(tx ReadTxn,
table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(table, index, idxVals...)
}

func getWithTxn(tx *txn,
func getWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {

return tx.Get(table, index, idxVal)
}

func getCompoundWithTxn(tx *txn, table, index string,
func getCompoundWithTxn(tx ReadTxn, table, index string,
_ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) {

return tx.Get(table, index, idxVals...)
Expand Down
Loading

0 comments on commit 28f163c

Please sign in to comment.