Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of feat: set up reporting agent into release/1.15.x #17031

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/consul/acl_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type serverACLResolverBackend struct {
}

func (s *serverACLResolverBackend) IsServerManagementToken(token string) bool {
mgmt, err := s.getSystemMetadata(structs.ServerManagementTokenAccessorID)
mgmt, err := s.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
if err != nil {
s.logger.Debug("failed to fetch server management token: %w", err)
return false
Expand Down
8 changes: 7 additions & 1 deletion agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ func (s *Server) establishLeadership(ctx context.Context) error {
s.startLogVerification(ctx)
}

if s.config.Reporting.License.Enabled && s.reportingManager != nil {
s.reportingManager.StartReportingAgent()
}

s.logger.Debug("successfully established leadership", "duration", time.Since(start))
return nil
}
Expand Down Expand Up @@ -374,6 +378,8 @@ func (s *Server) revokeLeadership() {
s.resetConsistentReadReady()

s.autopilot.DisableReconciliation()

s.reportingManager.StopReportingAgent()
}

// initializeACLs is used to setup the ACLs if we are the leader
Expand Down Expand Up @@ -522,7 +528,7 @@ func (s *Server) initializeACLs(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err)
}
if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
if err := s.SetSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
return fmt.Errorf("failed to persist server management token: %w", err)
}

Expand Down
8 changes: 4 additions & 4 deletions agent/consul/leader_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (s *Server) setVirtualIPFlags() (bool, error) {
}

func (s *Server) setVirtualIPVersionFlag() (bool, error) {
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
val, err := s.GetSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
return false, err
}
Expand All @@ -217,15 +217,15 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) {
minVirtualIPVersion.String())
}

if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
if err := s.SetSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
return false, nil
}

return true, nil
}

func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) {
val, err := s.getSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled)
val, err := s.GetSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled)
if err != nil {
return false, err
}
Expand All @@ -238,7 +238,7 @@ func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) {
minVirtualIPTerminatingGatewayVersion.String())
}

