From 7f52c0c8350d91a880034a2d2da6d32e141386bc Mon Sep 17 00:00:00 2001 From: Phani Devavarapu Date: Mon, 7 Nov 2022 16:07:02 -0700 Subject: [PATCH 1/2] Websocket server compression implementation --- client/wsclient_test.go | 8 +- .../testhelpers}/tcpproxy.go | 2 +- server/server.go | 5 + server/serverimpl.go | 5 +- server/serverimpl_test.go | 108 +++++++++++++++++- 5 files changed, 119 insertions(+), 9 deletions(-) rename {client/internal => internal/testhelpers}/tcpproxy.go (99%) diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 2de35b4f..715b140f 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -8,11 +8,13 @@ import ( "testing" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" + "github.com/open-telemetry/opamp-go/client/internal" "github.com/open-telemetry/opamp-go/client/types" + "github.com/open-telemetry/opamp-go/internal/testhelpers" "github.com/open-telemetry/opamp-go/protobufs" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" ) func TestDisconnectWSByServer(t *testing.T) { @@ -72,7 +74,7 @@ func TestVerifyWSCompress(t *testing.T) { // We use a transparent TCP proxy to be able to count the actual bytes transferred so that // we can test the number of actual bytes vs number of expected bytes with and without compression. - proxy := internal.NewProxy(srv.Endpoint) + proxy := testhelpers.NewProxy(srv.Endpoint) assert.NoError(t, proxy.Start()) // Start an OpAMP/WebSocket client. diff --git a/client/internal/tcpproxy.go b/internal/testhelpers/tcpproxy.go similarity index 99% rename from client/internal/tcpproxy.go rename to internal/testhelpers/tcpproxy.go index c371ab83..1166a865 100644 --- a/client/internal/tcpproxy.go +++ b/internal/testhelpers/tcpproxy.go @@ -1,4 +1,4 @@ -package internal +package testhelpers import ( "log" diff --git a/server/server.go b/server/server.go index ce4e3293..d28f9b67 100644 --- a/server/server.go +++ b/server/server.go @@ -11,6 +11,11 @@ import ( type Settings struct { // Callbacks that the Server will call after successful Attach/Start. Callbacks types.Callbacks + + // EnableCompression can be set to true to enable the compression. Note that for WebSocket transport + // the compression is only effectively enabled if the client also supports compression. + // The data will be compressed in both directions. + EnableCompression bool } type StartSettings struct { diff --git a/server/serverimpl.go b/server/serverimpl.go index f1a3d340..cbb0f1c2 100644 --- a/server/serverimpl.go +++ b/server/serverimpl.go @@ -52,8 +52,9 @@ func New(logger types.Logger) *server { func (s *server) Attach(settings Settings) (HTTPHandlerFunc, error) { s.settings = settings - // TODO: Add support for compression using Upgrader.EnableCompression field. - s.wsUpgrader = websocket.Upgrader{} + s.wsUpgrader = websocket.Upgrader{ + EnableCompression: settings.EnableCompression, + } return s.httpHandler, nil } diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index 11c7f897..82445a4d 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -3,21 +3,24 @@ package server import ( "bytes" "context" + "fmt" "io" "net/http" "net/http/httptest" + "strings" "sync/atomic" "testing" "time" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + sharedinternal "github.com/open-telemetry/opamp-go/internal" "github.com/open-telemetry/opamp-go/internal/testhelpers" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" ) func startServer(t *testing.T, settings *StartSettings) *server { @@ -38,6 +41,8 @@ func startServer(t *testing.T, settings *StartSettings) *server { func dialClient(serverSettings *StartSettings) (*websocket.Conn, *http.Response, error) { srvUrl := "ws://" + serverSettings.ListenEndpoint + serverSettings.ListenPath + dailer := websocket.DefaultDialer + dailer.EnableCompression = serverSettings.EnableCompression return websocket.DefaultDialer.Dial(srvUrl, nil) } @@ -224,6 +229,103 @@ func TestServerReceiveSendMessage(t *testing.T) { assert.EqualValues(t, protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus, response.Capabilities) } +func TestServerReceiveSendMessageWithCompression(t *testing.T) { + // Use highly compressible config body. + uncompressedCfg := []byte(strings.Repeat("test", 10000)) + tests := []bool{false, true} + for _, withCompression := range tests { + t.Run(fmt.Sprintf("%v", withCompression), func(t *testing.T) { + var rcvMsg atomic.Value + callbacks := CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) types.ConnectionResponse { + return types.ConnectionResponse{Accept: true} + }, + OnMessageFunc: func(conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + // Remember received message. + rcvMsg.Store(message) + + // Send a response. + response := protobufs.ServerToAgent{ + InstanceUid: message.InstanceUid, + Capabilities: uint64(protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus), + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: uncompressedCfg}, + }, + }, + }, + } + return &response + }, + } + + // Start a Server. + settings := &StartSettings{Settings: Settings{Callbacks: callbacks, EnableCompression: withCompression}} + srv := startServer(t, settings) + defer srv.Stop(context.Background()) + + // We use a transparent TCP proxy to be able to count the actual bytes transferred so that + // we can test the number of actual bytes vs number of expected bytes with and without compression. + proxy := testhelpers.NewProxy(settings.ListenEndpoint) + assert.NoError(t, proxy.Start()) + + clientSettings := *settings + clientSettings.ListenEndpoint = proxy.IncomingEndpoint() + // Connect using a WebSocket client. + conn, _, _ := dialClient(&clientSettings) + require.NotNil(t, conn) + defer conn.Close() + + // Send a message to the Server. + sendMsg := protobufs.AgentToServer{ + InstanceUid: "10000000", + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: uncompressedCfg}, + }, + }, + }, + } + bytes, err := proto.Marshal(&sendMsg) + require.NoError(t, err) + err = conn.WriteMessage(websocket.BinaryMessage, bytes) + require.NoError(t, err) + + // Wait until Server receives the message. + eventually(t, func() bool { return rcvMsg.Load() != nil }) + assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg)) + + // Read Server's response. + mt, bytes, err := conn.ReadMessage() + require.NoError(t, err) + require.EqualValues(t, websocket.BinaryMessage, mt) + + // Decode the response. + var response protobufs.ServerToAgent + err = proto.Unmarshal(bytes, &response) + require.NoError(t, err) + + fmt.Printf("sent %d, received %d\n", proxy.ClientToServerBytes(), proxy.ServerToClientBytes()) + + // Verify the response. + assert.EqualValues(t, sendMsg.InstanceUid, response.InstanceUid) + assert.EqualValues(t, protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus, response.Capabilities) + if withCompression { + // With compression the entire bytes exchanged should be less than the config body. + // This is only possible if there is any compression happening. + assert.Less(t, proxy.ClientToServerBytes(), len(uncompressedCfg)) + assert.Less(t, proxy.ServerToClientBytes(), len(uncompressedCfg)) + } else { + // Without compression the entire bytes exchanged should be more than the config body. + assert.Greater(t, proxy.ClientToServerBytes(), len(uncompressedCfg)) + assert.Greater(t, proxy.ServerToClientBytes(), len(uncompressedCfg)) + } + }) + } +} + func TestServerReceiveSendMessagePlainHTTP(t *testing.T) { var rcvMsg atomic.Value var onConnectedCalled, onCloseCalled int32 From d304ed6d44005097c1c669b6ee0925d55fefba7e Mon Sep 17 00:00:00 2001 From: Phani Devavarapu Date: Tue, 8 Nov 2022 13:06:45 -0700 Subject: [PATCH 2/2] PR comment --- server/serverimpl_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index 82445a4d..03cb8b5d 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -270,10 +270,10 @@ func TestServerReceiveSendMessageWithCompression(t *testing.T) { proxy := testhelpers.NewProxy(settings.ListenEndpoint) assert.NoError(t, proxy.Start()) - clientSettings := *settings - clientSettings.ListenEndpoint = proxy.IncomingEndpoint() + serverSettings := *settings + serverSettings.ListenEndpoint = proxy.IncomingEndpoint() // Connect using a WebSocket client. - conn, _, _ := dialClient(&clientSettings) + conn, _, _ := dialClient(&serverSettings) require.NotNil(t, conn) defer conn.Close()