Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis committed Jan 7, 2025
1 parent 3044170 commit 262d0e2
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 22 deletions.
18 changes: 14 additions & 4 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,17 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error {
})
logCtx.Tracef("Start processing %v", rreq)

// TODO(jannfis): The connection to fetch resources should support some
// form of impersonation in the future.

// Create a dynamic kubernetes client and retrieve the resource from the
// cluster.
dynClient, err := dynamic.NewForConfig(a.kubeClient.RestConfig)
if err != nil {
return fmt.Errorf("could not create a dynamic client: %w", err)
}

// Some of GVR may be empty, that's ok
// Some of GVR may be empty, and that is ok.
gvk := schema.GroupVersionResource{Group: rreq.Group, Version: rreq.Version, Resource: rreq.Resource}
rif := dynClient.Resource(gvk)

Expand All @@ -95,7 +98,10 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error {
var jsonres []byte
var unres *unstructured.Unstructured
var unlist *unstructured.UnstructuredList
status := ""
var status error

// If we have a request for a named resource, we fetch that particular
// resource. If the name is empty, we fetch a list of resources instead.
if rreq.Name != "" {
if rreq.Namespace != "" {
unres, err = rif.Namespace(rreq.Namespace).Get(ctx, rreq.Name, v1.GetOptions{})
Expand All @@ -111,7 +117,7 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error {
}
if err != nil {
logCtx.Errorf("could not request resource: %v", err)
status = "failure"
status = err
} else {
// Marshal the unstructured resource to JSON for submission
if unres != nil {
Expand All @@ -126,7 +132,11 @@ func (a *Agent) processIncomingResourceRequest(ev *event.Event) error {
}

q := a.queues.SendQ(a.remote.ClientID())
q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, status, string(jsonres)))
if q == nil {
logCtx.Error("Remote queue disappeared")
return nil
}
q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, event.HttpStatusFromError(status), string(jsonres)))
logCtx.Tracef("Emitted resource response")

return nil
Expand Down
32 changes: 27 additions & 5 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

Expand Down Expand Up @@ -159,19 +161,39 @@ func (evs EventSource) AppProjectEvent(evType EventType, appProject *v1alpha1.Ap
return &cev
}

// ResourceRequest is an event that holds a request for a resource. It is
// usually emitted from the resource proxy, and is sent from the principal
// to an agent.
type ResourceRequest struct {
UUID string `json:"uuid"`
// A unique UUID for this request
UUID string `json:"uuid"`
// Namespace of the requested resource
Namespace string `json:"namespace,omitempty"`
Name string `json:"name"`
// Name of the requested resource
Name string `json:"name"`
// The group and version of the requested resource
v1.GroupVersionResource
}

// ResourceResponse is an event that holds the response to a resource request.
// It is usually sent by an agent to the princiapl in response to a prior
// resource request.
type ResourceResponse struct {
UUID string `json:"uuid"`
Status string `json:"status"`
// UUID is the unique ID of the request this response is targeted at
UUID string `json:"uuid"`
// Status contains the HTTP status of the request
Status int `json:"status"`
// Resource is the body of the requested resource
Resource string `json:"resource,omitempty"`
}

func HttpStatusFromError(err error) int {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
return int(status.Status().Code)
}
return http.StatusOK
}

// NewResourceRequestEvent creates a cloud event for requesting a resource from
// an agent. The resource will be specified by the GroupVersionResource id in
// gvr, and its name and namespace. If namespace is empty, the request will
Expand All @@ -198,7 +220,7 @@ func (evs EventSource) NewResourceRequestEvent(gvr v1.GroupVersionResource, name
return &cev, nil
}

