Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/stabilise-ws-fetch-method #5152

Merged
merged 2 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 1 addition & 68 deletions packages/insomnia/src/main/network/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,6 @@ const WebSocketConnections = new Map<string, WebSocket>();
const eventLogFileStreams = new Map<string, fs.WriteStream>();
const timelineFileStreams = new Map<string, fs.WriteStream>();

// Flow control state.

// CTS flag; When set, the renderer thread is accepting new WebSocket events.
let clearToSend = true;

// Send queue map; holds batches of events for each event channel, to be sent upon receiving a CTS signal.
const sendQueueMap = new Map<string, WebSocketEventLog>();

/**
* Dispatches a websocket event to a renderer, using batching control flow logic.
* When CTS is set, the events are sent immediately.
* If CTS is cleared, the events are batched into the send queue.
*/
const dispatchWebSocketEvent = (target: Electron.WebContents, eventChannel: string, wsEvent: WebSocketEvent): void => {
// If the CTS flag is already set, just send immediately.
if (clearToSend) {
target.send(eventChannel, [wsEvent]);
clearToSend = false;
return;
}

// Otherwise, append to send queue for this event channel.
const sendQueue = sendQueueMap.get(eventChannel);
if (sendQueue) {
// Add the event to the top of queue so that the latest message is first.
sendQueue.unshift(wsEvent);
} else {
sendQueueMap.set(eventChannel, [wsEvent]);
}
};

const parseResponseAndBuildTimeline = (url: string, incomingMessage: IncomingMessage, clientRequestHeaders: string) => {
const statusMessage = incomingMessage.statusMessage || '';
const statusCode = incomingMessage.statusCode || 0;
Expand Down Expand Up @@ -155,7 +124,6 @@ const createWebSocketConnection = async (
const responseEnvironmentId = environment ? environment._id : null;

try {
const eventChannel = `webSocket.${responseId}.event`;
const readyStateChannel = `webSocket.${request._id}.readyState`;

const reduceArrayToLowerCaseKeyedDictionary = (acc: { [key: string]: string }, { name, value }: BaseWebSocketRequest['headers'][0]) =>
Expand Down Expand Up @@ -274,7 +242,6 @@ const createWebSocketConnection = async (

eventLogFileStreams.get(options.requestId)?.write(JSON.stringify(openEvent) + '\n');
timelineFileStreams.get(options.requestId)?.write(JSON.stringify({ value: 'WebSocket connection established', name: 'Text', timestamp: Date.now() }) + '\n');
dispatchWebSocketEvent(event.sender, eventChannel, openEvent);
event.sender.send(readyStateChannel, ws.readyState);
});

Expand All @@ -289,7 +256,6 @@ const createWebSocketConnection = async (
};

eventLogFileStreams.get(options.requestId)?.write(JSON.stringify(messageEvent) + '\n');
dispatchWebSocketEvent(event.sender, eventChannel, messageEvent);
});

ws.addEventListener('close', ({ code, reason, wasClean }) => {
Expand All @@ -303,11 +269,8 @@ const createWebSocketConnection = async (
timestamp: Date.now(),
};

sendQueueMap.delete(eventChannel);
const message = `Closing connection with code ${code}`;
deleteRequestMaps(request._id, message, closeEvent);

dispatchWebSocketEvent(event.sender, eventChannel, closeEvent);
event.sender.send(readyStateChannel, ws.readyState);
});

Expand All @@ -324,10 +287,7 @@ const createWebSocketConnection = async (
};

deleteRequestMaps(request._id, message, errorEvent);

dispatchWebSocketEvent(event.sender, eventChannel, errorEvent);
event.sender.send(readyStateChannel, ws.readyState);

