Skip to content

Commit

Permalink
Bug/stabilise-ws-fetch-method (#5152)
Browse files Browse the repository at this point in the history
* just poll

* remove cts and eventlog channel
  • Loading branch information
jackkav committed Sep 9, 2022
1 parent 2a5d71f commit a85a25a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 122 deletions.
69 changes: 1 addition & 68 deletions packages/insomnia/src/main/network/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,6 @@ const WebSocketConnections = new Map<string, WebSocket>();
const eventLogFileStreams = new Map<string, fs.WriteStream>();
const timelineFileStreams = new Map<string, fs.WriteStream>();

// Flow control state.

// CTS flag; When set, the renderer thread is accepting new WebSocket events.
let clearToSend = true;

// Send queue map; holds batches of events for each event channel, to be sent upon receiving a CTS signal.
const sendQueueMap = new Map<string, WebSocketEventLog>();

/**
* Dispatches a websocket event to a renderer, using batching control flow logic.
* When CTS is set, the events are sent immediately.
* If CTS is cleared, the events are batched into the send queue.
*/
const dispatchWebSocketEvent = (target: Electron.WebContents, eventChannel: string, wsEvent: WebSocketEvent): void => {
// If the CTS flag is already set, just send immediately.
if (clearToSend) {
target.send(eventChannel, [wsEvent]);
clearToSend = false;
return;
}

// Otherwise, append to send queue for this event channel.
const sendQueue = sendQueueMap.get(eventChannel);
if (sendQueue) {
// Add the event to the top of queue so that the latest message is first.
sendQueue.unshift(wsEvent);
} else {
sendQueueMap.set(eventChannel, [wsEvent]);
}
};

const parseResponseAndBuildTimeline = (url: string, incomingMessage: IncomingMessage, clientRequestHeaders: string) => {
const statusMessage = incomingMessage.statusMessage || '';
const statusCode = incomingMessage.statusCode || 0;
Expand Down Expand Up @@ -155,7 +124,6 @@ const createWebSocketConnection = async (
const responseEnvironmentId = environment ? environment._id : null;

try {
const eventChannel = `webSocket.${responseId}.event`;
const readyStateChannel = `webSocket.${request._id}.readyState`;

const reduceArrayToLowerCaseKeyedDictionary = (acc: { [key: string]: string }, { name, value }: BaseWebSocketRequest['headers'][0]) =>
Expand Down Expand Up @@ -274,7 +242,6 @@ const createWebSocketConnection = async (

eventLogFileStreams.get(options.requestId)?.write(JSON.stringify(openEvent) + '\n');
timelineFileStreams.get(options.requestId)?.write(JSON.stringify({ value: 'WebSocket connection established', name: 'Text', timestamp: Date.now() }) + '\n');
dispatchWebSocketEvent(event.sender, eventChannel, openEvent);
event.sender.send(readyStateChannel, ws.readyState);
});

Expand All @@ -289,7 +256,6 @@ const createWebSocketConnection = async (
};

eventLogFileStreams.get(options.requestId)?.write(JSON.stringify(messageEvent) + '\n');
dispatchWebSocketEvent(event.sender, eventChannel, messageEvent);
});

ws.addEventListener('close', ({ code, reason, wasClean }) => {
Expand All @@ -303,11 +269,8 @@ const createWebSocketConnection = async (
timestamp: Date.now(),
};

sendQueueMap.delete(eventChannel);
const message = `Closing connection with code ${code}`;
deleteRequestMaps(request._id, message, closeEvent);

dispatchWebSocketEvent(event.sender, eventChannel, closeEvent);
event.sender.send(readyStateChannel, ws.readyState);
});

Expand All @@ -324,10 +287,7 @@ const createWebSocketConnection = async (
};

deleteRequestMaps(request._id, message, errorEvent);

dispatchWebSocketEvent(event.sender, eventChannel, errorEvent);
event.sender.send(readyStateChannel, ws.readyState);

createErrorResponse(responseId, request._id, responseEnvironmentId, timelinePath, message || 'Something went wrong');
});
} catch (e) {
Expand Down Expand Up @@ -371,7 +331,6 @@ const getWebSocketReadyState = async (
};

const sendWebSocketEvent = async (
event: Electron.IpcMainInvokeEvent,
options: { message: string; requestId: string }
): Promise<void> => {
const ws = WebSocketConnections.get(options.requestId);
Expand Down Expand Up @@ -406,8 +365,6 @@ const sendWebSocketEvent = async (
console.error('something went wrong');
return;
}
const eventChannel = `webSocket.${response._id}.event`;
dispatchWebSocketEvent(event.sender, eventChannel, lastMessage);
};

const closeWebSocketConnection = async (
Expand Down Expand Up @@ -439,28 +396,6 @@ const findMany = async (
.reverse() || [];
};

/**
* Sets the CTS flag; sent when the UI is ready for more events.
*/
const signalClearToSend = (event: Electron.IpcMainInvokeEvent): void => {
const nextChannel = sendQueueMap.keys().next();

// There are no pending events; just set the CTS flag.
if (nextChannel.done) {
clearToSend = true;
return;
}

// We have batched events; immediately send one batch.
const sendQueue = sendQueueMap.get(nextChannel.value);
if (!sendQueue) {
return;
}

event.sender.send(nextChannel.value, sendQueue);
sendQueueMap.delete(nextChannel.value);
};

export interface WebSocketBridgeAPI {
create: (options: {
requestId: string;
Expand All @@ -477,13 +412,11 @@ export interface WebSocketBridgeAPI {
event: {
findMany: typeof findMany;
send: (options: { requestId: string; message: string }) => void;
clearToSend: () => void;
};
}
export const registerWebSocketHandlers = () => {
ipcMain.handle('webSocket.create', createWebSocketConnection);
ipcMain.handle('webSocket.event.send', sendWebSocketEvent);
ipcMain.handle('webSocket.clearToSend', signalClearToSend);
ipcMain.handle('webSocket.event.send', (_, options: Parameters<typeof sendWebSocketEvent>[0]) => sendWebSocketEvent(options));
ipcMain.handle('webSocket.close', (_, options: Parameters<typeof closeWebSocketConnection>[0]) => closeWebSocketConnection(options));
ipcMain.handle('webSocket.closeAll', closeAllWebSocketConnections);
ipcMain.handle('webSocket.readyState', (_, options: Parameters<typeof getWebSocketReadyState>[0]) => getWebSocketReadyState(options));
Expand Down
1 change: 0 additions & 1 deletion packages/insomnia/src/preload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const webSocket: WebSocketBridgeAPI = {
event: {
findMany: options => ipcRenderer.invoke('webSocket.event.findMany', options),
send: options => ipcRenderer.invoke('webSocket.event.send', options),
clearToSend: () => ipcRenderer.invoke('webSocket.clearToSend'),
},
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,25 @@
import { useEffect, useState } from 'react';
import { useState } from 'react';
import { useInterval } from 'react-use';

import { WebSocketEvent } from '../../../main/network/websocket';

export function useWebSocketConnectionEvents({ responseId }: { responseId: string }) {
// @TODO - This list can grow to thousands of events in a chatty websocket connection.
// It's worth investigating an LRU cache that keeps the last X number of messages.
// We'd also need to expand the findMany API to support pagination.
const [events, setEvents] = useState<WebSocketEvent[]>([]);

useEffect(() => {
let isMounted = true;
let unsubscribe = () => {
setEvents([]);
};

// @TODO - There is a possible race condition here.
// Subscribe should probably ask for events after a given event.id so we can make sure
// we don't lose any.
async function fetchAndSubscribeToEvents() {
// Fetch all existing events for this connection
const allEvents = await window.main.webSocket.event.findMany({ responseId });
if (isMounted) {
setEvents(allEvents);
}

const afterLatestEvent = (event: WebSocketEvent, prevEvents: WebSocketEvent[]) => {
if (prevEvents.length === 0) {
return true;
useInterval(
() => {
let isMounted = true;
const fn = async () => {
const allEvents = await window.main.webSocket.event.findMany({ responseId });
if (isMounted) {
setEvents(allEvents);
}

return event.timestamp > prevEvents[0]?.timestamp;
};

// Subscribe to new events and update the state.
unsubscribe = window.main.on(`webSocket.${responseId}.event`,
(_, events: WebSocketEvent[]) => {
console.log('received events', events);
if (isMounted) {
setEvents(prevEvents => events.filter(event => afterLatestEvent(event, prevEvents)).concat(prevEvents));
}

// Wait to give the CTS signal until we've rendered a frame.
// This gives the UI a chance to render and respond to user interactions between receiving events.
// Note that we do this even if the component isn't mounted, to ensure that CTS gets set even if a race occurs.
window.requestAnimationFrame(window.main.webSocket.event.clearToSend);
}
);

window.main.webSocket.event.clearToSend();
}

fetchAndSubscribeToEvents();

return () => {
isMounted = false;
unsubscribe();
};
}, [responseId]);

fn();
return () => {
isMounted = false;
};
},
500
);
return events;
}

0 comments on commit a85a25a

Please sign in to comment.