Skip to content

Commit

Permalink
Merge pull request #1723 from Bilb/poll-less-often-group-not-active
Browse files Browse the repository at this point in the history
Poll less often group not active
  • Loading branch information
Bilb authored Jun 29, 2021
2 parents 01f29df + 4a491e6 commit 19555b9
Show file tree
Hide file tree
Showing 16 changed files with 458 additions and 60 deletions.
4 changes: 0 additions & 4 deletions js/background.js
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,7 @@
}, window.CONSTANTS.NOTIFICATION_ENABLE_TIMEOUT_SECONDS * 1000);

window.NewReceiver.queueAllCached();
window
.getSwarmPollingInstance()
.addPubkey(window.libsession.Utils.UserUtils.getOurPubKeyStrFromCache());

window.getSwarmPollingInstance().start();
window.libsession.Utils.AttachmentDownloads.start({
logger: window.log,
});
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"format-full": "prettier --list-different --write \"*.{css,js,json,scss,ts,tsx}\" \"./**/*.{css,js,json,scss,ts,tsx}\"",
"transpile": "tsc --incremental",
"transpile:watch": "tsc -w",
"clean-transpile": "rimraf ts/**/*.js ts/*.js ts/*.js.map ts/**/*.js.map && rimraf tsconfig.tsbuildinfo;",
"clean-transpile": "rimraf 'ts/**/*.js ts/*.js' 'ts/*.js.map' 'ts/**/*.js.map' && rimraf tsconfig.tsbuildinfo;",
"ready": "yarn clean-transpile; yarn grunt && yarn lint-full && yarn test"
},
"dependencies": {
Expand Down
4 changes: 1 addition & 3 deletions ts/components/session/ActionsPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,7 @@ const doAppStartUp = () => {
debounce(triggerAvatarReUploadIfNeeded, 200);

// TODO: Investigate the case where we reconnect
const ourKey = UserUtils.getOurPubKeyStrFromCache();
getSwarmPollingInstance().addPubkey(ourKey);
getSwarmPollingInstance().start();
void getSwarmPollingInstance().start();
};

/**
Expand Down
2 changes: 1 addition & 1 deletion ts/data/data.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Electron from 'electron';

const { ipcRenderer } = Electron;
// tslint:disable: function-name no-require-imports no-var-requires one-variable-per-declaration no-void-expression
// tslint:disable: no-require-imports no-var-requires one-variable-per-declaration no-void-expression

import _ from 'lodash';
import { ConversationCollection, ConversationModel } from '../models/conversation';
Expand Down
1 change: 1 addition & 0 deletions ts/models/conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { NotificationForConvoOption } from '../components/conversation/Conversat
import { useDispatch } from 'react-redux';
import { updateConfirmModal } from '../state/ducks/modalDialog';
import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout';
import { DURATION, SWARM_POLLING_TIMEOUT } from '../session/constants';

export enum ConversationTypeEnum {
GROUP = 'group',
Expand Down
6 changes: 6 additions & 0 deletions ts/session/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ export const TTL_DEFAULT = {
TTL_MAX: 14 * DURATION.DAYS,
};

export const SWARM_POLLING_TIMEOUT = {
ACTIVE: DURATION.SECONDS * 5,
MEDIUM_ACTIVE: DURATION.SECONDS * 60,
INACTIVE: DURATION.MINUTES * 60,
};

export const PROTOCOLS = {
// tslint:disable-next-line: no-http-string
HTTP: 'http:',
Expand Down
6 changes: 3 additions & 3 deletions ts/session/conversations/ConversationController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class ConversationController {

conversation.initialPromise = create();
conversation.initialPromise.then(async () => {
if (window.inboxStore) {
if (window?.inboxStore) {
window.inboxStore?.dispatch(
conversationActions.conversationAdded(conversation.id, conversation.getProps())
);
Expand Down Expand Up @@ -242,7 +242,7 @@ export class ConversationController {
window.log.info(`deleteContact !isPrivate, convo removed from DB: ${id}`);

this.conversations.remove(conversation);
if (window.inboxStore) {
if (window?.inboxStore) {
window.inboxStore?.dispatch(conversationActions.conversationRemoved(conversation.id));
window.inboxStore?.dispatch(
conversationActions.conversationChanged(conversation.id, conversation.getProps())
Expand Down Expand Up @@ -310,7 +310,7 @@ export class ConversationController {
public reset() {
this._initialPromise = Promise.resolve();
this._initialFetchComplete = false;
if (window.inboxStore) {
if (window?.inboxStore) {
window.inboxStore?.dispatch(conversationActions.removeAllConversations());
}
this.conversations.reset([]);
Expand Down
1 change: 0 additions & 1 deletion ts/session/messages/MessageController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ export class MessageController {
});
}

// tslint:disable-next-line: function-name
public get(identifier: string) {
return this.messageLookup.get(identifier);
}
Expand Down
160 changes: 120 additions & 40 deletions ts/session/snode_api/swarmPolling.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { PubKey } from '../types';
import { getSwarmFor } from './snodePool';
import * as snodePool from './snodePool';
import { retrieveNextMessages } from './SNodeAPI';
import { SignalService } from '../../protobuf';
import * as Receiver from '../../receiver/receiver';
Expand All @@ -12,9 +12,10 @@ import {
updateLastHash,
} from '../../../ts/data/data';

import { StringUtils } from '../../session/utils';
import { getConversationController } from '../conversations';
import { StringUtils, UserUtils } from '../../session/utils';
import { ConversationModel } from '../../models/conversation';
import { DURATION, SWARM_POLLING_TIMEOUT } from '../constants';
import { getConversationController } from '../conversations';

type PubkeyToHash = { [key: string]: string };

Expand Down Expand Up @@ -50,49 +51,133 @@ export const getSwarmPollingInstance = () => {
};

export class SwarmPolling {
private pubkeys: Array<PubKey>;
private groupPubkeys: Array<PubKey>;
private ourPubkey: PubKey | undefined;
private groupPolling: Array<{ pubkey: PubKey; lastPolledTimestamp: number }>;
private readonly lastHashes: { [key: string]: PubkeyToHash };

constructor() {
this.pubkeys = [];
this.groupPubkeys = [];
this.groupPolling = [];
this.lastHashes = {};
}

public start(): void {
public async start(waitForFirstPoll = false): Promise<void> {
this.ourPubkey = UserUtils.getOurPubKeyFromCache();
this.loadGroupIds();
void this.pollForAllKeys();
if (waitForFirstPoll) {
await this.TEST_pollForAllKeys();
} else {
void this.TEST_pollForAllKeys();
}
}

/**
* Used fo testing only
*/
public TEST_reset() {
this.ourPubkey = undefined;
this.groupPolling = [];
}

public addGroupId(pubkey: PubKey) {
if (this.groupPubkeys.findIndex(m => m.key === pubkey.key) === -1) {
if (this.groupPolling.findIndex(m => m.pubkey.key === pubkey.key) === -1) {
window?.log?.info('Swarm addGroupId: adding pubkey to polling', pubkey.key);
this.groupPubkeys.push(pubkey);
this.groupPolling.push({ pubkey, lastPolledTimestamp: 0 });
}
}

public addPubkey(pk: PubKey | string) {
public removePubkey(pk: PubKey | string) {
const pubkey = PubKey.cast(pk);
if (this.pubkeys.findIndex(m => m.key === pubkey.key) === -1) {
this.pubkeys.push(pubkey);
window?.log?.info('Swarm removePubkey: removing pubkey from polling', pubkey.key);

if (this.ourPubkey && PubKey.cast(pk).isEqual(this.ourPubkey)) {
this.ourPubkey = undefined;
}
this.groupPolling = this.groupPolling.filter(group => !pubkey.isEqual(group.pubkey));
}

public removePubkey(pk: PubKey | string) {
const pubkey = PubKey.cast(pk);
window?.log?.info('Swarm removePubkey: removing pubkey from polling', pubkey.key);
/**
* Only public for testing
* As of today, we pull closed group pubkeys as follow:
* if activeAt is not set, poll only once per hour
* if activeAt is less than an hour old, poll every 5 seconds or so
* if activeAt is less than a day old, poll every minutes only.
* If activeAt is more than a day old, poll only once per hour
*/
public TEST_getPollingTimeout(convoId: PubKey) {
const convo = getConversationController().get(convoId.key);
if (!convo) {
return SWARM_POLLING_TIMEOUT.INACTIVE;
}
const activeAt = convo.get('active_at');
if (!activeAt) {
return SWARM_POLLING_TIMEOUT.INACTIVE;
}

const currentTimestamp = Date.now();

this.pubkeys = this.pubkeys.filter(key => !pubkey.isEqual(key));
this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key));
// consider that this is an active group if activeAt is less than an hour old
if (currentTimestamp - activeAt <= DURATION.HOURS * 1) {
return SWARM_POLLING_TIMEOUT.ACTIVE;
}

if (currentTimestamp - activeAt <= DURATION.DAYS * 1) {
return SWARM_POLLING_TIMEOUT.MEDIUM_ACTIVE;
}

return SWARM_POLLING_TIMEOUT.INACTIVE;
}

protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
/**
* Only public for testing
*/
public async TEST_pollForAllKeys() {
// we always poll as often as possible for our pubkey
const directPromise = this.ourPubkey
? this.TEST_pollOnceForKey(this.ourPubkey, false)
: Promise.resolve();

const now = Date.now();
const groupPromises = this.groupPolling.map(async group => {
const convoPollingTimeout = this.TEST_getPollingTimeout(group.pubkey);

const diff = now - group.lastPolledTimestamp;

const loggingId =
getConversationController()
.get(group.pubkey.key)
?.idForLogging() || group.pubkey.key;

if (diff >= convoPollingTimeout) {
(window?.log?.info || console.warn)(
`Polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);
return this.TEST_pollOnceForKey(group.pubkey, true);
}
(window?.log?.info || console.warn)(
`Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);

return Promise.resolve();
});
try {
await Promise.all(_.concat(directPromise, groupPromises));
} catch (e) {
(window?.log?.info || console.warn)('pollForAllKeys swallowing exception: ', e);
throw e;
} finally {
setTimeout(this.TEST_pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE);
}
}

/**
* Only exposed as public for testing
*/
public async TEST_pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
// NOTE: sometimes pubkey is string, sometimes it is object, so
// accept both until this is fixed:
const pkStr = pubkey.key;

const snodes = await getSwarmFor(pkStr);
const snodes = await snodePool.getSwarmFor(pkStr);

// Select nodes for which we already have lastHashes
const alreadyPolled = snodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
Expand Down Expand Up @@ -123,6 +208,19 @@ export class SwarmPolling {
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);

if (isGroup) {
// update the last fetched timestamp
this.groupPolling = this.groupPolling.map(group => {
if (PubKey.isEqual(pubkey, group.pubkey)) {
return {
...group,
lastPolledTimestamp: Date.now(),
};
}
return group;
});
}

const newMessages = await this.handleSeenMessages(messages);

newMessages.forEach((m: Message) => {
Expand All @@ -133,7 +231,7 @@ export class SwarmPolling {

// Fetches messages for `pubkey` from `node` potentially updating
// the lash hash record
protected async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any>> {
private async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any>> {
const edkey = node.pubkey_ed25519;

const pkStr = pubkey.key;
Expand Down Expand Up @@ -188,24 +286,6 @@ export class SwarmPolling {
return newMessages;
}

private async pollForAllKeys() {
const directPromises = this.pubkeys.map(async pk => {
return this.pollOnceForKey(pk, false);
});

const groupPromises = this.groupPubkeys.map(async pk => {
return this.pollOnceForKey(pk, true);
});
try {
await Promise.all(_.concat(directPromises, groupPromises));
} catch (e) {
window?.log?.warn('pollForAllKeys swallowing exception: ', e);
throw e;
} finally {
setTimeout(this.pollForAllKeys.bind(this), 2000);
}
}

private async updateLastHash(
edkey: string,
pubkey: PubKey,
Expand Down
4 changes: 4 additions & 0 deletions ts/session/types/PubKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ export class PubKey {
return key.replace(PubKey.PREFIX_GROUP_TEXTSECURE, '');
}

public static isEqual(comparator1: PubKey | string, comparator2: PubKey | string) {
return PubKey.cast(comparator1).isEqual(comparator2);
}

public isEqual(comparator: PubKey | string) {
return comparator instanceof PubKey
? this.key === comparator.key
Expand Down
1 change: 0 additions & 1 deletion ts/session/utils/AttachmentsDownload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ export async function addJob(attachment: any, job: any = {}) {
};
}

// tslint:disable: function-name
async function _tick() {
await _maybeStartJob();
timeout = setTimeout(_tick, TICK_INTERVAL);
Expand Down
1 change: 0 additions & 1 deletion ts/state/ducks/defaultRooms.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const defaultRoomsSlice = createSlice({
},
updateDefaultRoomsInProgress(state, action) {
const inProgress = action.payload as boolean;
window?.log?.info('fetching default rooms inProgress?', action.payload);
return { ...state, inProgress };
},
updateDefaultBase64RoomData(state, action: PayloadAction<Base64Update>) {
Expand Down
Loading

0 comments on commit 19555b9

Please sign in to comment.