createErrorResponse(responseId, request._id, responseEnvironmentId, timelinePath, message || 'Something went wrong');
});
} catch (e) {
Expand Down Expand Up @@ -371,7 +331,6 @@ const getWebSocketReadyState = async (
};

const sendWebSocketEvent = async (
event: Electron.IpcMainInvokeEvent,
options: { message: string; requestId: string }
): Promise<void> => {
const ws = WebSocketConnections.get(options.requestId);
Expand Down Expand Up @@ -406,8 +365,6 @@ const sendWebSocketEvent = async (
console.error('something went wrong');
return;
}
const eventChannel = `webSocket.${response._id}.event`;
dispatchWebSocketEvent(event.sender, eventChannel, lastMessage);
};

const closeWebSocketConnection = async (
Expand Down Expand Up @@ -439,28 +396,6 @@ const findMany = async (
.reverse() || [];
};

/**
* Sets the CTS flag; sent when the UI is ready for more events.
*/
const signalClearToSend = (event: Electron.IpcMainInvokeEvent): void => {
const nextChannel = sendQueueMap.keys().next();

// There are no pending events; just set the CTS flag.
if (nextChannel.done) {
clearToSend = true;
return;
}

// We have batched events; immediately send one batch.
const sendQueue = sendQueueMap.get(nextChannel.value);
if (!sendQueue) {
return;
}

event.sender.send(nextChannel.value, sendQueue);
sendQueueMap.delete(nextChannel.value);
};

export interface WebSocketBridgeAPI {
create: (options: {
requestId: string;
Expand All @@ -477,13 +412,11 @@ export interface WebSocketBridgeAPI {
event: {
findMany: typeof findMany;
send: (options: { requestId: string; message: string }) => void;
clearToSend: () => void;
};
}
export const registerWebSocketHandlers = () => {
ipcMain.handle('webSocket.create', createWebSocketConnection);
ipcMain.handle('webSocket.event.send', sendWebSocketEvent);
ipcMain.handle('webSocket.clearToSend', signalClearToSend);
ipcMain.handle('webSocket.event.send', (_, options: Parameters<typeof sendWebSocketEvent>[0]) => sendWebSocketEvent(options));
ipcMain.handle('webSocket.close', (_, options: Parameters<typeof closeWebSocketConnection>[0]) => closeWebSocketConnection(options));
ipcMain.handle('webSocket.closeAll', closeAllWebSocketConnections);
ipcMain.handle('webSocket.readyState', (_, options: Parameters<typeof getWebSocketReadyState>[0]) => getWebSocketReadyState(options));
Expand Down
1 change: 0 additions & 1 deletion packages/insomnia/src/preload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const webSocket: WebSocketBridgeAPI = {
event: {
findMany: options => ipcRenderer.invoke('webSocket.event.findMany', options),
send: options => ipcRenderer.invoke('webSocket.event.send', options),
clearToSend: () => ipcRenderer.invoke('webSocket.clearToSend'),
},
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,25 @@
import { useEffect, useState } from 'react';
import { useState } from 'react';
import { useInterval } from 'react-use';

import { WebSocketEvent } from '../../../main/network/websocket';

export function useWebSocketConnectionEvents({ responseId }: { responseId: string }) {
// @TODO - This list can grow to thousands of events in a chatty websocket connection.
// It's worth investigating an LRU cache that keeps the last X number of messages.
// We'd also need to expand the findMany API to support pagination.
const [events, setEvents] = useState<WebSocketEvent[]>([]);

useEffect(() => {
let isMounted = true;
let unsubscribe = () => {
setEvents([]);
};

// @TODO - There is a possible race condition here.
// Subscribe should probably ask for events after a given event.id so we can make sure
// we don't lose any.
async function fetchAndSubscribeToEvents() {
// Fetch all existing events for this connection
const allEvents = await window.main.webSocket.event.findMany({ responseId });
if (isMounted) {
setEvents(allEvents);
}

const afterLatestEvent = (event: WebSocketEvent, prevEvents: WebSocketEvent[]) => {
if (prevEvents.length === 0) {
return true;
useInterval(
() => {
let isMounted = true;
const fn = async () => {
const allEvents = await window.main.webSocket.event.findMany({ responseId });
if (isMounted) {
setEvents(allEvents);
}

return event.timestamp > prevEvents[0]?.timestamp;
};

// Subscribe to new events and update the state.
unsubscribe = window.main.on(`webSocket.${responseId}.event`,
(_, events: WebSocketEvent[]) => {
console.log('received events', events);
if (isMounted) {
setEvents(prevEvents => events.filter(event => afterLatestEvent(event, prevEvents)).concat(prevEvents));
}

// Wait to give the CTS signal until we've rendered a frame.
// This gives the UI a chance to render and respond to user interactions between receiving events.
// Note that we do this even if the component isn't mounted, to ensure that CTS gets set even if a race occurs.
window.requestAnimationFrame(window.main.webSocket.event.clearToSend);
}
);

window.main.webSocket.event.clearToSend();
}

fetchAndSubscribeToEvents();

return () => {
isMounted = false;
unsubscribe();
};
}, [responseId]);

fn();
return () => {
isMounted = false;
};
},
500
);
return events;
}