Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a Ping mechanism for Pusher #55326

Merged
merged 25 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions contributingGuides/PERFORMANCE_METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Project is using Firebase for tracking these metrics. However, not all of them a
| `open_report_from_preview` | ✅ | Time taken to open a report from preview.<br><br>(previously `switch_report_from_preview`)<br><br>**Platforms:** All | Starts when the user presses the Report Preview. | Stops when the `ReportActionsList` finishes laying out. |
| `open_report_thread` | ✅ | Time taken to open a thread in a report.<br><br>**Platforms:** All | Starts when user presses Report Action Item. | Stops when the `ReportActionsList` finishes laying out. |
| `send_message` | ✅ | Time taken to send a message.<br><br>**Platforms:** All | Starts when the new message is sent. | Stops when the message is being rendered in the chat. |
| `pusher_ping_pong` | ✅ | The time it takes to receive a PONG event through Pusher.<br><br>**Platforms:** All | Starts every minute and repeats on the minute. | Stops when the event is received from the server. |

## Documentation Maintenance

Expand Down
1 change: 1 addition & 0 deletions src/CONST.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,7 @@ const CONST = {
SEARCH_FILTER_OPTIONS: 'search_filter_options',
USE_DEBOUNCED_STATE_DELAY: 300,
LIST_SCROLLING_DEBOUNCE_TIME: 200,
PUSHER_PING_PONG: 'pusher_ping_pong',
},
PRIORITY_MODE: {
GSD: 'gsd',
Expand Down
6 changes: 6 additions & 0 deletions src/libs/API/parameters/PusherPingParams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type PusherPingParams = {
pingID: string;
pingTimestamp: number;
};

export default PusherPingParams;
1 change: 1 addition & 0 deletions src/libs/API/parameters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type {default as OpenReimbursementAccountPageParams} from './OpenReimburs
export type {default as OpenReportParams} from './OpenReportParams';
export type {default as OpenRoomMembersPageParams} from './OpenRoomMembersPageParams';
export type {default as PaymentCardParams} from './PaymentCardParams';
export type {default as PusherPingParams} from './PusherPingParams';
export type {default as ReconnectAppParams} from './ReconnectAppParams';
export type {default as ReferTeachersUniteVolunteerParams} from './ReferTeachersUniteVolunteerParams';
export type {default as ReportVirtualExpensifyCardFraudParams} from './ReportVirtualExpensifyCardFraudParams';
Expand Down
2 changes: 2 additions & 0 deletions src/libs/API/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ const WRITE_COMMANDS = {
ADD_EMOJI_REACTION: 'AddEmojiReaction',
REMOVE_EMOJI_REACTION: 'RemoveEmojiReaction',
LEAVE_ROOM: 'LeaveRoom',
PUSHER_PING: 'PusherPing',
LEAVE_GROUP_CHAT: 'LeaveGroupChat',
INVITE_TO_ROOM: 'InviteToRoom',
INVITE_TO_GROUP_CHAT: 'InviteToGroupChat',
Expand Down Expand Up @@ -560,6 +561,7 @@ type WriteCommandParameters = {
[WRITE_COMMANDS.INVITE_TO_ROOM]: Parameters.InviteToRoomParams;
[WRITE_COMMANDS.INVITE_TO_GROUP_CHAT]: Parameters.InviteToGroupChatParams;
[WRITE_COMMANDS.UPDATE_GROUP_CHAT_AVATAR]: Parameters.UpdateGroupChatAvatarParams;
[WRITE_COMMANDS.PUSHER_PING]: Parameters.PusherPingParams;
[WRITE_COMMANDS.LEAVE_GROUP_CHAT]: Parameters.LeaveGroupChatParams;
[WRITE_COMMANDS.REMOVE_FROM_GROUP_CHAT]: Parameters.RemoveFromGroupChatParams;
[WRITE_COMMANDS.UPDATE_GROUP_CHAT_MEMBER_ROLES]: Parameters.UpdateGroupChatMemberRolesParams;
Expand Down
3 changes: 3 additions & 0 deletions src/libs/Pusher/EventType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export default {
USER_IS_LEAVING_ROOM: 'client-userIsLeavingRoom',
USER_IS_TYPING: 'client-userIsTyping',
MULTIPLE_EVENTS: 'multipleEvents',

// An event that the server sends back to the client in response to a "ping" API command
PONG: 'pong',
MULTIPLE_EVENT_TYPE: {
ONYX_API_UPDATE: 'onyxApiUpdate',
RECONNECT_APP: 'reconnectApp',
Expand Down
8 changes: 7 additions & 1 deletion src/libs/Pusher/pusher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ type UserIsLeavingRoomEvent = Record<string, boolean> & {
userLogin?: string;
};

type PingPongEvent = Record<string, string | number> & {
pingID: string;
timestamp: number;
};

type PusherEventMap = {
[TYPE.USER_IS_TYPING]: UserIsTypingEvent;
[TYPE.USER_IS_LEAVING_ROOM]: UserIsLeavingRoomEvent;
[TYPE.PONG]: PingPongEvent;
};

type EventData<EventName extends string> = {chunk?: string; id?: string; index?: number; final?: boolean} & (EventName extends keyof PusherEventMap
Expand Down Expand Up @@ -441,4 +447,4 @@ export {
getPusherSocketID,
};

export type {EventCallbackError, States, UserIsTypingEvent, UserIsLeavingRoomEvent};
export type {EventCallbackError, States, UserIsTypingEvent, UserIsLeavingRoomEvent, PingPongEvent};
17 changes: 11 additions & 6 deletions src/libs/PusherUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import CONST from '@src/CONST';
import type {OnyxUpdatesFromServer} from '@src/types/onyx';
import Log from './Log';
import NetworkConnection from './NetworkConnection';
import * as Pusher from './Pusher/pusher';
import {subscribe} from './Pusher/pusher';
import type {PingPongEvent} from './Pusher/pusher';

type Callback = (data: OnyxUpdate[]) => Promise<void>;

// Keeps track of all the callbacks that need triggered for each event type
const multiEventCallbackMapping: Record<string, Callback> = {};

function getUserChannelName(accountID: string) {
return `${CONST.PUSHER.PRIVATE_USER_CHANNEL_PREFIX}${accountID}${CONFIG.PUSHER.SUFFIX}` as const;
}

function subscribeToMultiEvent(eventType: string, callback: Callback) {
multiEventCallbackMapping[eventType] = callback;
}
Expand All @@ -26,26 +31,26 @@ function triggerMultiEventHandler(eventType: string, data: OnyxUpdate[]): Promis
/**
* Abstraction around subscribing to private user channel events. Handles all logs and errors automatically.
*/
function subscribeToPrivateUserChannelEvent(eventName: string, accountID: string, onEvent: (pushJSON: OnyxUpdatesFromServer) => void) {
const pusherChannelName = `${CONST.PUSHER.PRIVATE_USER_CHANNEL_PREFIX}${accountID}${CONFIG.PUSHER.SUFFIX}` as const;
function subscribeToPrivateUserChannelEvent(eventName: string, accountID: string, onEvent: (pushJSON: OnyxUpdatesFromServer | PingPongEvent) => void) {
const pusherChannelName = getUserChannelName(accountID);

function logPusherEvent(pushJSON: OnyxUpdatesFromServer) {
function logPusherEvent(pushJSON: OnyxUpdatesFromServer | PingPongEvent) {
Log.info(`[Report] Handled ${eventName} event sent by Pusher`, false, pushJSON);
}

function onPusherResubscribeToPrivateUserChannel() {
NetworkConnection.triggerReconnectionCallbacks('Pusher re-subscribed to private user channel');
}

function onEventPush(pushJSON: OnyxUpdatesFromServer) {
function onEventPush(pushJSON: OnyxUpdatesFromServer | PingPongEvent) {
logPusherEvent(pushJSON);
onEvent(pushJSON);
}

function onSubscriptionFailed(error: Error) {
Log.hmmm('Failed to subscribe to Pusher channel', {error, pusherChannelName, eventName});
}
Pusher.subscribe(pusherChannelName, eventName, onEventPush, onPusherResubscribeToPrivateUserChannel).catch(onSubscriptionFailed);
subscribe(pusherChannelName, eventName, onEventPush, onPusherResubscribeToPrivateUserChannel).catch(onSubscriptionFailed);
}

export default {
Expand Down
148 changes: 137 additions & 11 deletions src/libs/actions/User.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
CloseAccountParams,
DeleteContactMethodParams,
GetStatementPDFParams,
PusherPingParams,
RequestContactMethodValidateCodeParams,
SetContactMethodAsDefaultParams,
SetNameValuePairParams,
Expand All @@ -27,7 +28,10 @@ import * as ErrorUtils from '@libs/ErrorUtils';
import type Platform from '@libs/getPlatform/types';
import Log from '@libs/Log';
import Navigation from '@libs/Navigation/Navigation';
import {isOffline} from '@libs/Network/NetworkStore';
import * as SequentialQueue from '@libs/Network/SequentialQueue';
import NetworkConnection from '@libs/NetworkConnection';
import * as NumberUtils from '@libs/NumberUtils';
import * as PersonalDetailsUtils from '@libs/PersonalDetailsUtils';
import * as Pusher from '@libs/Pusher/pusher';
import PusherUtils from '@libs/PusherUtils';
Expand All @@ -42,16 +46,17 @@ import ONYXKEYS from '@src/ONYXKEYS';
import ROUTES from '@src/ROUTES';
import type {BlockedFromConcierge, CustomStatusDraft, LoginList, Policy} from '@src/types/onyx';
import type Login from '@src/types/onyx/Login';
import type {OnyxServerUpdate} from '@src/types/onyx/OnyxUpdatesFromServer';
import type {OnyxServerUpdate, OnyxUpdatesFromServer} from '@src/types/onyx/OnyxUpdatesFromServer';
import type OnyxPersonalDetails from '@src/types/onyx/PersonalDetails';
import type {Status} from '@src/types/onyx/PersonalDetails';
import type ReportAction from '@src/types/onyx/ReportAction';
import {isEmptyObject} from '@src/types/utils/EmptyObject';
import * as App from './App';
import {reconnectApp} from './App';
import applyOnyxUpdatesReliably from './applyOnyxUpdatesReliably';
import * as Link from './Link';
import * as Report from './Report';
import * as Session from './Session';
import {openOldDotLink} from './Link';
import {showReportActionNotification} from './Report';
import {resendValidateCode as sessionResendValidateCode} from './Session';
import Timing from './Timing';

let currentUserAccountID = -1;
let currentEmail = '';
Expand Down Expand Up @@ -116,7 +121,7 @@ function closeAccount(reason: string) {
* Resends a validation link to a given login
*/
function resendValidateCode(login: string) {
Session.resendValidateCode(login);
sessionResendValidateCode(login);
}

/**
Expand Down Expand Up @@ -784,7 +789,7 @@ function triggerNotifications(onyxUpdates: OnyxServerUpdate[]) {
const reportID = update.key.replace(ONYXKEYS.COLLECTION.REPORT_ACTIONS, '');
const reportActions = Object.values((update.value as OnyxCollection<ReportAction>) ?? {});

reportActions.forEach((action) => action && ReportActionsUtils.isNotifiableReportAction(action) && Report.showReportActionNotification(reportID, action));
reportActions.forEach((action) => action && ReportActionsUtils.isNotifiableReportAction(action) && showReportActionNotification(reportID, action));
});
}

Expand Down Expand Up @@ -888,6 +893,124 @@ function playSoundForMessageType(pushJSON: OnyxServerUpdate[]) {
});
}

// Holds a map of all the PINGs that have been sent to the server and when they were sent
// Once a PONG is received, the event data will be removed from this map.
type PingPongTimestampMap = Record<string, number>;
let pingIDsAndTimestamps: PingPongTimestampMap = {};

function subscribeToPusherPong() {
// If there is no user accountID yet (because the app isn't fully setup yet), the channel can't be subscribed to so return early
if (currentUserAccountID === -1) {
return;
}

PusherUtils.subscribeToPrivateUserChannelEvent(Pusher.TYPE.PONG, currentUserAccountID.toString(), (pushJSON) => {
Log.info(`[Pusher PINGPONG] Received a PONG event from the server`, false, pushJSON);
const pongEvent = pushJSON as Pusher.PingPongEvent;
// First, check to see if the PONG event is in the pingIDsAndTimestamps map
// It's OK if it doesn't exist. The client was maybe refreshed while still waiting for a PONG event, in which case it might
// receive the PONG event but has already lost it's memory of the PING.
const pingEventTimestamp = pingIDsAndTimestamps[pongEvent.pingID];
if (!pingEventTimestamp) {
return;
}

// Calculate the latency between the client and the server
const latency = Date.now() - Number(pingEventTimestamp);
Log.info(`[Pusher PINGPONG] The event took ${latency} ms`);

// Remove the event from the map
delete pingIDsAndTimestamps[pongEvent.pingID];
Timing.end(CONST.TIMING.PUSHER_PING_PONG);
});
}

// Specify how long between each PING event to the server
const PING_INTERVAL_LENGTH_IN_SECONDS = 30;

// Specify how long between each check for missing PONG events
const CHECK_MISSING_PONG_INTERVAL_LENGTH_IN_SECONDS = 60;

// Specifiy how long before a PING event is considered to be missing a PONG event in order to put the application in offline mode
tgolen marked this conversation as resolved.
Show resolved Hide resolved
const NO_EVENT_RECEIVED_TO_BE_OFFLINE_THRESHOLD_IN_SECONDS = 2 * PING_INTERVAL_LENGTH_IN_SECONDS;

let lastTimestamp = Date.now();
function pingPusher() {
if (isOffline()) {
Log.info('[Pusher PINGPONG] Skipping ping because the client is offline');
return;
}
// Send a PING event to the server with a specific ID and timestamp
// The server will respond with a PONG event with the same ID and timestamp
// Then we can calculate the latency between the client and the server (or if the server never replies)
const pingID = NumberUtils.rand64();
const pingTimestamp = Date.now();

// In local development, there can end up being multiple intervals running because when JS code is replaced with hot module replacement, the old interval is not cleared
// and keeps running. This little bit of logic will attempt to keep multiple pings from happening.
if (pingTimestamp - lastTimestamp < PING_INTERVAL_LENGTH_IN_SECONDS * 1000) {
return;
}
lastTimestamp = pingTimestamp;

pingIDsAndTimestamps[pingID] = pingTimestamp;
const parameters: PusherPingParams = {pingID, pingTimestamp};
API.write(WRITE_COMMANDS.PUSHER_PING, parameters);
Log.info(`[Pusher PINGPONG] Sending a PING to the server: ${pingID} timestamp: ${pingTimestamp}`);
Timing.start(CONST.TIMING.PUSHER_PING_PONG);
}

function checkforMissingPongEvents() {
Log.info(`[Pusher PINGPONG] Checking for missing PONG events`);
// Get the oldest PING timestamp that is left in the event map
const oldestPingTimestamp = Math.min(...Object.values(pingIDsAndTimestamps));
const ageOfEventInMS = Date.now() - oldestPingTimestamp;

// Get the eventID of that timestamp
const eventID = Object.keys(pingIDsAndTimestamps).find((key) => pingIDsAndTimestamps[key] === oldestPingTimestamp);

// If the oldest timestamp is older than 2 * PING_INTERVAL_LENGTH_IN_SECONDS, then set the network status to offline
if (ageOfEventInMS > NO_EVENT_RECEIVED_TO_BE_OFFLINE_THRESHOLD_IN_SECONDS * 1000) {
Log.info(`[Pusher PINGPONG] The server has not replied to the PING event ${eventID} in ${ageOfEventInMS} ms so going offline`);
NetworkConnection.setOfflineStatus(true, 'The client never got a Pusher PONG event after sending a Pusher PING event');

// When going offline, reset the pingpong state so that when the network reconnects, the client will start fresh
lastTimestamp = Date.now();
pingIDsAndTimestamps = {};
}
}

let pingPongStarted = false;
function initializePusherPingPong() {
// Only run the ping pong from the leader client
if (!ActiveClientManager.isClientTheLeader()) {
Log.info("[Pusher PINGPONG] Not starting PING PONG because this instance isn't the leader client");
return;
}

// Ignore any additional calls to initialize the ping pong if it's already been started
if (pingPongStarted) {
return;
}
pingPongStarted = true;

Log.info(`[Pusher PINGPONG] Starting Pusher Ping Pong and pinging every ${PING_INTERVAL_LENGTH_IN_SECONDS} seconds`);

// Subscribe to the pong event from Pusher. Unfortunately, there is no way of knowing when the client is actually subscribed
// so there could be a little delay before the client is actually listening to this event.
subscribeToPusherPong();

// Send a ping to pusher on a regular interval
setInterval(pingPusher, PING_INTERVAL_LENGTH_IN_SECONDS * 1000);

// Delay the start of this by double the length of PING_INTERVAL_LENGTH_IN_SECONDS to give a chance for the first
// events to be sent and received
setTimeout(() => {
// Check for any missing pong events on a regular interval
setInterval(checkforMissingPongEvents, CHECK_MISSING_PONG_INTERVAL_LENGTH_IN_SECONDS * 1000);
}, PING_INTERVAL_LENGTH_IN_SECONDS * 2);
}

/**
* Handles the newest events from Pusher where a single mega multipleEvents contains
* an array of singular events all in one event
Expand All @@ -901,6 +1024,7 @@ function subscribeToUserEvents() {
// Handles the mega multipleEvents from Pusher which contains an array of single events.
// Each single event is passed to PusherUtils in order to trigger the callbacks for that event
PusherUtils.subscribeToPrivateUserChannelEvent(Pusher.TYPE.MULTIPLE_EVENTS, currentUserAccountID.toString(), (pushJSON) => {
const pushEventData = pushJSON as OnyxUpdatesFromServer;
// If this is not the main client, we shouldn't process any data received from pusher.
if (!ActiveClientManager.isClientTheLeader()) {
Log.info('[Pusher] Received updates, but ignoring it since this is not the active client');
Expand All @@ -910,8 +1034,8 @@ function subscribeToUserEvents() {
// Example: {lastUpdateID: 1, previousUpdateID: 0, updates: [{onyxMethod: 'whatever', key: 'foo', value: 'bar'}]}
const updates = {
type: CONST.ONYX_UPDATE_TYPES.PUSHER,
lastUpdateID: Number(pushJSON.lastUpdateID ?? CONST.DEFAULT_NUMBER_ID),
updates: pushJSON.updates ?? [],
lastUpdateID: Number(pushEventData.lastUpdateID ?? CONST.DEFAULT_NUMBER_ID),
updates: pushEventData.updates ?? [],
previousUpdateID: Number(pushJSON.previousUpdateID ?? CONST.DEFAULT_NUMBER_ID),
};
applyOnyxUpdatesReliably(updates);
Expand Down Expand Up @@ -945,9 +1069,11 @@ function subscribeToUserEvents() {
// We have an event to reconnect the App. It is triggered when we detect that the user passed updateID
// is not in the DB
PusherUtils.subscribeToMultiEvent(Pusher.TYPE.MULTIPLE_EVENT_TYPE.RECONNECT_APP, () => {
App.reconnectApp();
reconnectApp();
return Promise.resolve();
});

initializePusherPingPong();
}

/**
Expand Down Expand Up @@ -1053,7 +1179,7 @@ function clearScreenShareRequest() {
* @param roomName Name of the screen share room to join
*/
function joinScreenShare(accessToken: string, roomName: string) {
Link.openOldDotLink(`inbox?action=screenShare&accessToken=${accessToken}&name=${roomName}`);
openOldDotLink(`inbox?action=screenShare&accessToken=${accessToken}&name=${roomName}`);
clearScreenShareRequest();
}

Expand Down
Loading