feat: Live resource view #267

Open · wants to merge 19 commits into base: main
1 change: 1 addition & 0 deletions Makefile
@@ -31,6 +31,7 @@ setup-e2e2:
.PHONY: start-e2e2
start-e2e2:
hack/dev-env/gen-creds.sh
hack/dev-env/gen-tls.sh
goreman -f hack/dev-env/Procfile.e2e start

.PHONY: test-e2e2
19 changes: 10 additions & 9 deletions agent/agent.go
@@ -69,9 +69,10 @@ type Agent struct {
emitter *event.EventSource
// At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback.
watchLock sync.RWMutex
version *version.Version

eventWriter *event.EventWriter
version *version.Version
kubeClient *kube.KubernetesClient
}

const defaultQueueName = "default"
@@ -104,12 +105,12 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
}
}

appclient := client.ApplicationsClientset

if a.remote == nil {
return nil, fmt.Errorf("remote not defined")
}

a.kubeClient = client

// Initial state of the agent is disconnected
a.connected.Store(false)

@@ -132,10 +133,10 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri

// appListFunc and watchFunc are anonymous functions for the informer
appListFunc := func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
return appclient.ArgoprojV1alpha1().Applications(a.namespace).List(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().Applications(a.namespace).List(ctx, opts)
}
appWatchFunc := func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return appclient.ArgoprojV1alpha1().Applications(a.namespace).Watch(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().Applications(a.namespace).Watch(ctx, opts)
}

appInformerOptions := []informer.InformerOption[*v1alpha1.Application]{
@@ -165,10 +166,10 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
}

projListFunc := func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
return appclient.ArgoprojV1alpha1().AppProjects(a.namespace).List(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().AppProjects(a.namespace).List(ctx, opts)
}
projWatchFunc := func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return appclient.ArgoprojV1alpha1().AppProjects(a.namespace).Watch(ctx, opts)
return client.ApplicationsClientset.ArgoprojV1alpha1().AppProjects(a.namespace).Watch(ctx, opts)
}

projInformerOptions := []informer.InformerOption[*v1alpha1.AppProject]{
@@ -183,7 +184,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri

// The agent only supports Kubernetes as application backend
a.appManager, err = application.NewApplicationManager(
kubeapp.NewKubernetesBackend(appclient, a.namespace, appInformer, true),
kubeapp.NewKubernetesBackend(client.ApplicationsClientset, a.namespace, appInformer, true),
a.namespace,
application.WithAllowUpsert(allowUpsert),
application.WithRole(manager.ManagerRoleAgent),
@@ -194,7 +195,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
}

a.projectManager, err = appproject.NewAppProjectManager(
kubeappproject.NewKubernetesBackend(appclient, a.namespace, projInformer, true),
kubeappproject.NewKubernetesBackend(client.ApplicationsClientset, a.namespace, projInformer, true),
a.namespace,
appProjectManagerOption...)
if err != nil {
11 changes: 8 additions & 3 deletions agent/connection.go
@@ -76,13 +76,18 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
logCtx.Tracef("Queue shutdown in progress")
return nil
}
logCtx.Tracef("Grabbed an item")
logCtx.Trace("Grabbed an item")
if ev == nil {
// TODO: Is this really the right thing to do?
return nil
}

logCtx.WithField("resource_id", event.ResourceID(ev)).WithField("event_id", event.EventID(ev)).Trace("Adding an event to the event writer")
logCtx = logCtx.WithFields(logrus.Fields{
"event_target": ev.DataSchema(),
"event_type": ev.Type(),
"resource_id": event.ResourceID(ev),
"event_id": event.EventID(ev),
})
logCtx.Trace("Adding an event to the event writer")
a.eventWriter.Add(ev)

return nil
110 changes: 108 additions & 2 deletions agent/inbound.go
@@ -15,14 +15,21 @@
package agent

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/argoproj-labs/argocd-agent/internal/backend"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

/*
@@ -32,20 +39,109 @@ Inbound events are those coming through our gRPC interface, e.g. those that
were received from a server.
*/

const defaultResourceRequestTimeout = 5 * time.Second

func (a *Agent) processIncomingEvent(ev *event.Event) error {
var err error
switch ev.Target() {
case event.TargetApplication:
err = a.processIncomingApplication(ev)
case event.TargetAppProject:
err = a.processIncomingAppProject(ev)
case event.TargetResource:
err = a.processIncomingResourceRequest(ev)
default:
err = fmt.Errorf("unknown event target: %s", ev.Target())
}

return err
}

// processIncomingResourceRequest processes an incoming event that requests
// to retrieve information from the Kubernetes API.
//
// There can be multiple forms of requests. Currently supported are:
//
// - Request for a particular resource, both namespace and cluster scoped
// - Request for a list of resources of a particular kind (e.g. configmaps,
// pods, etc), both namespace and cluster scoped
// - Request for a list of available APIs
func (a *Agent) processIncomingResourceRequest(ev *event.Event) error {
rreq, err := ev.ResourceRequest()
if err != nil {
return err
}
logCtx := log().WithFields(logrus.Fields{
"method": "processIncomingEvents",
"uuid": rreq.UUID,
})
logCtx.Tracef("Start processing %v", rreq)

// TODO(jannfis): The connection to fetch resources should support some
// form of impersonation in the future.

// Create a dynamic kubernetes client and retrieve the resource from the
// cluster.
dynClient, err := dynamic.NewForConfig(a.kubeClient.RestConfig)
if err != nil {
return fmt.Errorf("could not create a dynamic client: %w", err)
}

// Some fields of the GVR may be empty, and that is ok.
gvr := schema.GroupVersionResource{Group: rreq.Group, Version: rreq.Version, Resource: rreq.Resource}
rif := dynClient.Resource(gvr)

ctx, cancel := context.WithTimeout(a.context, defaultResourceRequestTimeout)
defer cancel()

var jsonres []byte
var unres *unstructured.Unstructured
var unlist *unstructured.UnstructuredList
var status error

// If we have a request for a named resource, we fetch that particular
// resource. If the name is empty, we fetch a list of resources instead.
if rreq.Name != "" {
if rreq.Namespace != "" {
unres, err = rif.Namespace(rreq.Namespace).Get(ctx, rreq.Name, v1.GetOptions{})
} else {
unres, err = rif.Get(ctx, rreq.Name, v1.GetOptions{})
}
} else {
if rreq.Namespace != "" {
unlist, err = rif.Namespace(rreq.Namespace).List(ctx, v1.ListOptions{})
} else {
unlist, err = rif.List(ctx, v1.ListOptions{})
}
}
if err != nil {
logCtx.Errorf("could not request resource: %v", err)
status = err
Comment on lines +118 to +120

Collaborator:
If the err here is not nil, based on my understanding, we were not able to fetch any resources. Should we be returning the error from here itself?

Collaborator (author):
Hm. In my thought process, it is not an error condition for processIncomingResourceRequest itself when we could not fetch the resource from the local cluster. We rather need to let the other side know that resource fetching failed (and the reason, e.g. resource not found or access forbidden).

Collaborator:
That makes sense. Continuing on the same scenario, would the JSON marshaling (line 129 below) be considered more of an internal error, or should we be returning that to the caller as well? My thought is that we might lose sending the response in scenarios where we find errors in JSON marshaling.

Collaborator (author):
That's a good question, and one that could be asked about any error condition in that particular method.

For now, I think the answer is: we want to transport eventual status codes from the communication between the agent and the Kubernetes API back to the principal, but nothing else. This is because the principal can then pass that status code on to the consumer of the resource proxy.

We could go ahead and send HTTP 500 status codes back to the principal whenever something goes wrong in processIncomingResourceRequest, but then the question is, should we? I have not yet found a good answer to that question. The principal will notice that no reply has been sent from the agent anyway, and will let its consumer know.

} else {
// Marshal the unstructured resource to JSON for submission
if unres != nil {
jsonres, err = json.Marshal(unres)
} else if unlist != nil {
jsonres, err = json.Marshal(unlist)
}
if err != nil {
return fmt.Errorf("could not marshal resource to json: %w", err)
}
logCtx.Tracef("marshaled resource")
}

q := a.queues.SendQ(a.remote.ClientID())
if q == nil {
logCtx.Error("Remote queue disappeared")
return nil
}
q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, event.HttpStatusFromError(status), string(jsonres)))
logCtx.Tracef("Emitted resource response")

return nil
}

func (a *Agent) processIncomingApplication(ev *event.Event) error {
logCtx := log().WithFields(logrus.Fields{
"method": "processIncomingEvents",
@@ -207,7 +303,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// In modes other than "managed", we don't process new application events
// that are incoming.
if a.mode != types.AgentModeManaged {
logCtx.Trace("Discarding this event, because agent is not in managed mode")
logCtx.Info("Discarding this event, because agent is not in managed mode")
return nil, event.NewEventDiscardedErr("cannot create application: agent is not in managed mode")
}

@@ -216,7 +312,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
//
// TODO(jannfis): Handle this situation properly instead of throwing an error.
if a.appManager.IsManaged(incoming.QualifiedName()) {
logCtx.Trace("Discarding this event, because application is already managed on this agent")
logCtx.Info("Discarding this event, because application is already managed on this agent")
return nil, event.NewEventDiscardedErr("application %s is already managed", incoming.QualifiedName())
}

@@ -228,6 +324,11 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration")
}

// Set target cluster to a sensible value
// TODO(jannfis): Make this actually configurable per agent
incoming.Spec.Destination.Server = ""
incoming.Spec.Destination.Name = "in-cluster"

created, err := a.appManager.Create(a.context, incoming)
if apierrors.IsAlreadyExists(err) {
logCtx.Debug("application already exists")
@@ -252,6 +353,11 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
}

// Set target cluster to a sensible value
// TODO(jannfis): Make this actually configurable per agent
incoming.Spec.Destination.Server = ""
incoming.Spec.Destination.Name = "in-cluster"

logCtx.Infof("Updating application")

var err error
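For reference on the review thread above about transporting status codes back to the principal: below is a minimal sketch of how an error from the Kubernetes API could be mapped to the HTTP status carried in the resource response. The helper name and the exact mapping are assumptions for illustration only; the PR's actual event.HttpStatusFromError may behave differently.

import (
	"net/http"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// httpStatusFromError is a hypothetical helper (an assumption, not the PR's
// actual event.HttpStatusFromError) that maps a Kubernetes API error to an
// HTTP status code. A nil error becomes 200, well-known API errors keep
// their semantics, and anything else becomes a 500.
func httpStatusFromError(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case apierrors.IsNotFound(err):
		return http.StatusNotFound
	case apierrors.IsForbidden(err):
		return http.StatusForbidden
	case apierrors.IsUnauthorized(err):
		return http.StatusUnauthorized
	case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err):
		return http.StatusGatewayTimeout
	default:
		return http.StatusInternalServerError
	}
}

With a mapping like this, a not-found or forbidden error from the local cluster reaches the principal as a 404 or 403 in the resource response, which the principal can pass on to the consumer of the resource proxy, matching the intent described in the thread.
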
32 changes: 16 additions & 16 deletions cmd/agent/main.go
@@ -22,7 +22,7 @@ import (
"strings"

"github.com/argoproj-labs/argocd-agent/agent"
"github.com/argoproj-labs/argocd-agent/cmd/cmd"
"github.com/argoproj-labs/argocd-agent/cmd/cmdutil"
"github.com/argoproj-labs/argocd-agent/internal/auth"
"github.com/argoproj-labs/argocd-agent/internal/auth/userpass"
"github.com/argoproj-labs/argocd-agent/internal/env"
@@ -56,7 +56,7 @@ func NewAgentRunCommand() *cobra.Command {
Short: "Run the argocd-agent agent component",
Run: func(c *cobra.Command, args []string) {
if showVersion {
cmd.PrintVersion(version.New("argocd-agent", "agent"), versionFormat)
cmdutil.PrintVersion(version.New("argocd-agent", "agent"), versionFormat)
os.Exit(0)
}
ctx, cancelFn := context.WithCancel(context.Background())
@@ -69,21 +69,21 @@ func NewAgentRunCommand() *cobra.Command {
logLevel = "info"
}
if logLevel != "" {
lvl, err := cmd.StringToLoglevel(logLevel)
lvl, err := cmdutil.StringToLoglevel(logLevel)
if err != nil {
cmd.Fatal("invalid log level: %s. Available levels are: %s", logLevel, cmd.AvailableLogLevels())
cmdutil.Fatal("invalid log level: %s. Available levels are: %s", logLevel, cmdutil.AvailableLogLevels())
}
logrus.SetLevel(lvl)
}
if formatter, err := cmd.LogFormatter(logFormat); err != nil {
cmd.Fatal("%s", err.Error())
if formatter, err := cmdutil.LogFormatter(logFormat); err != nil {
cmdutil.Fatal("%s", err.Error())
} else {
logrus.SetFormatter(formatter)
}
if creds != "" {
authMethod, authCreds, err := parseCreds(creds)
if err != nil {
cmd.Fatal("Error setting up creds: %v", err)
cmdutil.Fatal("Error setting up creds: %v", err)
}
remoteOpts = append(remoteOpts, client.WithAuth(authMethod, authCreds))
}
@@ -102,29 +102,29 @@ func NewAgentRunCommand() *cobra.Command {
if serverAddress != "" && serverPort > 0 && serverPort < 65536 {
remote, err = client.NewRemote(serverAddress, serverPort, remoteOpts...)
if err != nil {
cmd.Fatal("Error creating remote: %v", err)
cmdutil.Fatal("Error creating remote: %v", err)
}
}
if remote == nil {
cmd.Fatal("No remote specified")
cmdutil.Fatal("No remote specified")
}

if namespace == "" {
cmd.Fatal("namespace value is empty and must be specified")
cmdutil.Fatal("namespace value is empty and must be specified")
}

kubeConfig, err := cmd.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext)
kubeConfig, err := cmdutil.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext)
if err != nil {
cmd.Fatal("Could not load Kubernetes config: %v", err)
cmdutil.Fatal("Could not load Kubernetes config: %v", err)
}
agentOpts = append(agentOpts, agent.WithRemote(remote))
agentOpts = append(agentOpts, agent.WithMode(agentMode))
ag, err := agent.NewAgent(ctx, kubeConfig, namespace, agentOpts...)
if err != nil {
cmd.Fatal("Could not create a new agent instance: %v", err)
cmdutil.Fatal("Could not create a new agent instance: %v", err)
}
if err := ag.Start(ctx); err != nil {
cmd.Fatal("Could not start agent: %v", err)
cmdutil.Fatal("Could not start agent: %v", err)
}
<-ctx.Done()
},
@@ -215,10 +215,10 @@ func loadCreds(path string) (auth.Credentials, error) {
}

func main() {
cmd.InitLogging()
cmdutil.InitLogging()
c := NewAgentRunCommand()
err := c.Execute()
if err != nil {
cmd.Fatal("ERROR: %v", err)
cmdutil.Fatal("ERROR: %v", err)
}
}