Skip to content

Commit

Permalink
Fix potential stuck session due to ws client leak (#492)
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 authored Aug 21, 2023
1 parent 5f107f9 commit 10f9f4b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 41 deletions.
7 changes: 4 additions & 3 deletions server/rtcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,14 +583,15 @@ func (m *rtcdClientManager) handleClientMsg(msg rtcd.ClientMessage) error {
return fmt.Errorf("missing sessionID")
}
m.ctx.LogDebug("received close message from rtcd", "sessionID", sessionID)
m.ctx.mut.RLock()
us := m.ctx.sessions[sessionID]
m.ctx.mut.RUnlock()
us := m.ctx.getSessionByOriginalID(sessionID)
if us != nil && atomic.CompareAndSwapInt32(&us.rtcClosed, 0, 1) {
m.ctx.LogDebug("closing rtc close channel", "sessionID", sessionID)
close(us.rtcCloseCh)
return m.ctx.removeSession(us)
}

m.ctx.LogDebug("session not found or rtc conn already closed", "sessionID", sessionID)

return nil
}

Expand Down
25 changes: 25 additions & 0 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,28 @@ func (p *Plugin) removeSession(us *session) error {
}
return nil
}

// getSessionByOriginalID retrieves a session by its original connection ID
// which is also the session ID matching the RTC connection.
func (p *Plugin) getSessionByOriginalID(sessionID string) *session {
p.mut.RLock()
defer p.mut.RUnlock()

// We first try to see if the session is mapped by its original ID since
// it's more efficient and the most probable case.
us := p.sessions[sessionID]
if us != nil {
return us
}

// If we can't find one, we resort to looping through all the sessions to
// check against the originalConnID field. This would be necessary only if
// the session reconnected throughout the call with a new ws connection ID.
for _, s := range p.sessions {
if s.originalConnID == sessionID {
return s
}
}

return nil
}
74 changes: 36 additions & 38 deletions webapp/src/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ export class WebSocketClient extends EventEmitter {
};

this.ws.onclose = ({code}) => {
this.ws = null;
this.emit('close', code);
if (!this.closed) {
this.close(code);
this.reconnect();
}
};

Expand Down Expand Up @@ -156,48 +156,46 @@ export class WebSocketClient extends EventEmitter {
this.ws.send(JSON.stringify(msg));
}
} else {
logWarn('failed to send message, connection is not open');
logWarn('failed to send message, connection is not open', msg);
}
}

close(code?: number) {
if (this.ws) {
this.closed = true;
this.ws.close();
this.ws = null;
this.seqNo = 1;
this.serverSeqNo = 0;
this.connID = '';
this.originalConnID = '';

this.removeAllListeners('open');
this.removeAllListeners('event');
this.removeAllListeners('join');
this.removeAllListeners('close');
this.removeAllListeners('error');
this.removeAllListeners('message');
} else {
this.emit('close', code);
close() {
this.closed = true;
this.ws?.close();
this.ws = null;
this.seqNo = 1;
this.serverSeqNo = 0;
this.connID = '';
this.originalConnID = '';

this.removeAllListeners('open');
this.removeAllListeners('event');
this.removeAllListeners('join');
this.removeAllListeners('close');
this.removeAllListeners('error');
this.removeAllListeners('message');
}

const now = Date.now();
if (this.lastDisconnect === 0) {
this.lastDisconnect = now;
}
reconnect() {
const now = Date.now();
if (this.lastDisconnect === 0) {
this.lastDisconnect = now;
}

if ((now - this.lastDisconnect) >= wsReconnectionTimeout) {
this.closed = true;
this.emit('error', new WebSocketError(WebSocketErrorType.ReconnectTimeout, 'max disconnected time reached'));
return;
}
if ((now - this.lastDisconnect) >= wsReconnectionTimeout) {
this.closed = true;
this.emit('error', new WebSocketError(WebSocketErrorType.ReconnectTimeout, 'max disconnected time reached'));
return;
}

setTimeout(() => {
if (!this.ws && !this.closed) {
logInfo('ws: reconnecting');
this.init(true);
}
}, this.reconnectRetryTime);
setTimeout(() => {
if (!this.closed) {
logInfo('ws: reconnecting');
this.init(true);
}
}, this.reconnectRetryTime);

this.reconnectRetryTime += wsReconnectTimeIncrement;
}
this.reconnectRetryTime += wsReconnectTimeIncrement;
}
}

0 comments on commit 10f9f4b

Please sign in to comment.