From b1d96ebc119a6c0181e04c9acf77623fb2ebc218 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 21 Nov 2023 15:38:16 +0800 Subject: [PATCH 01/22] graceful shutdown of the websocket client --- client/internal/wsreceiver.go | 56 ++++++++++++++++----------------- client/internal/wssender.go | 27 +++++++++++++++- client/wsclient.go | 58 +++++++++++++++++++---------------- 3 files changed, 86 insertions(+), 55 deletions(-) diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index b53b54d3..f85a720d 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -17,6 +17,9 @@ type wsReceiver struct { sender *WSSender callbacks types.Callbacks processor receivedProcessor + + // Indicates that the receiver has fully stopped. + stopped chan struct{} } // NewWSReceiver creates a new Receiver that uses WebSocket to receive @@ -36,45 +39,42 @@ func NewWSReceiver( sender: sender, callbacks: callbacks, processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities), + stopped: make(chan struct{}), } return w } -// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context. -func (r *wsReceiver) ReceiverLoop(ctx context.Context) { - type receivedMessage struct { - message *protobufs.ServerToAgent - err error - } +// Start starts the receiver loop. +func (r *wsReceiver) Start(ctx context.Context) { + go r.ReceiverLoop(ctx) +} - for { - select { - case <-ctx.Done(): - return - default: - result := make(chan receivedMessage, 1) +// IsStopped returns a channel that's closed when the receiver is stopped. +func (r *wsReceiver) IsStopped() <-chan struct{} { + return r.stopped +} - go func() { - var message protobufs.ServerToAgent - err := r.receiveMessage(&message) - result <- receivedMessage{&message, err} - }() +// ReceiverLoop runs the receiver loop. +// To stop the receiver cancel the context and close the websocket connection +func (r *wsReceiver) ReceiverLoop(ctx context.Context) { + processorCtx, stopProcessor := context.WithCancel(ctx) - select { - case <-ctx.Done(): - return - case res := <-result: - if res.err != nil { - if !websocket.IsCloseError(res.err, websocket.CloseNormalClosure) { - r.logger.Errorf(ctx, "Unexpected error while receiving: %v", res.err) - } - return - } - r.processor.ProcessReceivedMessage(ctx, res.message) +out: + for { + var message protobufs.ServerToAgent + if err := r.receiveMessage(&message); err != nil { + if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) { + r.logger.Errorf(ctx, "Unexpected error while receiving: %v", err) } + break out + } else { + r.processor.ProcessReceivedMessage(processorCtx, &message) } } + + stopProcessor() + close(r.stopped) } func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error { diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 40ac937b..c8b20371 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -2,6 +2,7 @@ package internal import ( "context" + "time" "github.com/gorilla/websocket" "google.golang.org/protobuf/proto" @@ -11,13 +12,19 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) +const ( + defaultSendCloseMessageTimeout = 5 * time.Second +) + // WSSender implements the WebSocket client's sending portion of OpAMP protocol. type WSSender struct { SenderCommon conn *websocket.Conn logger types.Logger + // Indicates that the sender has fully stopped. stopped chan struct{} + err error } // NewSender creates a new Sender that uses WebSocket to send @@ -44,8 +51,17 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error { // WaitToStop blocks until the sender is stopped. To stop the sender cancel the context // that was passed to Start(). -func (s *WSSender) WaitToStop() { +func (s *WSSender) WaitToStop() error { <-s.stopped + return s.err +} + +func (s *WSSender) IsStopped() <-chan struct{} { + return s.stopped +} + +func (s *WSSender) Err() error { + return s.err } func (s *WSSender) run(ctx context.Context) { @@ -56,6 +72,7 @@ out: s.sendNextMessage(ctx) case <-ctx.Done(): + s.err = s.sendCloseMessage() break out } } @@ -63,6 +80,14 @@ out: close(s.stopped) } +func (s *WSSender) sendCloseMessage() error { + return s.conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Normal closure"), + time.Now().Add(defaultSendCloseMessageTimeout), + ) +} + func (s *WSSender) sendNextMessage(ctx context.Context) error { msgToSend := s.nextMessage.PopPending() if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) { diff --git a/client/wsclient.go b/client/wsclient.go index 2b5ab5ad..fa6b2655 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -18,6 +18,10 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) +const ( + defaultShutdownTimeout = 5 * time.Second +) + // wsClient is an OpAMP Client implementation for WebSocket transport. // See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport type wsClient struct { @@ -85,15 +89,7 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro } func (c *wsClient) Stop(ctx context.Context) error { - // Close connection if any. - c.connMutex.RLock() - conn := c.conn - c.connMutex.RUnlock() - - if conn != nil { - _ = conn.Close() - } - + // stop the runner return c.common.Stop(ctx) } @@ -234,9 +230,10 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // are being stopped. return } + // Close the underlying connection. + defer c.conn.Close() if c.common.IsStopping() { - _ = c.conn.Close() return } @@ -248,15 +245,14 @@ func (c *wsClient) runOneCycle(ctx context.Context) { } // Create a cancellable context for background processors. - procCtx, procCancel := context.WithCancel(ctx) + senderCtx, stopSender := context.WithCancel(ctx) + defer stopSender() // Connected successfully. Start the sender. This will also send the first // status report. - if err := c.sender.Start(procCtx, c.conn); err != nil { - c.common.Logger.Errorf(procCtx, "Failed to send first status report: %v", err) + if err := c.sender.Start(senderCtx, c.conn); err != nil { + c.common.Logger.Errorf(senderCtx, "Failed to send first status report: %v", err) // We could not send the report, the only thing we can do is start over. - _ = c.conn.Close() - procCancel() return } @@ -270,19 +266,29 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.PackagesStateProvider, c.common.Capabilities, ) - r.ReceiverLoop(ctx) - - // Stop the background processors. - procCancel() - - // If we exited receiverLoop it means there is a connection error, we cannot - // read messages anymore. We need to start over. + r.Start(ctx) + + select { + case <-c.sender.IsStopped(): + // sender will send close message to initiate + if err := c.sender.Err(); err != nil && err != websocket.ErrCloseSent { + c.common.Logger.Debugf("failed to send close message, close without the handshake.") + break + } - // Close the connection to unblock the WSSender as well. - _ = c.conn.Close() + c.common.Logger.Debugf("waiting for close message from server.") + select { + case <-r.IsStopped(): + c.common.Logger.Debugf("shutdown handshake complete.") + case <-time.After(defaultShutdownTimeout): + c.common.Logger.Debugf("timeout waiting for close message.") + } + case <-r.IsStopped(): + // If we exited receiverLoop it means there is a connection error, we cannot + // read messages anymore. We need to start over. - // Wait for WSSender to stop. - c.sender.WaitToStop() + // TODO: handle close message from server. + } } func (c *wsClient) runUntilStopped(ctx context.Context) { From b40a59818a6ca956880f9bcff73dad58249fa95c Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 21 Nov 2023 17:16:15 +0800 Subject: [PATCH 02/22] update the comments --- client/wsclient.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index fa6b2655..51f70e1d 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -89,7 +89,6 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro } func (c *wsClient) Stop(ctx context.Context) error { - // stop the runner return c.common.Stop(ctx) } @@ -220,10 +219,15 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { // runOneCycle performs the following actions: // 1. connect (try until succeeds). // 2. send first status report. -// 3. receive and process messages until error happens. +// 3. start the sender to wait for scheduled message and send it to the server. +// 4. start the receiver to receive and process messages until error happens. +// 5. wait until either sender or receiver stops. // -// If it encounters an error it closes the connection and returns. -// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set). +// runOneCycle will close the connection it created before it return. +// +// When Stop() is called (ctx is cancelled, isStopping is set), wsClient will shutdown gracefully: +// 1. sender will be cancelled by the ctx, send the close message to server and return the error via sender.Err(). +// 2. runOneCycle will handle that error and wait for the close message from server until timeout. func (c *wsClient) runOneCycle(ctx context.Context) { if err := c.ensureConnected(ctx); err != nil { // Can't connect, so can't move forward. This currently happens when we @@ -270,7 +274,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { select { case <-c.sender.IsStopped(): - // sender will send close message to initiate + // sender will send close message to initiate the close handshake if err := c.sender.Err(); err != nil && err != websocket.ErrCloseSent { c.common.Logger.Debugf("failed to send close message, close without the handshake.") break From 43db9555dcb8df6cfaa3c040434a34f64bc43125 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 21 Nov 2023 17:18:20 +0800 Subject: [PATCH 03/22] remove unsed code --- client/internal/wssender.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/client/internal/wssender.go b/client/internal/wssender.go index c8b20371..689693f7 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -49,13 +49,6 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error { return err } -// WaitToStop blocks until the sender is stopped. To stop the sender cancel the context -// that was passed to Start(). -func (s *WSSender) WaitToStop() error { - <-s.stopped - return s.err -} - func (s *WSSender) IsStopped() <-chan struct{} { return s.stopped } From 90b99446e19d1098555f163dfb049fd93453aa75 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 21 Nov 2023 19:39:14 +0800 Subject: [PATCH 04/22] fix: wait until both sender and receiver are stopped --- client/internal/wssender.go | 1 + client/wsclient.go | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 689693f7..1a922773 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -44,6 +44,7 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error { // Run the sender in the background. s.stopped = make(chan struct{}) + s.err = nil go s.run(ctx) return err diff --git a/client/wsclient.go b/client/wsclient.go index 51f70e1d..96b52aec 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -221,7 +221,7 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { // 2. send first status report. // 3. start the sender to wait for scheduled message and send it to the server. // 4. start the receiver to receive and process messages until error happens. -// 5. wait until either sender or receiver stops. +// 5. wait until both sender and receiver are stopped. // // runOneCycle will close the connection it created before it return. // @@ -286,12 +286,16 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Logger.Debugf("shutdown handshake complete.") case <-time.After(defaultShutdownTimeout): c.common.Logger.Debugf("timeout waiting for close message.") + // not receive close message from the server, close the connection to force the receive loop to stop + _ = c.conn.Close() + <-r.IsStopped() } case <-r.IsStopped(): // If we exited receiverLoop it means there is a connection error, we cannot // read messages anymore. We need to start over. - // TODO: handle close message from server. + stopSender() + <-c.sender.IsStopped() } } From 8dde9d9099030c478974c5a5af6b24c16b9dbc63 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 21 Nov 2023 23:24:30 +0800 Subject: [PATCH 05/22] add testcase from #205 --- client/wsclient.go | 11 ++- client/wsclient_test.go | 207 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 3 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 96b52aec..67a940fb 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -44,6 +44,10 @@ type wsClient struct { // last non-nil internal error that was encountered in the conn retry loop, // currently used only for testing. lastInternalErr atomic.Pointer[error] + + // Network connection timeout used for the WebSocket closing handshake. + // This field is currently only modified during testing. + connShutdownTimeout time.Duration } // NewWebSocket creates a new OpAMP Client that uses WebSocket transport. @@ -54,8 +58,9 @@ func NewWebSocket(logger types.Logger) *wsClient { sender := internal.NewSender(logger) w := &wsClient{ - common: internal.NewClientCommon(logger, sender), - sender: sender, + common: internal.NewClientCommon(logger, sender), + sender: sender, + connShutdownTimeout: defaultShutdownTimeout, } return w } @@ -284,7 +289,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { select { case <-r.IsStopped(): c.common.Logger.Debugf("shutdown handshake complete.") - case <-time.After(defaultShutdownTimeout): + case <-time.After(c.connShutdownTimeout): c.common.Logger.Debugf("timeout waiting for close message.") // not receive close message from the server, close the connection to force the receive loop to stop _ = c.conn.Close() diff --git a/client/wsclient_test.go b/client/wsclient_test.go index d0b8b4e6..487d0b66 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -9,9 +9,11 @@ import ( "strings" "sync/atomic" "testing" + "time" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/client/internal" @@ -262,3 +264,208 @@ func TestRedirectWS(t *testing.T) { }) } } + +func TestHandlesStopBeforeStart(t *testing.T) { + client := NewWebSocket(nil) + require.Error(t, client.Stop(context.Background())) +} + +func TestPerformsClosingHandshake(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + closed := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + defHandler := wsConn.CloseHandler() + + wsConn.SetCloseHandler(func(code int, _ string) error { + require.Equal(t, websocket.CloseNormalClosure, code, "Client sent non-normal closing code") + + err := defHandler(code, "") + closed <- struct{}{} + return err + }) + + client.Stop(context.Background()) + + select { + case <-closed: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never closed") + } +} + +func TestHandlesSlowCloseMessageFromServer(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + closed := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + client.connShutdownTimeout = 100 * time.Millisecond + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + defHandler := wsConn.CloseHandler() + + wsConn.SetCloseHandler(func(code int, _ string) error { + require.Equal(t, websocket.CloseNormalClosure, code, "Client sent non-normal closing code") + + time.Sleep(200 * time.Millisecond) + err := defHandler(code, "") + closed <- struct{}{} + return err + }) + + client.Stop(context.Background()) + + select { + case <-closed: + case <-time.After(1 * time.Second): + require.Fail(t, "Connection never closed") + } +} + +func TestHandlesNoCloseMessageFromServer(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + closed := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + client.connShutdownTimeout = 100 * time.Millisecond + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + wsConn.SetCloseHandler(func(code int, _ string) error { + // Don't send close message + return nil + }) + + go func() { + client.Stop(context.Background()) + closed <- struct{}{} + }() + + select { + case <-closed: + case <-time.After(1 * time.Second): + require.Fail(t, "Connection never closed") + } +} + +func TestHandlesConnectionError(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + // Write an invalid message to the connection. The client + // will take this as an error and reconnect to the server. + writer, err := wsConn.NextWriter(websocket.BinaryMessage) + require.NoError(t, err) + n, err := writer.Write([]byte{99, 1, 2, 3, 4, 5}) + require.NoError(t, err) + require.Equal(t, 6, n) + err = writer.Close() + require.NoError(t, err) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never re-established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + err = client.Stop(context.Background()) + require.NoError(t, err) +} From 598cf2542b65e5109daa5a24ada3a8644bf34ab8 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 10 Jan 2024 12:24:28 +0800 Subject: [PATCH 06/22] Update client/wsclient.go Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- client/wsclient.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 67a940fb..da8edfb0 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -224,9 +224,9 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { // runOneCycle performs the following actions: // 1. connect (try until succeeds). // 2. send first status report. -// 3. start the sender to wait for scheduled message and send it to the server. -// 4. start the receiver to receive and process messages until error happens. -// 5. wait until both sender and receiver are stopped. +// 3. start the sender to wait for scheduled messages and send them to the server. +// 4. start the receiver to receive and process messages until an error happens. +// 5. wait until both the sender and receiver are stopped. // // runOneCycle will close the connection it created before it return. // From bbe2d3f57ad05558487548c213920ce46d88bbd3 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Tue, 16 Jan 2024 15:00:45 +0800 Subject: [PATCH 07/22] Update client/wsclient_test.go Co-authored-by: Srikanth Chekuri --- client/wsclient_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 487d0b66..0c392ed4 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -292,12 +292,12 @@ func TestPerformsClosingHandshake(t *testing.T) { require.Fail(t, "Connection never established") } - require.Eventually(t, func() bool { + eventually(t, func() bool { client.connMutex.RLock() conn := client.conn client.connMutex.RUnlock() return conn != nil - }, 2*time.Second, 250*time.Millisecond) + }) defHandler := wsConn.CloseHandler() From 4c03472dc687888d8976631eaf4b9b3aa5eb41c0 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 17 Jan 2024 01:21:18 +0800 Subject: [PATCH 08/22] Update client/wsclient.go Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> --- client/wsclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/wsclient.go b/client/wsclient.go index da8edfb0..478269c3 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -285,7 +285,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { break } - c.common.Logger.Debugf("waiting for close message from server.") + c.common.Logger.Debugf("Waiting for receiver to stop.") select { case <-r.IsStopped(): c.common.Logger.Debugf("shutdown handshake complete.") From 84113b9dbe8548cf63dd2d2674b19150f74c546a Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 17 Jan 2024 01:24:03 +0800 Subject: [PATCH 09/22] Update client/wsclient.go Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> --- client/wsclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/wsclient.go b/client/wsclient.go index 478269c3..75f1281b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -291,7 +291,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Logger.Debugf("shutdown handshake complete.") case <-time.After(c.connShutdownTimeout): c.common.Logger.Debugf("timeout waiting for close message.") - // not receive close message from the server, close the connection to force the receive loop to stop + // Close the connection to force the receive loop to stop. _ = c.conn.Close() <-r.IsStopped() } From 35ffed86270801c8f3ec0b3ab42c18f41dcf5f80 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 17 Jan 2024 01:24:44 +0800 Subject: [PATCH 10/22] Update client/wsclient.go Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> --- client/wsclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/wsclient.go b/client/wsclient.go index 75f1281b..27994fa5 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -281,7 +281,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { case <-c.sender.IsStopped(): // sender will send close message to initiate the close handshake if err := c.sender.Err(); err != nil && err != websocket.ErrCloseSent { - c.common.Logger.Debugf("failed to send close message, close without the handshake.") + c.common.Logger.Debugf("Error stopping the sender: %v", err) break } From c6851ed7461b18f9c8e4bf40a2cc5018497fa160 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 17 Jan 2024 01:25:34 +0800 Subject: [PATCH 11/22] Apply suggestions from code review Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> --- client/wsclient.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 27994fa5..a7671020 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -288,9 +288,9 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Logger.Debugf("Waiting for receiver to stop.") select { case <-r.IsStopped(): - c.common.Logger.Debugf("shutdown handshake complete.") + c.common.Logger.Debugf(Receiver stopped.") case <-time.After(c.connShutdownTimeout): - c.common.Logger.Debugf("timeout waiting for close message.") + c.common.Logger.Debugf("Timeout waiting for receiver to stop.") // Close the connection to force the receive loop to stop. _ = c.conn.Close() <-r.IsStopped() From 35c8b8d6f5209ad115f1bc61c3dd3d50242f1483 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Wed, 17 Jan 2024 01:31:37 +0800 Subject: [PATCH 12/22] docs: add doc comments --- client/internal/wssender.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 1a922773..5e6ae9ca 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -50,10 +50,12 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error { return err } +// IsStopped returns a channel that's closed when the sender is stopped. func (s *WSSender) IsStopped() <-chan struct{} { return s.stopped } +// Err returns the error of sending close message. func (s *WSSender) Err() error { return s.err } From 998431d51accd1f1b6b4af2f086c3b8162a4c926 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Wed, 17 Jan 2024 01:36:20 +0800 Subject: [PATCH 13/22] handle websocket.ErrCloseSent inside sender --- client/internal/wssender.go | 4 +++- client/wsclient.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 5e6ae9ca..4e756141 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -68,7 +68,9 @@ out: s.sendNextMessage(ctx) case <-ctx.Done(): - s.err = s.sendCloseMessage() + if err := s.sendCloseMessage(); err != nil && err != websocket.ErrCloseSent { + s.err = err + } break out } } diff --git a/client/wsclient.go b/client/wsclient.go index a7671020..4ca24e56 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -280,7 +280,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { select { case <-c.sender.IsStopped(): // sender will send close message to initiate the close handshake - if err := c.sender.Err(); err != nil && err != websocket.ErrCloseSent { + if err := c.sender.Err(); err != nil { c.common.Logger.Debugf("Error stopping the sender: %v", err) break } From 25644c230bd63557334d6ace1bf7d6fff4a15bd2 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Wed, 17 Jan 2024 02:10:05 +0800 Subject: [PATCH 14/22] fix typo --- client/wsclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/wsclient.go b/client/wsclient.go index 4ca24e56..e98a4f89 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -288,7 +288,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Logger.Debugf("Waiting for receiver to stop.") select { case <-r.IsStopped(): - c.common.Logger.Debugf(Receiver stopped.") + c.common.Logger.Debugf("Receiver stopped.") case <-time.After(c.connShutdownTimeout): c.common.Logger.Debugf("Timeout waiting for receiver to stop.") // Close the connection to force the receive loop to stop. From fb226bb2de262b8db06255de3a6b757b35027e2b Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 24 Jan 2024 14:52:50 +0800 Subject: [PATCH 15/22] Apply suggestions from code review Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> --- client/internal/wssender.go | 6 ++++-- client/wsclient.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 4e756141..4bff484a 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -55,8 +55,10 @@ func (s *WSSender) IsStopped() <-chan struct{} { return s.stopped } -// Err returns the error of sending close message. -func (s *WSSender) Err() error { +// StoppingErr returns an error if there was a problem with stopping the sender. +// If stopping was successful will return nil. +// StoppingErr() can be called only after IsStopped() is signalled. +func (s *WSSender) StoppingErr() error { return s.err } diff --git a/client/wsclient.go b/client/wsclient.go index e98a4f89..d62cc508 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -280,7 +280,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { select { case <-c.sender.IsStopped(): // sender will send close message to initiate the close handshake - if err := c.sender.Err(); err != nil { + if err := c.sender.StoppingErr(); err != nil { c.common.Logger.Debugf("Error stopping the sender: %v", err) break } From 6e88dba7f55c5257449132a49ccd6492212bcc01 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Wed, 24 Jan 2024 15:13:27 +0800 Subject: [PATCH 16/22] close connections explicitly --- client/wsclient.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/wsclient.go b/client/wsclient.go index d62cc508..e0bec5e7 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -282,6 +282,10 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // sender will send close message to initiate the close handshake if err := c.sender.StoppingErr(); err != nil { c.common.Logger.Debugf("Error stopping the sender: %v", err) + + // Close the connection to stop the receiver. + _ = c.conn.Close() + <-r.IsStopped() break } From 1446c8aa35d5c4002f19fc35e29bf6b865af8ea6 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 6 Feb 2024 18:40:25 +0800 Subject: [PATCH 17/22] update logger --- client/wsclient.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index e0bec5e7..02604ded 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -281,7 +281,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { case <-c.sender.IsStopped(): // sender will send close message to initiate the close handshake if err := c.sender.StoppingErr(); err != nil { - c.common.Logger.Debugf("Error stopping the sender: %v", err) + c.common.Logger.Debugf(ctx, "Error stopping the sender: %v", err) // Close the connection to stop the receiver. _ = c.conn.Close() @@ -289,12 +289,12 @@ func (c *wsClient) runOneCycle(ctx context.Context) { break } - c.common.Logger.Debugf("Waiting for receiver to stop.") + c.common.Logger.Debugf(ctx, "Waiting for receiver to stop.") select { case <-r.IsStopped(): - c.common.Logger.Debugf("Receiver stopped.") + c.common.Logger.Debugf(ctx, "Receiver stopped.") case <-time.After(c.connShutdownTimeout): - c.common.Logger.Debugf("Timeout waiting for receiver to stop.") + c.common.Logger.Debugf(ctx, "Timeout waiting for receiver to stop.") // Close the connection to force the receive loop to stop. _ = c.conn.Close() <-r.IsStopped() From 6ec846cd7edc6c157a3403e90ba269e71149e733 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 6 Feb 2024 18:41:43 +0800 Subject: [PATCH 18/22] update tests --- client/internal/wsreceiver_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index c929f8fa..01f70592 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -212,6 +212,7 @@ func TestReceiverLoopStop(t *testing.T) { receiverLoopStopped.Store(true) }() cancel() + conn.Close() assert.Eventually(t, func() bool { return receiverLoopStopped.Load() From 574baa6b04500efbb12fabe1a0f0cba7a29695be Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 23 Jan 2024 19:35:12 +0530 Subject: [PATCH 19/22] ReceiverLoop should stop on cancelling context (#240) Here is test run that shows it doesn't stop without fix https://github.com/open-telemetry/opamp-go/actions/runs/7512286887/job/20452817400 Fixes https://github.com/open-telemetry/opamp-go/issues/239 --- client/internal/wsreceiver.go | 39 +++++++++++++++++++++--------- client/internal/wsreceiver_test.go | 1 - 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index f85a720d..939f9ad5 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -58,23 +58,38 @@ func (r *wsReceiver) IsStopped() <-chan struct{} { // ReceiverLoop runs the receiver loop. // To stop the receiver cancel the context and close the websocket connection func (r *wsReceiver) ReceiverLoop(ctx context.Context) { - processorCtx, stopProcessor := context.WithCancel(ctx) + type receivedMessage struct { + message *protobufs.ServerToAgent + err error + } -out: for { - var message protobufs.ServerToAgent - if err := r.receiveMessage(&message); err != nil { - if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) { - r.logger.Errorf(ctx, "Unexpected error while receiving: %v", err) + select { + case <-ctx.Done(): + return + default: + result := make(chan receivedMessage, 1) + + go func() { + var message protobufs.ServerToAgent + err := r.receiveMessage(&message) + result <- receivedMessage{&message, err} + }() + + select { + case <-ctx.Done(): + return + case res := <-result: + if res.err != nil { + if !websocket.IsCloseError(res.err, websocket.CloseNormalClosure) { + r.logger.Errorf(ctx, "Unexpected error while receiving: %v", res.err) + } + return + } + r.processor.ProcessReceivedMessage(ctx, res.message) } - break out - } else { - r.processor.ProcessReceivedMessage(processorCtx, &message) } } - - stopProcessor() - close(r.stopped) } func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error { diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index 01f70592..c929f8fa 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -212,7 +212,6 @@ func TestReceiverLoopStop(t *testing.T) { receiverLoopStopped.Store(true) }() cancel() - conn.Close() assert.Eventually(t, func() bool { return receiverLoopStopped.Load() From d5c6e3a28f3fa1325dd47e0546d8700666852491 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Thu, 7 Mar 2024 11:34:39 +0800 Subject: [PATCH 20/22] fix: close `stopped` channel --- client/internal/wsreceiver.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index 939f9ad5..634830d6 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -63,6 +63,8 @@ func (r *wsReceiver) ReceiverLoop(ctx context.Context) { err error } + defer func() { close(r.stopped) }() + for { select { case <-ctx.Done(): @@ -70,6 +72,7 @@ func (r *wsReceiver) ReceiverLoop(ctx context.Context) { default: result := make(chan receivedMessage, 1) + // To stop this goroutine, close the websocket connection go func() { var message protobufs.ServerToAgent err := r.receiveMessage(&message) From eb606259c40acf25a0f55437012ceaa86b3c7283 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Thu, 7 Mar 2024 14:23:50 +0800 Subject: [PATCH 21/22] test: ensure the wsclient waits for a close message from server --- client/wsclient_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 0c392ed4..839463a4 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -275,6 +275,7 @@ func TestPerformsClosingHandshake(t *testing.T) { var wsConn *websocket.Conn connected := make(chan struct{}) closed := make(chan struct{}) + acked := make(chan struct{}) srv.OnWSConnect = func(conn *websocket.Conn) { wsConn = conn @@ -299,6 +300,14 @@ func TestPerformsClosingHandshake(t *testing.T) { return conn != nil }) + { + defhandler := client.conn.CloseHandler() + client.conn.SetCloseHandler(func(code int, msg string) error { + close(acked) + return defhandler(code, msg) + }) + } + defHandler := wsConn.CloseHandler() wsConn.SetCloseHandler(func(code int, _ string) error { @@ -313,6 +322,11 @@ func TestPerformsClosingHandshake(t *testing.T) { select { case <-closed: + select { + case <-acked: + case <-time.After(2 * time.Second): + require.Fail(t, "Close connection without waiting for a close message from server") + } case <-time.After(2 * time.Second): require.Fail(t, "Connection never closed") } From 0c4dab08a951c1df5b35acf0cab3a824f63fb5d8 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Thu, 7 Mar 2024 14:55:55 +0800 Subject: [PATCH 22/22] fix: wsclient should wait for a close message from server --- client/wsclient.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 02604ded..95082006 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -275,7 +275,13 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.PackagesStateProvider, c.common.Capabilities, ) - r.Start(ctx) + + // When the wsclient is closed, the context passed to runOneCycle will be canceled. + // The receiver should keep running and processing messages + // until it received a Close message from the server which means the server has no more messages. + receiverCtx, stopReceiver := context.WithCancel(context.Background()) + defer stopReceiver() + r.Start(receiverCtx) select { case <-c.sender.IsStopped(): @@ -283,8 +289,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { if err := c.sender.StoppingErr(); err != nil { c.common.Logger.Debugf(ctx, "Error stopping the sender: %v", err) - // Close the connection to stop the receiver. - _ = c.conn.Close() + stopReceiver() <-r.IsStopped() break } @@ -295,8 +300,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Logger.Debugf(ctx, "Receiver stopped.") case <-time.After(c.connShutdownTimeout): c.common.Logger.Debugf(ctx, "Timeout waiting for receiver to stop.") - // Close the connection to force the receive loop to stop. - _ = c.conn.Close() + stopReceiver() <-r.IsStopped() } case <-r.IsStopped():