func (evs EventSource) NewResourceResponseEvent(reqUUID string, status string, data string) *cloudevents.Event {
func (evs EventSource) NewResourceResponseEvent(reqUUID string, status int, data string) *cloudevents.Event {
resUUID := uuid.NewString()
rr := &ResourceResponse{
UUID: reqUUID,
Expand Down
33 changes: 21 additions & 12 deletions principal/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const resourceRequestRegexp = `^/(?:api|apis/(?P<group>[^\/]+))/(?P<version>v[^\

// requestTimeout is the timeout that's being applied to requests for any live
// resource.
//
// TODO(jannfis): Make the timeout configurable
const requestTimeout = 10 * time.Second

// resourceRequester is being executed by the resource proxy once it received a
Expand All @@ -42,9 +44,10 @@ func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, param
logCtx := log().WithField("function", "resourceRequester")

// Make sure our request carries a client certificate
if len(r.TLS.PeerCertificates) < 1 {
if r.TLS == nil || len(r.TLS.PeerCertificates) < 1 {
logCtx.Errorf("Unauthenticated request from client %s", r.RemoteAddr)
w.WriteHeader(http.StatusUnauthorized)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("no authorization found"))
return
}

Expand All @@ -54,18 +57,19 @@ func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, param
// usually configured by us.
cert := r.TLS.PeerCertificates[0]

// Make sure the agent name in the certificate is good
// Make sure the agent name in the certificate is properly formatted
agentName := cert.Subject.CommonName
errs := validation.NameIsDNSLabel(agentName, false)
if len(errs) > 0 {
logCtx.Errorf("CRITICAL: Invalid agent name in client certificate: %v", errs)
w.WriteHeader(http.StatusUnauthorized)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("invalid client certificate"))
return
}

logCtx = logCtx.WithField("agent", cert.Subject.CommonName)

// Agent is not connected. Return early.
// If the agent is not connected, return early
if !s.queues.HasQueuePair(agentName) {
logCtx.Debugf("Agent is not connected, stop proxying")
w.WriteHeader(http.StatusBadGateway)
Expand Down Expand Up @@ -153,13 +157,18 @@ func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, param
return
}

// We are good to send the response to the caller
log().Info("Writing resource to caller")
w.Header().Set("Content-type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(resp.Resource))
if err != nil {
log().Errorf("Could not write response to client: %v", err)
log().Infof("Status: %d", resp.Status)
if resp.Status == http.StatusOK {
// We are good to send the response to the caller
log().Info("Writing resource to caller")
w.Header().Set("Content-type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(resp.Resource))
if err != nil {
log().Errorf("Could not write response to client: %v", err)
}
} else {
w.WriteHeader(resp.Status)
}
return
default:
Expand Down
113 changes: 113 additions & 0 deletions principal/resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package principal

import (
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/resourceproxy"
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func newResourceTestServer(t *testing.T) *Server {
t.Helper()
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), "argocd",
WithGeneratedTLS("principal"),
WithGeneratedTokenSigningKey())
require.NoError(t, err)
s.queues.Create("agent")
s.events = event.NewEventSource("principal")
rp, err := resourceproxy.New("127.0.0.1:0")
require.NoError(t, err)
s.resourceProxy = rp
return s
}

func Test_resourceRequester(t *testing.T) {
t.Run("Successfully request a resource", func(t *testing.T) {
s := newResourceTestServer(t)
r := httptest.NewRequest("GET", "/", nil)
r.TLS = &tls.ConnectionState{
PeerCertificates: []*x509.Certificate{
{
Subject: pkix.Name{CommonName: "agent"},
},
},
}
w := httptest.NewRecorder()
ch := make(chan interface{})
go func() {
s.resourceRequester(w, r, resourceproxy.NewParams())
ch <- 1
}()

// We need the submission channel from the resource tracker, which is bound
// to the event's UUID. So we just take the event the resource proxy has
// created from the send queue.
sendq := s.queues.SendQ("agent")
assert.NotNil(t, sendq)
ev, shutdown := sendq.Get()
assert.False(t, shutdown)
assert.NotNil(t, ev)
agent, sendCh := s.resourceProxy.Tracked(event.EventID(ev))
assert.Equal(t, "agent", agent)
require.NotNil(t, sendCh)
rev := s.events.NewResourceResponseEvent(event.EventID(ev), 200, "foo")
sendCh <- rev
<-ch
assert.Equal(t, 200, w.Result().StatusCode)
defer w.Result().Body.Close()
body, err := io.ReadAll(w.Result().Body)
require.NoError(t, err)
assert.Equal(t, "foo", string(body))
})

t.Run("No TLS data in request", func(t *testing.T) {
s := newResourceTestServer(t)
r := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
ch := make(chan interface{})
go func() {
s.resourceRequester(w, r, resourceproxy.NewParams())
ch <- 1
}()
<-ch
assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode)
defer w.Result().Body.Close()
body, err := io.ReadAll(w.Result().Body)
require.NoError(t, err)
assert.Equal(t, "no authorization found", string(body))
})
t.Run("Weird agent name", func(t *testing.T) {
s := newResourceTestServer(t)
r := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
r.TLS = &tls.ConnectionState{
PeerCertificates: []*x509.Certificate{
{
Subject: pkix.Name{CommonName: "lob/bo"},
},
},
}
ch := make(chan interface{})
go func() {
s.resourceRequester(w, r, resourceproxy.NewParams())
ch <- 1
}()
<-ch
assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode)
defer w.Result().Body.Close()
body, err := io.ReadAll(w.Result().Body)
require.NoError(t, err)
assert.Equal(t, "invalid client certificate", string(body))

})
}
2 changes: 1 addition & 1 deletion principal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace
// Instantiate our KubeProxy to intercept Kubernetes requests from Argo CD's
// API server.
if s.resourceProxyEnabled {
// TODO(jannfis): Enable fetching APIs and resource counts
s.resourceProxy, err = resourceproxy.New(s.resourceProxyListenAddr,
// For matching resource requests from the Argo CD API
resourceproxy.WithRequestMatcher(
// `^/api(?:(?:/(?P<group>[^\/]+))?/(?P<version>v[^\/]+)/namespaces/(?P<namespace>[^\/]+)/(?P<resource>[^\/]+)/(?P<name>[^\/]+))?$`,
resourceRequestRegexp,
[]string{"get"},
s.resourceRequester,
Expand Down

0 comments on commit 262d0e2

Please sign in to comment.