diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 739425e..5899d2e 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -38,6 +38,9 @@ jobs: - name: Integration Tests run: go test ./... -tags integration -v + - name: System Tests + run: go test ./tests -tags system -v + lint: runs-on: ubuntu-latest needs: build diff --git a/server/proxy/reverseproxy_test.go b/server/proxy/reverseproxy_test.go index 74f4f97..df0e956 100644 --- a/server/proxy/reverseproxy_test.go +++ b/server/proxy/reverseproxy_test.go @@ -16,21 +16,6 @@ import ( "github.com/stretchr/testify/assert" ) -// import ( -// "bytes" -// "encoding/json" -// "io" -// "net/http" -// "net/http/httptest" -// "strings" -// "testing" -// "time" -// -// "github.com/andydunstall/piko/agent/config" -// "github.com/andydunstall/piko/pkg/log" -// "github.com/stretchr/testify/assert" -// ) - type fakeManager struct { handler func(endpointID string, allowForward bool) (upstream.Upstream, bool) } diff --git a/tests/node/node.go b/tests/node/node.go new file mode 100644 index 0000000..44dddfb --- /dev/null +++ b/tests/node/node.go @@ -0,0 +1,123 @@ +package node + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/andydunstall/piko/pkg/log" + "github.com/andydunstall/piko/server/cluster" + "github.com/andydunstall/piko/server/config" + "github.com/andydunstall/piko/server/proxy" + "github.com/andydunstall/piko/server/upstream" +) + +type options struct { + logger log.Logger +} + +type loggerOption struct { + Logger log.Logger +} + +func (o loggerOption) apply(opts *options) { + opts.logger = o.Logger +} + +// WithLogger configures the logger. Defaults to no output. +func WithLogger(logger log.Logger) Option { + return loggerOption{Logger: logger} +} + +type Option interface { + apply(*options) +} + +type Node struct { + nodeID string + + proxyLn net.Listener + upstreamLn net.Listener + + proxyServer *proxy.Server + upstreamServer *upstream.Server + + options options + + wg sync.WaitGroup +} + +func New(opts ...Option) (*Node, error) { + options := options{ + logger: log.NewNopLogger(), + } + for _, o := range opts { + o.apply(&options) + } + + proxyLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("proxy listen: %w", err) + } + + upstreamLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("upstream listen: %w", err) + } + + return &Node{ + nodeID: "my-node", + proxyLn: proxyLn, + upstreamLn: upstreamLn, + options: options, + }, nil +} + +func (n *Node) Start() { + clusterState := cluster.NewState(&cluster.Node{ + ID: n.nodeID, + ProxyAddr: n.proxyLn.Addr().String(), + }, n.options.logger) + + upstreams := upstream.NewLoadBalancedManager(clusterState) + + n.proxyServer = proxy.NewServer( + upstreams, + config.ProxyConfig{}, + nil, + nil, + n.options.logger, + ) + n.wg.Add(1) + go func() { + defer n.wg.Done() + _ = n.proxyServer.Serve(n.proxyLn) + }() + + n.upstreamServer = upstream.NewServer( + upstreams, + nil, + nil, + n.options.logger, + ) + n.wg.Add(1) + go func() { + defer n.wg.Done() + _ = n.upstreamServer.Serve(n.upstreamLn) + }() +} + +func (n *Node) Stop() { + n.upstreamServer.Shutdown(context.Background()) + n.proxyServer.Shutdown(context.Background()) + n.wg.Wait() +} + +func (n *Node) ProxyAddr() string { + return n.proxyLn.Addr().String() +} + +func (n *Node) UpstreamAddr() string { + return n.upstreamLn.Addr().String() +} diff --git a/tests/proxy_test.go b/tests/proxy_test.go new file mode 100644 index 0000000..f995f7a --- /dev/null +++ b/tests/proxy_test.go @@ -0,0 +1,136 @@ +//go:build system + +package tests + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/andydunstall/piko/agent/client" + "github.com/andydunstall/piko/tests/node" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type errorMessage struct { + Error string `json:"error"` +} + +func TestProxy_HTTP(t *testing.T) { + t.Run("http", func(t *testing.T) { + node, err := node.New() + require.NoError(t, err) + node.Start() + defer node.Stop() + + // Add upstream listener with a HTTP server returning 200. + + upstreamURL := "http://" + node.UpstreamAddr() + pikoClient := client.New(client.WithURL(upstreamURL)) + ln, err := pikoClient.Listen(context.TODO(), "my-endpoint") + assert.NoError(t, err) + + server := httptest.NewUnstartedServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) {}, + )) + server.Listener = ln + go server.Start() + defer server.Close() + + // Send a request to the upstream via Piko. + + req, _ := http.NewRequest( + http.MethodGet, + "http://"+node.ProxyAddr(), + nil, + ) + req.Header.Add("x-piko-endpoint", "my-endpoint") + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + + // TODO(andydunstall): Add HTTPS test. + + t.Run("websocket", func(t *testing.T) { + node, err := node.New() + require.NoError(t, err) + node.Start() + defer node.Stop() + + // Add upstream listener with a WebSocket server that echos back the + // first message. + + upstreamURL := "http://" + node.UpstreamAddr() + pikoClient := client.New(client.WithURL(upstreamURL)) + ln, err := pikoClient.Listen(context.TODO(), "my-endpoint") + assert.NoError(t, err) + + server := httptest.NewUnstartedServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + var upgrader = websocket.Upgrader{} + + c, err := upgrader.Upgrade(w, r, nil) + assert.NoError(t, err) + defer c.Close() + mt, message, err := c.ReadMessage() + assert.NoError(t, err) + + assert.NoError(t, c.WriteMessage(mt, message)) + }, + )) + server.Listener = ln + go server.Start() + defer server.Close() + + // Send a WebSocket message via Piko and wait for it to be echoed back. + + header := make(http.Header) + header.Add("x-piko-endpoint", "my-endpoint") + + c, _, err := websocket.DefaultDialer.Dial("ws://"+node.ProxyAddr(), header) + assert.NoError(t, err) + defer c.Close() + + assert.NoError(t, c.WriteMessage(websocket.TextMessage, []byte("echo"))) + + mt, message, err := c.ReadMessage() + assert.NoError(t, err) + + assert.Equal(t, websocket.TextMessage, mt) + assert.Equal(t, []byte("echo"), message) + }) + + // Tests sending a request to an endpoint with no listeners. + t.Run("no listeners", func(t *testing.T) { + node, err := node.New() + require.NoError(t, err) + node.Start() + defer node.Stop() + + // Send a request to endpoint 'my-endpoint' with no upstream listeners. + + req, _ := http.NewRequest( + http.MethodGet, + "http://"+node.ProxyAddr(), + nil, + ) + req.Header.Add("x-piko-endpoint", "my-endpoint") + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusBadGateway, resp.StatusCode) + m := errorMessage{} + assert.NoError(t, json.NewDecoder(resp.Body).Decode(&m)) + assert.Equal(t, "no available upstreams", m.Error) + }) +}