Skip to content

Commit

Permalink
add test for swarmPolling variable rate
Browse files Browse the repository at this point in the history
  • Loading branch information
Bilb committed Jun 25, 2021
1 parent ca2203d commit 13bc1a2
Show file tree
Hide file tree
Showing 5 changed files with 416 additions and 81 deletions.
6 changes: 3 additions & 3 deletions ts/session/conversations/ConversationController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class ConversationController {

conversation.initialPromise = create();
conversation.initialPromise.then(async () => {
if (window.inboxStore) {
if (window?.inboxStore) {
window.inboxStore?.dispatch(
conversationActions.conversationAdded(conversation.id, conversation.getProps())
);
Expand Down Expand Up @@ -242,7 +242,7 @@ export class ConversationController {
window.log.info(`deleteContact !isPrivate, convo removed from DB: ${id}`);

this.conversations.remove(conversation);
if (window.inboxStore) {
if (window?.inboxStore) {
window.inboxStore?.dispatch(conversationActions.conversationRemoved(conversation.id));
window.inboxStore?.dispatch(
conversationActions.conversationChanged(conversation.id, conversation.getProps())
Expand Down Expand Up @@ -310,7 +310,7 @@ export class ConversationController {
public reset() {
this._initialPromise = Promise.resolve();
this._initialFetchComplete = false;
if (window.inboxStore) {
if (window?.inboxStore) {
window.inboxStore?.dispatch(conversationActions.removeAllConversations());
}
this.conversations.reset([]);
Expand Down
173 changes: 96 additions & 77 deletions ts/session/snode_api/swarmPolling.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { PubKey } from '../types';
import { getSwarmFor } from './snodePool';
import * as snodePool from './snodePool';
import { retrieveNextMessages } from './SNodeAPI';
import { SignalService } from '../../protobuf';
import * as Receiver from '../../receiver/receiver';
Expand All @@ -13,10 +13,9 @@ import {
} from '../../../ts/data/data';

import { StringUtils, UserUtils } from '../../session/utils';
import { getConversationController } from '../conversations';
import { ConversationModel } from '../../models/conversation';
import { DURATION, SWARM_POLLING_TIMEOUT } from '../constants';
import { ConversationController } from '../conversations/ConversationController';
import { getConversationController } from '../conversations';

type PubkeyToHash = { [key: string]: string };

Expand Down Expand Up @@ -51,7 +50,7 @@ export const getSwarmPollingInstance = () => {
return instance;
};

class SwarmPolling {
export class SwarmPolling {
private ourPubkey: PubKey | undefined;
private groupPolling: Array<{ pubkey: PubKey; lastPolledTimestamp: number }>;
private readonly lastHashes: { [key: string]: PubkeyToHash };
Expand All @@ -61,10 +60,22 @@ class SwarmPolling {
this.lastHashes = {};
}

public start(): void {
public async start(waitForFirstPoll = false): Promise<void> {
this.ourPubkey = UserUtils.getOurPubKeyFromCache();
this.loadGroupIds();
void this.pollForAllKeys();
if (waitForFirstPoll) {
await this.TEST_pollForAllKeys();
} else {
void this.TEST_pollForAllKeys();
}
}

/**
* Used fo testing only
*/
public TEST_reset() {
this.ourPubkey = undefined;
this.groupPolling = [];
}

public addGroupId(pubkey: PubKey) {
Expand All @@ -84,12 +95,89 @@ class SwarmPolling {
this.groupPolling = this.groupPolling.filter(group => !pubkey.isEqual(group.pubkey));
}

private async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
/**
* Only public for testing
* As of today, we pull closed group pubkeys as follow:
* if activeAt is not set, poll only once per hour
* if activeAt is less than an hour old, poll every 5 seconds or so
* if activeAt is less than a day old, poll every minutes only.
* If activeAt is more than a day old, poll only once per hour
*/
public TEST_getPollingTimeout(convoId: PubKey) {
const convo = getConversationController().get(convoId.key);
if (!convo) {
return SWARM_POLLING_TIMEOUT.INACTIVE;
}
const activeAt = convo.get('active_at');
if (!activeAt) {
return SWARM_POLLING_TIMEOUT.INACTIVE;
}

const currentTimestamp = Date.now();

// consider that this is an active group if activeAt is less than an hour old
if (currentTimestamp - activeAt <= DURATION.HOURS * 1) {
return SWARM_POLLING_TIMEOUT.ACTIVE;
}

if (currentTimestamp - activeAt <= DURATION.DAYS * 1) {
return SWARM_POLLING_TIMEOUT.MEDIUM_ACTIVE;
}

return SWARM_POLLING_TIMEOUT.INACTIVE;
}

/**
* Only public for testing
*/
public async TEST_pollForAllKeys() {
// we always poll as often as possible for our pubkey
const directPromise = this.ourPubkey
? this.TEST_pollOnceForKey(this.ourPubkey, false)
: Promise.resolve();

const now = Date.now();
const groupPromises = this.groupPolling.map(async group => {
const convoPollingTimeout = this.TEST_getPollingTimeout(group.pubkey);

const diff = now - group.lastPolledTimestamp;

const loggingId =
getConversationController()
.get(group.pubkey.key)
?.idForLogging() || group.pubkey.key;

if (diff >= convoPollingTimeout) {
(window?.log?.info || console.warn)(
`Polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);
return this.TEST_pollOnceForKey(group.pubkey, true);
}
(window?.log?.info || console.warn)(
`Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);

return Promise.resolve();
});
try {
await Promise.all(_.concat(directPromise, groupPromises));
} catch (e) {
(window?.log?.info || console.warn)('pollForAllKeys swallowing exception: ', e);
throw e;
} finally {
setTimeout(this.TEST_pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE);
}
}

/**
* Only exposed as public for testing
*/
public async TEST_pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
// NOTE: sometimes pubkey is string, sometimes it is object, so
// accept both until this is fixed:
const pkStr = pubkey.key;

const snodes = await getSwarmFor(pkStr);
const snodes = await snodePool.getSwarmFor(pkStr);

// Select nodes for which we already have lastHashes
const alreadyPolled = snodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
Expand Down Expand Up @@ -198,75 +286,6 @@ class SwarmPolling {
return newMessages;
}

/**
* As of today, we pull closed group pubkeys as follow:
* if activeAt is not set, poll only once per hour
* if activeAt is less than an hour old, poll every 5 seconds or so
* if activeAt is less than a day old, poll every minutes only.
* If activeAt is more than a day old, poll only once per hour
*/
private getPollingTimeout(convoId: PubKey) {
const convo = getConversationController().get(convoId.key);
if (!convo) {
return this.pollOnceForKey(convoId, true);
}
const activeAt = convo.get('active_at');
if (!activeAt) {
return SWARM_POLLING_TIMEOUT.INACTIVE;
}

const currentTimestamp = Date.now();

// consider that this is an active group if activeAt is less than an hour old
if (currentTimestamp - activeAt <= DURATION.HOURS * 1) {
return SWARM_POLLING_TIMEOUT.ACTIVE;
}

if (currentTimestamp - activeAt <= DURATION.DAYS * 1) {
return SWARM_POLLING_TIMEOUT.MEDIUM_ACTIVE;
}

return SWARM_POLLING_TIMEOUT.INACTIVE;
}

private async pollForAllKeys() {
// we always poll as often as possible for our pubkey
const directPromise = this.ourPubkey
? this.pollOnceForKey(this.ourPubkey, false)
: Promise.resolve();

const now = Date.now();
const groupPromises = this.groupPolling.map(async group => {
const convoPollingTimeout = this.getPollingTimeout(group.pubkey);

const diff = now - group.lastPolledTimestamp;

const loggingId = getConversationController()
.get(group.pubkey.key)
.idForLogging();

if (diff >= convoPollingTimeout) {
window?.log?.info(
`Polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);
return this.pollOnceForKey(group.pubkey, true);
}
window?.log?.info(
`Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);

return Promise.resolve();
});
try {
await Promise.all(_.concat(directPromise, groupPromises));
} catch (e) {
window?.log?.warn('pollForAllKeys swallowing exception: ', e);
throw e;
} finally {
setTimeout(this.pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE);
}
}

private async updateLastHash(
edkey: string,
pubkey: PubKey,
Expand Down
Loading

0 comments on commit 13bc1a2

Please sign in to comment.