Skip to content

Commit

Permalink
DiscoveryConfigStatus: update even when no resource is found
Browse files Browse the repository at this point in the history
During Auto Discover, when using a DiscoveryConfig, if no resources are
found, the DiscoveryConfigStatus is not updated accordingly.

This PR ensures that, even when no resources are found, the status will
report so.
  • Loading branch information
marcoandredinis committed Dec 19, 2024
1 parent ba35baa commit 0b9f6d7
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 51 deletions.
44 changes: 33 additions & 11 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,7 @@ func (s *Server) startDatabaseWatchers() error {
Origin: types.OriginCloud,
Clock: s.clock,
PreFetchHookFn: func() {
discoveryConfigs := slices.FilterMapUnique(
s.getAllDatabaseFetchers(),
func(f common.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
s.databaseWatcherIterationStarted()
},
},
)
Expand Down Expand Up @@ -151,6 +143,38 @@ func (s *Server) startDatabaseWatchers() error {
return nil
}

func (s *Server) databaseWatcherIterationStarted() {
allFetchers := s.getAllDatabaseFetchers()
if len(allFetchers) == 0 {
return
}

s.submitFetchersEvent(allFetchers)

awsResultGroups := slices.FilterMapUnique(
allFetchers,
func(f common.Fetcher) (awsResourceGroup, bool) {
include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != ""
resourceGroup := awsResourceGroup{
discoveryConfigName: f.GetDiscoveryConfigName(),
integration: f.IntegrationName(),
}
return resourceGroup, include
},
)

for _, g := range awsResultGroups {
s.awsRDSResourcesStatus.iterationStarted(g)
}

discoveryConfigs := slices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
}

func (s *Server) getAllDatabaseFetchers() []common.Fetcher {
allFetchers := make([]common.Fetcher, 0, len(s.databaseFetchers))

Expand All @@ -162,8 +186,6 @@ func (s *Server) getAllDatabaseFetchers() []common.Fetcher {

allFetchers = append(allFetchers, s.databaseFetchers...)

s.submitFetchersEvent(allFetchers)

return allFetchers
}

Expand Down
47 changes: 33 additions & 14 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,16 +524,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.FilterMapUnique(
s.getAllAWSServerFetchers(),
func(f server.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEC2ResourcesStatus.reset()
s.awsEC2Tasks.reset()
s.ec2WatcherIterationStarted()
}),
)
if err != nil {
Expand Down Expand Up @@ -575,6 +566,38 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
return nil
}

func (s *Server) ec2WatcherIterationStarted() {
allFetchers := s.getAllAWSServerFetchers()
if len(allFetchers) == 0 {
return
}

s.submitFetchEvent(types.CloudAWS, types.AWSMatcherEC2)

awsResultGroups := libslices.FilterMapUnique(
allFetchers,
func(f server.Fetcher) (awsResourceGroup, bool) {
include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != ""
resourceGroup := awsResourceGroup{
discoveryConfigName: f.GetDiscoveryConfigName(),
integration: f.IntegrationName(),
}
return resourceGroup, include
},
)
for _, g := range awsResultGroups {
s.awsEC2ResourcesStatus.iterationStarted(g)
}

discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)
s.awsEC2ResourcesStatus.reset()

s.awsEC2Tasks.reset()
}

