Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoandredinis committed Oct 15, 2024
1 parent 1ee8889 commit 5c59d0f
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 7 deletions.
9 changes: 9 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,15 @@ const (
// NotificationUserCreatedWarningSubKind is the subkind for a user-created warning notification.
NotificationUserCreatedWarningSubKind = "user-created-warning"

// NotificationUserTaskIntegrationSubKind is the subkind for a notification that warns the user about pending User Tasks for a given integration.
NotificationUserTaskIntegrationSubKind = "user-task-integration"
// NotificationIntegrationSubKindLabel is the label which contains the subkind of the integration.
// To be used with NotificationUserTaskIntegrationSubKind.
NotificationIntegrationSubKindLabel = "integration-sub-kind"
// NotificationIntegrationLabel is the label which contains the name of the integration.
// To be used with NotificationUserTaskIntegrationSubKind.
NotificationIntegrationLabel = "integration-name"

// NotificationAccessRequestPendingSubKind is the subkind for a notification for an access request pending review.
NotificationAccessRequestPendingSubKind = "access-request-pending"
// NotificationAccessRequestApprovedSubKind is the subkind for a notification for a user's access request being approved.
Expand Down
17 changes: 17 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
Expand Down Expand Up @@ -809,6 +810,12 @@ type DiscoveryAccessPoint interface {

// UpsertUserTask creates or updates an User Task
UpsertUserTask(ctx context.Context, req *usertasksv1.UserTask) (*usertasksv1.UserTask, error)

// CreateGlobalNotification creates a global notification.
CreateGlobalNotification(ctx context.Context, globalNotification *notificationsv1.GlobalNotification) (*notificationsv1.GlobalNotification, error)

// DeleteGlobalNotification deletes a global notification.
DeleteGlobalNotification(ctx context.Context, notificationId string) error
}

// ReadOktaAccessPoint is a read only API interface to be
Expand Down Expand Up @@ -1459,6 +1466,16 @@ func (w *DiscoveryWrapper) UpsertUserTask(ctx context.Context, req *usertasksv1.
return w.NoCache.UpsertUserTask(ctx, req)
}

// CreateGlobalNotification upserts a global notification.
func (w *DiscoveryWrapper) CreateGlobalNotification(ctx context.Context, globalNotification *notificationsv1.GlobalNotification) (*notificationsv1.GlobalNotification, error) {
return w.NoCache.CreateGlobalNotification(ctx, globalNotification)
}

// CreateGlobalNotification upserts a global notification.
func (w *DiscoveryWrapper) DeleteGlobalNotification(ctx context.Context, notificationId string) error {
return w.NoCache.DeleteGlobalNotification(ctx, notificationId)
}

// Close closes all associated resources
func (w *DiscoveryWrapper) Close() error {
err := w.NoCache.Close()
Expand Down
1 change: 1 addition & 0 deletions lib/authz/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ func definitionForBuiltinRole(clusterName string, recConfig readonly.SessionReco
types.NewRule(types.KindIntegration, append(services.RO(), types.VerbUse)),
types.NewRule(types.KindSemaphore, services.RW()),
types.NewRule(types.KindUserTask, services.RW()),
types.NewRule(types.KindNotification, services.RW()),
},
// Discovery service should only access kubes/apps/dbs that originated from discovery.
KubernetesLabels: types.Labels{types.OriginLabel: []string{types.OriginCloud}},
Expand Down
74 changes: 67 additions & 7 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1"
headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -429,7 +431,7 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun
// merges them against the ones that exist in the cluster.
//
// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices.
func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error {
func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance, taskExpiration time.Time) error {
userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{
Integration: taskGroup.integration,
IssueType: taskGroup.issueType,
Expand All @@ -453,10 +455,6 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta
failedInstances = s.discoverEC2UserTaskAddExistingInstances(currentUserTask, failedInstances)
}

// If the DiscoveryService is stopped, or the issue does not happen again
// the task is removed to prevent users from working on issues that are no longer happening.
taskExpiration := s.clock.Now().Add(2 * s.PollInterval)

task, err := usertasks.NewDiscoverEC2UserTask(
&usertasksv1.UserTaskSpec{
Integration: taskGroup.integration,
Expand Down Expand Up @@ -515,17 +513,79 @@ func (s *Server) upsertTasksForAWSEC2FailedEnrollments() {
continue
}

if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil {
// If the DiscoveryService is stopped, or the issue does not happen again
// the task is removed to prevent users from working on issues that are no longer happening.
taskExpiration := s.clock.Now().Add(2 * s.PollInterval)

if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID, taskExpiration); err != nil {
s.Log.WithError(err).WithFields(logrus.Fields{
"integration": g.integration,
"issue_type": g.issueType,
"aws_account_id": g.accountID,
"aws_region": g.region,
},
).Warning("Failed to create discover ec2 user task.", g.integration, g.issueType, g.accountID, g.region)
).Warning("Failed to create discover ec2 user task.")
continue
}

delete(s.awsEC2Tasks.issuesSyncQueue, g)

if err := s.notifyUserAboutPendingTasksForIntegration(g.integration, taskExpiration); err != nil {
s.Log.WithError(err).WithFields(logrus.Fields{
"integration": g.integration,
"issue_type": g.issueType,
"aws_account_id": g.accountID,
"aws_region": g.region,
},
).Warning("Failed to send notification for UserTask.")
continue
}
}
}

func (s *Server) notifyUserAboutPendingTasksForIntegration(integrationName string, expires time.Time) error {
// check if integration status already has a notification uuid
integration, err := s.AccessPoint.GetIntegration(s.ctx, integrationName)
if err != nil {
return trace.Wrap(err)
}

// TODO(marco): add status.pending_tasks_notification_uuid so that we can check if there's an active Notification
// if there is, delete it
// Then create a new one.
notificationID := integration.GetAllLabels()["notificationuuid"]
if notificationID != "" {

if err := s.AccessPoint.DeleteGlobalNotification(s.ctx, notificationID); err != nil {
return trace.Wrap(err)
}
}

_, err = s.AccessPoint.CreateGlobalNotification(s.ctx, &notificationsv1.GlobalNotification{
Spec: &notificationsv1.GlobalNotificationSpec{
Matcher: &notificationsv1.GlobalNotificationSpec_ByPermissions{
ByPermissions: &notificationsv1.ByPermissions{
RoleConditions: []*types.RoleConditions{{
Rules: []types.Rule{{
Resources: []string{types.KindIntegration},
Verbs: []string{types.VerbList, types.VerbRead},
}},
}},
},
},
Notification: &notificationsv1.Notification{
Spec: &notificationsv1.NotificationSpec{},
SubKind: types.NotificationAccessRequestPendingSubKind,
Metadata: &headerv1.Metadata{
Labels: map[string]string{
types.NotificationTitleLabel: "Your integration needs attention.",
types.NotificationIntegrationLabel: integrationName,
types.NotificationIntegrationSubKindLabel: types.IntegrationSubKindAWSOIDC,
},
Expires: timestamppb.New(expires),
},
},
},
})
return trace.Wrap(err)
}

0 comments on commit 5c59d0f

Please sign in to comment.