From 262d0e2cc81698cfd06ca3cd29a978239e1cc9d0 Mon Sep 17 00:00:00 2001 From: jannfis Date: Tue, 7 Jan 2025 20:30:43 +0000 Subject: [PATCH] Add tests Signed-off-by: jannfis --- agent/inbound.go | 18 ++++-- internal/event/event.go | 32 +++++++++-- principal/resource.go | 33 +++++++---- principal/resource_test.go | 113 +++++++++++++++++++++++++++++++++++++ principal/server.go | 2 +- 5 files changed, 176 insertions(+), 22 deletions(-) create mode 100644 principal/resource_test.go diff --git a/agent/inbound.go b/agent/inbound.go index 1b75126..94a3bb9 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -78,6 +78,9 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error { }) logCtx.Tracef("Start processing %v", rreq) + // TODO(jannfis): The connection to fetch resources should support some + // form of impersonation in the future. + // Create a dynamic kubernetes client and retrieve the resource from the // cluster. dynClient, err := dynamic.NewForConfig(a.kubeClient.RestConfig) @@ -85,7 +88,7 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error { return fmt.Errorf("could not create a dynamic client: %w", err) } - // Some of GVR may be empty, that's ok + // Some of GVR may be empty, and that is ok. gvk := schema.GroupVersionResource{Group: rreq.Group, Version: rreq.Version, Resource: rreq.Resource} rif := dynClient.Resource(gvk) @@ -95,7 +98,10 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error { var jsonres []byte var unres *unstructured.Unstructured var unlist *unstructured.UnstructuredList - status := "" + var status error + + // If we have a request for a named resource, we fetch that particular + // resource. If the name is empty, we fetch a list of resources instead. if rreq.Name != "" { if rreq.Namespace != "" { unres, err = rif.Namespace(rreq.Namespace).Get(ctx, rreq.Name, v1.GetOptions{}) @@ -111,7 +117,7 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error { } if err != nil { logCtx.Errorf("could not request resource: %v", err) - status = "failure" + status = err } else { // Marshal the unstructured resource to JSON for submission if unres != nil { @@ -126,7 +132,11 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error { } q := a.queues.SendQ(a.remote.ClientID()) - q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, status, string(jsonres))) + if q == nil { + logCtx.Error("Remote queue disappeared") + return nil + } + q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, event.HttpStatusFromError(status), string(jsonres))) logCtx.Tracef("Emitted resource response") return nil diff --git a/internal/event/event.go b/internal/event/event.go index 6f67d8c..561ea06 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "net/http" "sync" "time" @@ -26,6 +27,7 @@ import ( "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/google/uuid" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -159,19 +161,39 @@ func (evs EventSource) AppProjectEvent(evType EventType, appProject *v1alpha1.Ap return &cev } +// ResourceRequest is an event that holds a request for a resource. It is +// usually emitted from the resource proxy, and is sent from the principal +// to an agent. type ResourceRequest struct { - UUID string `json:"uuid"` + // A unique UUID for this request + UUID string `json:"uuid"` + // Namespace of the requested resource Namespace string `json:"namespace,omitempty"` - Name string `json:"name"` + // Name of the requested resource + Name string `json:"name"` + // The group and version of the requested resource v1.GroupVersionResource } +// ResourceResponse is an event that holds the response to a resource request. +// It is usually sent by an agent to the princiapl in response to a prior +// resource request. type ResourceResponse struct { - UUID string `json:"uuid"` - Status string `json:"status"` + // UUID is the unique ID of the request this response is targeted at + UUID string `json:"uuid"` + // Status contains the HTTP status of the request + Status int `json:"status"` + // Resource is the body of the requested resource Resource string `json:"resource,omitempty"` } +func HttpStatusFromError(err error) int { + if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) { + return int(status.Status().Code) + } + return http.StatusOK +} + // NewResourceRequestEvent creates a cloud event for requesting a resource from // an agent. The resource will be specified by the GroupVersionResource id in // gvr, and its name and namespace. If namespace is empty, the request will @@ -198,7 +220,7 @@ func (evs EventSource) NewResourceRequestEvent(gvr v1.GroupVersionResource, name return &cev, nil } -func (evs EventSource) NewResourceResponseEvent(reqUUID string, status string, data string) *cloudevents.Event { +func (evs EventSource) NewResourceResponseEvent(reqUUID string, status int, data string) *cloudevents.Event { resUUID := uuid.NewString() rr := &ResourceResponse{ UUID: reqUUID, diff --git a/principal/resource.go b/principal/resource.go index 45d0a9b..751e221 100644 --- a/principal/resource.go +++ b/principal/resource.go @@ -32,6 +32,8 @@ const resourceRequestRegexp = `^/(?:api|apis/(?P[^\/]+))/(?Pv[^\ // requestTimeout is the timeout that's being applied to requests for any live // resource. +// +// TODO(jannfis): Make the timeout configurable const requestTimeout = 10 * time.Second // resourceRequester is being executed by the resource proxy once it received a @@ -42,9 +44,10 @@ func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, param logCtx := log().WithField("function", "resourceRequester") // Make sure our request carries a client certificate - if len(r.TLS.PeerCertificates) < 1 { + if r.TLS == nil || len(r.TLS.PeerCertificates) < 1 { logCtx.Errorf("Unauthenticated request from client %s", r.RemoteAddr) - w.WriteHeader(http.StatusUnauthorized) + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("no authorization found")) return } @@ -54,18 +57,19 @@ func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, param // usually configured by us. cert := r.TLS.PeerCertificates[0] - // Make sure the agent name in the certificate is good + // Make sure the agent name in the certificate is properly formatted agentName := cert.Subject.CommonName errs := validation.NameIsDNSLabel(agentName, false) if len(errs) > 0 { logCtx.Errorf("CRITICAL: Invalid agent name in client certificate: %v", errs) - w.WriteHeader(http.StatusUnauthorized) + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("invalid client certificate")) return } logCtx = logCtx.WithField("agent", cert.Subject.CommonName) - // Agent is not connected. Return early. + // If the agent is not connected, return early if !s.queues.HasQueuePair(agentName) { logCtx.Debugf("Agent is not connected, stop proxying") w.WriteHeader(http.StatusBadGateway) @@ -153,13 +157,18 @@ func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, param return } - // We are good to send the response to the caller - log().Info("Writing resource to caller") - w.Header().Set("Content-type", "application/json") - w.WriteHeader(http.StatusOK) - _, err := w.Write([]byte(resp.Resource)) - if err != nil { - log().Errorf("Could not write response to client: %v", err) + log().Infof("Status: %d", resp.Status) + if resp.Status == http.StatusOK { + // We are good to send the response to the caller + log().Info("Writing resource to caller") + w.Header().Set("Content-type", "application/json") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(resp.Resource)) + if err != nil { + log().Errorf("Could not write response to client: %v", err) + } + } else { + w.WriteHeader(resp.Status) } return default: diff --git a/principal/resource_test.go b/principal/resource_test.go new file mode 100644 index 0000000..e5ed5e2 --- /dev/null +++ b/principal/resource_test.go @@ -0,0 +1,113 @@ +package principal + +import ( + "context" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/argoproj-labs/argocd-agent/internal/event" + "github.com/argoproj-labs/argocd-agent/internal/resourceproxy" + "github.com/argoproj-labs/argocd-agent/test/fake/kube" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newResourceTestServer(t *testing.T) *Server { + t.Helper() + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), "argocd", + WithGeneratedTLS("principal"), + WithGeneratedTokenSigningKey()) + require.NoError(t, err) + s.queues.Create("agent") + s.events = event.NewEventSource("principal") + rp, err := resourceproxy.New("127.0.0.1:0") + require.NoError(t, err) + s.resourceProxy = rp + return s +} + +func Test_resourceRequester(t *testing.T) { + t.Run("Successfully request a resource", func(t *testing.T) { + s := newResourceTestServer(t) + r := httptest.NewRequest("GET", "/", nil) + r.TLS = &tls.ConnectionState{ + PeerCertificates: []*x509.Certificate{ + { + Subject: pkix.Name{CommonName: "agent"}, + }, + }, + } + w := httptest.NewRecorder() + ch := make(chan interface{}) + go func() { + s.resourceRequester(w, r, resourceproxy.NewParams()) + ch <- 1 + }() + + // We need the submission channel from the resource tracker, which is bound + // to the event's UUID. So we just take the event the resource proxy has + // created from the send queue. + sendq := s.queues.SendQ("agent") + assert.NotNil(t, sendq) + ev, shutdown := sendq.Get() + assert.False(t, shutdown) + assert.NotNil(t, ev) + agent, sendCh := s.resourceProxy.Tracked(event.EventID(ev)) + assert.Equal(t, "agent", agent) + require.NotNil(t, sendCh) + rev := s.events.NewResourceResponseEvent(event.EventID(ev), 200, "foo") + sendCh <- rev + <-ch + assert.Equal(t, 200, w.Result().StatusCode) + defer w.Result().Body.Close() + body, err := io.ReadAll(w.Result().Body) + require.NoError(t, err) + assert.Equal(t, "foo", string(body)) + }) + + t.Run("No TLS data in request", func(t *testing.T) { + s := newResourceTestServer(t) + r := httptest.NewRequest("GET", "/", nil) + w := httptest.NewRecorder() + ch := make(chan interface{}) + go func() { + s.resourceRequester(w, r, resourceproxy.NewParams()) + ch <- 1 + }() + <-ch + assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode) + defer w.Result().Body.Close() + body, err := io.ReadAll(w.Result().Body) + require.NoError(t, err) + assert.Equal(t, "no authorization found", string(body)) + }) + t.Run("Weird agent name", func(t *testing.T) { + s := newResourceTestServer(t) + r := httptest.NewRequest("GET", "/", nil) + w := httptest.NewRecorder() + r.TLS = &tls.ConnectionState{ + PeerCertificates: []*x509.Certificate{ + { + Subject: pkix.Name{CommonName: "lob/bo"}, + }, + }, + } + ch := make(chan interface{}) + go func() { + s.resourceRequester(w, r, resourceproxy.NewParams()) + ch <- 1 + }() + <-ch + assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode) + defer w.Result().Body.Close() + body, err := io.ReadAll(w.Result().Body) + require.NoError(t, err) + assert.Equal(t, "invalid client certificate", string(body)) + + }) +} diff --git a/principal/server.go b/principal/server.go index 945a28c..3e47a5f 100644 --- a/principal/server.go +++ b/principal/server.go @@ -238,10 +238,10 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace // Instantiate our KubeProxy to intercept Kubernetes requests from Argo CD's // API server. if s.resourceProxyEnabled { + // TODO(jannfis): Enable fetching APIs and resource counts s.resourceProxy, err = resourceproxy.New(s.resourceProxyListenAddr, // For matching resource requests from the Argo CD API resourceproxy.WithRequestMatcher( - // `^/api(?:(?:/(?P[^\/]+))?/(?Pv[^\/]+)/namespaces/(?P[^\/]+)/(?P[^\/]+)/(?P[^\/]+))?$`, resourceRequestRegexp, []string{"get"}, s.resourceRequester,