Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Websocket server compression #145

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"testing"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/internal"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/internal/testhelpers"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)

func TestDisconnectWSByServer(t *testing.T) {
Expand Down Expand Up @@ -72,7 +74,7 @@ func TestVerifyWSCompress(t *testing.T) {

// We use a transparent TCP proxy to be able to count the actual bytes transferred so that
// we can test the number of actual bytes vs number of expected bytes with and without compression.
proxy := internal.NewProxy(srv.Endpoint)
proxy := testhelpers.NewProxy(srv.Endpoint)
assert.NoError(t, proxy.Start())

// Start an OpAMP/WebSocket client.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package internal
package testhelpers

import (
"log"
Expand Down
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
type Settings struct {
// Callbacks that the Server will call after successful Attach/Start.
Callbacks types.Callbacks

// EnableCompression can be set to true to enable the compression. Note that for WebSocket transport
// the compression is only effectively enabled if the client also supports compression.
// The data will be compressed in both directions.
EnableCompression bool
}

type StartSettings struct {
Expand Down
5 changes: 3 additions & 2 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func New(logger types.Logger) *server {

func (s *server) Attach(settings Settings) (HTTPHandlerFunc, error) {
s.settings = settings
// TODO: Add support for compression using Upgrader.EnableCompression field.
s.wsUpgrader = websocket.Upgrader{}
s.wsUpgrader = websocket.Upgrader{
EnableCompression: settings.EnableCompression,
}
return s.httpHandler, nil
}

Expand Down
108 changes: 105 additions & 3 deletions server/serverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package server
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

sharedinternal "github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/internal/testhelpers"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

func startServer(t *testing.T, settings *StartSettings) *server {
Expand All @@ -38,6 +41,8 @@ func startServer(t *testing.T, settings *StartSettings) *server {

func dialClient(serverSettings *StartSettings) (*websocket.Conn, *http.Response, error) {
srvUrl := "ws://" + serverSettings.ListenEndpoint + serverSettings.ListenPath
dailer := websocket.DefaultDialer
dailer.EnableCompression = serverSettings.EnableCompression
return websocket.DefaultDialer.Dial(srvUrl, nil)
}

Expand Down Expand Up @@ -224,6 +229,103 @@ func TestServerReceiveSendMessage(t *testing.T) {
assert.EqualValues(t, protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus, response.Capabilities)
}

func TestServerReceiveSendMessageWithCompression(t *testing.T) {
// Use highly compressible config body.
uncompressedCfg := []byte(strings.Repeat("test", 10000))
tests := []bool{false, true}
for _, withCompression := range tests {
t.Run(fmt.Sprintf("%v", withCompression), func(t *testing.T) {
var rcvMsg atomic.Value
callbacks := CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{Accept: true}
},
OnMessageFunc: func(conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Remember received message.
rcvMsg.Store(message)

// Send a response.
response := protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
Capabilities: uint64(protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus),
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: uncompressedCfg},
},
},
},
}
return &response
},
}

// Start a Server.
settings := &StartSettings{Settings: Settings{Callbacks: callbacks, EnableCompression: withCompression}}
srv := startServer(t, settings)
defer srv.Stop(context.Background())

// We use a transparent TCP proxy to be able to count the actual bytes transferred so that
// we can test the number of actual bytes vs number of expected bytes with and without compression.
proxy := testhelpers.NewProxy(settings.ListenEndpoint)
assert.NoError(t, proxy.Start())

clientSettings := *settings
phanidevavarapu marked this conversation as resolved.
Show resolved Hide resolved
clientSettings.ListenEndpoint = proxy.IncomingEndpoint()
// Connect using a WebSocket client.
conn, _, _ := dialClient(&clientSettings)
require.NotNil(t, conn)
defer conn.Close()

// Send a message to the Server.
sendMsg := protobufs.AgentToServer{
InstanceUid: "10000000",
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: uncompressedCfg},
},
},
},
}
bytes, err := proto.Marshal(&sendMsg)
require.NoError(t, err)
err = conn.WriteMessage(websocket.BinaryMessage, bytes)
require.NoError(t, err)

// Wait until Server receives the message.
eventually(t, func() bool { return rcvMsg.Load() != nil })
assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg))

// Read Server's response.
mt, bytes, err := conn.ReadMessage()
require.NoError(t, err)
require.EqualValues(t, websocket.BinaryMessage, mt)

// Decode the response.
var response protobufs.ServerToAgent
err = proto.Unmarshal(bytes, &response)
require.NoError(t, err)

fmt.Printf("sent %d, received %d\n", proxy.ClientToServerBytes(), proxy.ServerToClientBytes())

// Verify the response.
assert.EqualValues(t, sendMsg.InstanceUid, response.InstanceUid)
assert.EqualValues(t, protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus, response.Capabilities)
if withCompression {
// With compression the entire bytes exchanged should be less than the config body.
// This is only possible if there is any compression happening.
assert.Less(t, proxy.ClientToServerBytes(), len(uncompressedCfg))
assert.Less(t, proxy.ServerToClientBytes(), len(uncompressedCfg))
} else {
// Without compression the entire bytes exchanged should be more than the config body.
assert.Greater(t, proxy.ClientToServerBytes(), len(uncompressedCfg))
assert.Greater(t, proxy.ServerToClientBytes(), len(uncompressedCfg))
}
})
}
}

func TestServerReceiveSendMessagePlainHTTP(t *testing.T) {
var rcvMsg atomic.Value
var onConnectedCalled, onCloseCalled int32
Expand Down