func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error {
if len(matchers) == 0 {
return nil
Expand Down Expand Up @@ -1483,10 +1506,6 @@ func (s *Server) getAllAWSServerFetchers() []server.Fetcher {

allFetchers = append(allFetchers, s.staticServerAWSFetchers...)

if len(allFetchers) > 0 {
s.submitFetchEvent(types.CloudAWS, types.AWSMatcherEC2)
}

return allFetchers
}

Expand Down
25 changes: 25 additions & 0 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,31 @@ func TestDiscoveryServer(t *testing.T) {
},
wantInstalledInstances: []string{"instance-id-1"},
},
{
name: "no nodes found using DiscoveryConfig and Integration, but DiscoveryConfig Status is still updated",
presentInstances: []types.Server{},
foundEC2Instances: []ec2types.Instance{},
ssm: &mockSSMClient{},
emitter: &mockEmitter{},
staticMatchers: Matchers{},
discoveryConfig: dcForEC2SSMWithIntegration,
wantDiscoveryConfigStatus: &discoveryconfig.Status{
State: "DISCOVERY_CONFIG_STATE_SYNCING",
ErrorMessage: nil,
DiscoveredResources: 0,
LastSyncTime: fakeClock.Now().UTC(),
IntegrationDiscoveredResources: map[string]*discoveryconfigv1.IntegrationDiscoveredSummary{
"my-integration": {
AwsEc2: &discoveryconfigv1.ResourcesDiscoveredSummary{
Found: 0,
Enrolled: 0,
Failed: 0,
},
},
},
},
wantInstalledInstances: []string{},
},
{
name: "one node found but SSM Run fails and DiscoverEC2 User Task is created",
presentInstances: []types.Server{},
Expand Down
43 changes: 33 additions & 10 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,7 @@ func (s *Server) startKubeIntegrationWatchers() error {
Origin: types.OriginCloud,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
PreFetchHookFn: func() {
discoveryConfigs := libslices.FilterMapUnique(
s.getKubeIntegrationFetchers(),
func(f common.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEKSResourcesStatus.reset()
s.awsEKSTasks.reset()
s.kubernetesIntegrationWatcherIterationStarted()
},
})
if err != nil {
Expand Down Expand Up @@ -194,6 +185,38 @@ func (s *Server) startKubeIntegrationWatchers() error {
return nil
}

func (s *Server) kubernetesIntegrationWatcherIterationStarted() {
allFetchers := s.getKubeIntegrationFetchers()
if len(allFetchers) == 0 {
return
}

s.submitFetchersEvent(allFetchers)

awsResultGroups := libslices.FilterMapUnique(
allFetchers,
func(f common.Fetcher) (awsResourceGroup, bool) {
include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != ""
resourceGroup := awsResourceGroup{
discoveryConfigName: f.GetDiscoveryConfigName(),
integration: f.IntegrationName(),
}
return resourceGroup, include
},
)
for _, g := range awsResultGroups {
s.awsEKSResourcesStatus.iterationStarted(g)
}

discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEKSResourcesStatus.reset()
s.awsEKSTasks.reset()
}

func (s *Server) enrollEKSClusters(region, integration, discoveryConfigName string, clusters []types.DiscoveredEKSCluster, agentVersion string, mu *sync.Mutex, enrollingClusters map[string]bool) {
mu.Lock()
for _, c := range clusters {
Expand Down
9 changes: 9 additions & 0 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@ func (ars *awsResourcesStatus) incrementFailed(g awsResourceGroup, count int) {
ars.awsResourcesResults[g] = groupStats
}

func (ars *awsResourcesStatus) iterationStarted(g awsResourceGroup) {
ars.mu.Lock()
defer ars.mu.Unlock()
if ars.awsResourcesResults == nil {
ars.awsResourcesResults = make(map[awsResourceGroup]awsResourceGroupResult)
}
ars.awsResourcesResults[g] = awsResourceGroupResult{}
}

func (ars *awsResourcesStatus) incrementFound(g awsResourceGroup, count int) {
ars.mu.Lock()
defer ars.mu.Unlock()
Expand Down
9 changes: 9 additions & 0 deletions lib/srv/server/azure_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type azureFetcherConfig struct {
ResourceGroup string
AzureClientGetter azureClientGetter
DiscoveryConfigName string
Integration string
}

type azureInstanceFetcher struct {
Expand All @@ -132,6 +133,7 @@ type azureInstanceFetcher struct {
Parameters map[string]string
ClientID string
DiscoveryConfigName string
Integration string
}

func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher {
Expand All @@ -142,6 +144,7 @@ func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher {
ResourceGroup: cfg.ResourceGroup,
Labels: cfg.Matcher.ResourceTags,
DiscoveryConfigName: cfg.DiscoveryConfigName,
Integration: cfg.Integration,
}

if cfg.Matcher.Params != nil {
Expand All @@ -164,6 +167,12 @@ func (f *azureInstanceFetcher) GetDiscoveryConfigName() string {
return f.DiscoveryConfigName
}

// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
func (f *azureInstanceFetcher) IntegrationName() string {
return f.Integration
}

// GetInstances fetches all Azure virtual machines matching configured filters.
func (f *azureInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) {
client, err := f.AzureClientGetter.GetAzureVirtualMachinesClient(f.Subscription)
Expand Down
6 changes: 6 additions & 0 deletions lib/srv/server/ec2_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,9 @@ func (f *ec2InstanceFetcher) GetInstances(ctx context.Context, rotation bool) ([
func (f *ec2InstanceFetcher) GetDiscoveryConfigName() string {
return f.DiscoveryConfigName
}

// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
func (f *ec2InstanceFetcher) IntegrationName() string {
return f.Integration
}
22 changes: 16 additions & 6 deletions lib/srv/server/gcp_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type gcpFetcherConfig struct {
GCPClient gcp.InstancesClient
projectsClient gcp.ProjectsClient
DiscoveryConfigName string
Integration string
}

type gcpInstanceFetcher struct {
Expand All @@ -123,16 +124,19 @@ type gcpInstanceFetcher struct {
Parameters map[string]string
projectsClient gcp.ProjectsClient
DiscoveryConfigName string
Integration string
}

func newGCPInstanceFetcher(cfg gcpFetcherConfig) *gcpInstanceFetcher {
fetcher := &gcpInstanceFetcher{
GCP: cfg.GCPClient,
Zones: cfg.Matcher.Locations,
ProjectIDs: cfg.Matcher.ProjectIDs,
ServiceAccounts: cfg.Matcher.ServiceAccounts,
Labels: cfg.Matcher.GetLabels(),
projectsClient: cfg.projectsClient,
GCP: cfg.GCPClient,
Zones: cfg.Matcher.Locations,
ProjectIDs: cfg.Matcher.ProjectIDs,
ServiceAccounts: cfg.Matcher.ServiceAccounts,
Labels: cfg.Matcher.GetLabels(),
projectsClient: cfg.projectsClient,
Integration: cfg.Integration,
DiscoveryConfigName: cfg.DiscoveryConfigName,
}
if cfg.Matcher.Params != nil {
fetcher.Parameters = map[string]string{
Expand All @@ -152,6 +156,12 @@ func (f *gcpInstanceFetcher) GetDiscoveryConfigName() string {
return f.DiscoveryConfigName
}

// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
func (f *gcpInstanceFetcher) IntegrationName() string {
return f.Integration
}

// GetInstances fetches all GCP virtual machines matching configured filters.
func (f *gcpInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) {
// Key by project ID, then by zone.
Expand Down
3 changes: 3 additions & 0 deletions lib/srv/server/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Fetcher interface {
// GetDiscoveryConfigName returns the DiscoveryConfig name that created this fetcher.
// Empty for Fetchers created from `teleport.yaml/discovery_service.aws.<Matcher>` matchers.
GetDiscoveryConfigName() string
// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
IntegrationName() string
}

// WithTriggerFetchC sets a poll trigger to manual start a resource polling.
Expand Down
23 changes: 13 additions & 10 deletions lib/utils/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@

package slices

import (
"cmp"
"slices"
)

// FilterMapUnique applies a function to all elements of a slice and collects them.
// The function returns the value to collect and whether the current element should be included.
// Returned values are sorted and deduplicated.
func FilterMapUnique[T any, S cmp.Ordered](ts []T, fn func(T) (s S, include bool)) []S {
ss := make([]S, 0, len(ts))
func FilterMapUnique[T any, S any](ts []T, fn func(T) (s S, include bool)) []S {
ss := make([]S, 0)
seen := make(map[key[S]]struct{})
for _, t := range ts {
if s, include := fn(t); include {
ss = append(ss, s)
if _, ok := seen[key[S]{val: s}]; !ok {
seen[key[S]{val: s}] = struct{}{}
ss = append(ss, s)
}
}
}
slices.Sort(ss)
return slices.Compact(ss)

return ss
}

type key[S any] struct {
val any
}

0 comments on commit 0b9f6d7

Please sign in to comment.