if err := s.setSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil {
if err := s.SetSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil {
return false, nil
}

Expand Down
4 changes: 2 additions & 2 deletions agent/consul/leader_intentions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error {

// Check for the system metadata first, as that's the most trustworthy in
// both the primary and secondaries.
intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey)
intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *Server) legacyIntentionMigrationInSecondaryDC(ctx context.Context) erro
// error.
for {
// Check for the system metadata first, as that's the most trustworthy.
intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey)
intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_intentions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {

// Wait until the migration routine is complete.
retry.Run(t, func(r *retry.R) {
intentionFormat, err := s1.getSystemMetadata(structs.SystemMetadataIntentionFormatKey)
intentionFormat, err := s1.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey)
require.NoError(r, err)
if intentionFormat != structs.SystemMetadataIntentionFormatConfigValue {
r.Fatal("intention migration is not yet complete")
Expand Down
6 changes: 3 additions & 3 deletions agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ func TestLeader_ACL_Initialization(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, policy)

serverToken, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID)
serverToken, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
require.NoError(t, err)
require.NotEmpty(t, serverToken)

Expand Down Expand Up @@ -1338,14 +1338,14 @@ func TestLeader_ACL_Initialization_SecondaryDC(t *testing.T) {
testrpc.WaitForTestAgent(t, s2.RPC, "dc2")

// Check dc1's management token
serverToken1, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID)
serverToken1, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
require.NoError(t, err)
require.NotEmpty(t, serverToken1)
_, err = uuid.ParseUUID(serverToken1)
require.NoError(t, err)

// Check dc2's management token
serverToken2, err := s2.getSystemMetadata(structs.ServerManagementTokenAccessorID)
serverToken2, err := s2.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
require.NoError(t, err)
require.NotEmpty(t, serverToken2)
_, err = uuid.ParseUUID(serverToken2)
Expand Down
38 changes: 38 additions & 0 deletions agent/consul/reporting/reporting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package reporting

import (
"sync"

"github.com/hashicorp/go-hclog"
)

type ReportingManager struct {
logger hclog.Logger
server ServerDelegate
EntDeps
sync.RWMutex
}

const (
SystemMetadataReportingProcessID = "reporting-process-id"
)

//go:generate mockery --name ServerDelegate --inpackage
type ServerDelegate interface {
GetSystemMetadata(key string) (string, error)
SetSystemMetadataKey(key, val string) error
IsLeader() bool
}

func NewReportingManager(logger hclog.Logger, deps EntDeps, server ServerDelegate) *ReportingManager {
rm := &ReportingManager{
logger: logger.Named("reporting"),
server: server,
}
err := rm.initEnterpriseReporting(deps)
if err != nil {
rm.logger.Error("Error initializing reporting manager", "error", err)
return nil
}
return rm
}
21 changes: 21 additions & 0 deletions agent/consul/reporting/reporting_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//go:build !consulent
// +build !consulent

package reporting

type EntDeps struct{}

func (rm *ReportingManager) initEnterpriseReporting(entDeps EntDeps) error {
// no op
return nil
}

func (rm *ReportingManager) StartReportingAgent() error {
// no op
return nil
}

func (rm *ReportingManager) StopReportingAgent() error {
// no op
return nil
}
6 changes: 6 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/multilimiter"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/reporting"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
Expand Down Expand Up @@ -409,6 +410,9 @@ type Server struct {
// embedded struct to hold all the enterprise specific data
EnterpriseServer
operatorServer *operator.Server

// handles metrics reporting to HashiCorp
reportingManager *reporting.ReportingManager
}

type connHandler interface {
Expand Down Expand Up @@ -728,6 +732,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

s.reportingManager = reporting.NewReportingManager(s.logger, getEnterpriseReportingDeps(flat), s)

// Initialize external gRPC server - register services on external gRPC server.
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/server_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/grpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/reporting"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
Expand Down Expand Up @@ -178,3 +179,8 @@ func addSerfMetricsLabels(conf *serf.Config, wan bool, segment string, partition
func (s *Server) updateReportingConfig(config ReloadableConfig) {
// no-op
}

func getEnterpriseReportingDeps(deps Deps) reporting.EntDeps {
// no-op
return reporting.EntDeps{}
}
4 changes: 2 additions & 2 deletions agent/consul/system_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
)

func (s *Server) getSystemMetadata(key string) (string, error) {
func (s *Server) GetSystemMetadata(key string) (string, error) {
_, entry, err := s.fsm.State().SystemMetadataGet(nil, key)
if err != nil {
return "", err
Expand All @@ -16,7 +16,7 @@ func (s *Server) getSystemMetadata(key string) (string, error) {
return entry.Value, nil
}

func (s *Server) setSystemMetadataKey(key, val string) error {
func (s *Server) SetSystemMetadataKey(key, val string) error {
args := &structs.SystemMetadataRequest{
Op: structs.SystemMetadataUpsert,
Entry: &structs.SystemMetadataEntry{Key: key, Value: val},
Expand Down
8 changes: 4 additions & 4 deletions agent/consul/system_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
require.Len(t, entries, 0)

// Create 3
require.NoError(t, srv.setSystemMetadataKey("key1", "val1"))
require.NoError(t, srv.setSystemMetadataKey("key2", "val2"))
require.NoError(t, srv.setSystemMetadataKey("key3", ""))
require.NoError(t, srv.SetSystemMetadataKey("key1", "val1"))
require.NoError(t, srv.SetSystemMetadataKey("key2", "val2"))
require.NoError(t, srv.SetSystemMetadataKey("key3", ""))

mapify := func(entries []*structs.SystemMetadataEntry) map[string]string {
m := make(map[string]string)
Expand All @@ -62,7 +62,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
}, mapify(entries))

// Update one and delete one.
require.NoError(t, srv.setSystemMetadataKey("key3", "val3"))
require.NoError(t, srv.SetSystemMetadataKey("key3", "val3"))
require.NoError(t, srv.deleteSystemMetadataKey("key1"))

_, entries, err = state.SystemMetadataList(nil)
Expand Down