Skip to content

Commit

Permalink
Merge pull request #1 from armada-network/static-topology
Browse files Browse the repository at this point in the history
Support static topology
  • Loading branch information
the-flagship authored Apr 10, 2023
2 parents 8a733dc + f4c29e5 commit 484cc01
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 26 deletions.
1 change: 1 addition & 0 deletions rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default {
values: {
'process.env.BOOTSTRAP_NODES': JSON.stringify('{{.BootstrapNodes}}'),
'process.env.CONTENT_NODE_REFRESH_INTERVAL_MS': 60 * 60 * 1000, // 1 hour
'process.env.CONTENT_NODES': JSON.stringify('{{.ContentNodes}}'),
'process.env.PROJECT_ID': JSON.stringify('{{.ProjectID}}'),
},
preventAssignment: true,
Expand Down
21 changes: 18 additions & 3 deletions src/service-worker/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
import {Adapter} from './src/adapter';
import {ArmadaAPIClientImpl, HTTPProtocol} from './src/armada/api';
import {ArmadaDriver as Driver} from './src/armada/driver';
import {NodeRegistryImpl} from './src/armada/registry';
import {DynamicNodeRegistry, NodeRegistry, StaticNodeRegistry} from './src/armada/registry';
import {CacheDatabase} from './src/db-cache';

const scope = self as unknown as ServiceWorkerGlobalScope;

const envContentNodes = process.env.CONTENT_NODES as string;
const contentNodes = (envContentNodes.trim() !== '') ? envContentNodes.trim().split(',') : [];

const envBootstrapNodes = process.env.BOOTSTRAP_NODES as string;
const bootstrapNodes = envBootstrapNodes.split(',');
const bootstrapNodes = (envBootstrapNodes.trim() !== '') ? envBootstrapNodes.trim().split(',') : [];

const contentNodeRefreshIntervalMs = Number(process.env.CONTENT_NODE_REFRESH_INTERVAL_MS);

const projectId = process.env.PROJECT_ID as string;

const adapter = new Adapter(scope.registration.scope, self.caches);
Expand All @@ -26,5 +31,15 @@ const apiClient = new ArmadaAPIClientImpl(
location.protocol as HTTPProtocol,
projectId,
);
const registry = new NodeRegistryImpl(apiClient, bootstrapNodes, contentNodeRefreshIntervalMs);

let registry: NodeRegistry;
if (bootstrapNodes.length) {
registry = new DynamicNodeRegistry(apiClient, bootstrapNodes, contentNodeRefreshIntervalMs);
} else if (contentNodes.length) {
registry = new StaticNodeRegistry(contentNodes);
} else {
throw new Error(
'Can\'t initialize node registry: must set env.CONTENT_NODES or env.BOOTSTRAP_NODES');
}

new Driver(scope, adapter, new CacheDatabase(adapter), registry, apiClient, scope.crypto.subtle);
16 changes: 15 additions & 1 deletion src/service-worker/src/armada/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@ export interface NodeRegistry {
refreshNodesInterval(): void;
}

export class NodeRegistryImpl implements NodeRegistry {
export class StaticNodeRegistry implements NodeRegistry {
constructor(protected contentNodes: string[]) {}

public async allNodes(randomize: boolean): Promise<string[]> {
const nodes = this.contentNodes.slice();
if (randomize) {
shuffle(nodes);
}
return nodes;
}

public refreshNodesInterval() {}
}

export class DynamicNodeRegistry implements NodeRegistry {
private contentNodes: string[] = [];
private refreshPending: Promise<void>|null = null;
private updateTimer: ReturnType<typeof setTimeout>|null = null;
Expand Down
26 changes: 13 additions & 13 deletions src/service-worker/test/armada/happy_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {webcrypto} from 'crypto';

import {ArmadaAPIClient, ArmadaAPIClientImpl} from '../../src/armada/api';
import {ArmadaDriver, ArmadaDriver as Driver} from '../../src/armada/driver';
import {NodeRegistryImpl} from '../../src/armada/registry';
import {DynamicNodeRegistry} from '../../src/armada/registry';
import {CacheDatabase} from '../../src/db-cache';
import {DriverReadyState} from '../../src/driver';
import {AssetGroupConfig, DataGroupConfig, Manifest} from '../../src/manifest';
Expand Down Expand Up @@ -297,7 +297,7 @@ describe('Driver', () => {

scope = new SwTestHarnessBuilder().withServerState(server).build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);
});
Expand Down Expand Up @@ -557,7 +557,7 @@ describe('Driver', () => {
.withServerState(serverUpdate)
.build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);
expect(await makeRequest(scope, '/foo.txt')).toEqual('this is foo');
Expand Down Expand Up @@ -616,7 +616,7 @@ describe('Driver', () => {
.withServerState(serverUpdate)
.build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -688,7 +688,7 @@ describe('Driver', () => {
.withServerState(serverUpdate)
.build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);
expect(await makeRequest(scope, '/foo.txt')).toEqual('this is foo');
Expand Down Expand Up @@ -734,7 +734,7 @@ describe('Driver', () => {
// Simulate failing to load the stored state (and thus starting from an empty state).
scope.caches.delete('db:control');
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1193,7 +1193,7 @@ describe('Driver', () => {
const newScope =
new SwTestHarnessBuilder('http://localhost/foo/bar/').withServerState(server).build();
const apiClient = new ArmadaAPIClientImpl(newScope, newScope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
new Driver(
newScope, newScope, new CacheDatabase(newScope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1271,7 +1271,7 @@ describe('Driver', () => {
.withServerState(serverState)
.build();
const apiClient = new ArmadaAPIClientImpl(newScope, newScope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const newDriver = new Driver(
newScope, newScope, new CacheDatabase(newScope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1398,7 +1398,7 @@ describe('Driver', () => {
scope = new SwTestHarnessBuilder().withServerState(brokenServer).build();
(scope.registration as any).scope = 'http://site.com';
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1618,7 +1618,7 @@ describe('Driver', () => {
// Create initial server state and initialize the SW.
scope = new SwTestHarnessBuilder().withServerState(originalServer).build();
let apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
let registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
let registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver = new Driver(
scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand All @@ -1637,7 +1637,7 @@ describe('Driver', () => {
.withServerState(updatedServer)
.build();
apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver = new Driver(
scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1671,7 +1671,7 @@ describe('Driver', () => {
// Create initial server state and initialize the SW.
scope = new SwTestHarnessBuilder().withServerState(originalServer).build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver = new Driver(
scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1746,7 +1746,7 @@ describe('Driver', () => {
const server = serverBuilderBase.withManifest(freshnessManifest).build();
const scope = new SwTestHarnessBuilder().withServerState(server).build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down
45 changes: 36 additions & 9 deletions src/service-worker/test/armada/registry_spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {NodesResponse} from '../../src/armada/api';
import {NodeRegistryImpl} from '../../src/armada/registry';
import {DynamicNodeRegistry, StaticNodeRegistry} from '../../src/armada/registry';

class StaticAPIClient {
public count: number;
Expand Down Expand Up @@ -35,12 +35,39 @@ class MultiHostStaticAPIClient {
}
}

describe('NodeRegistryImpl', () => {
describe('StaticNodeRegistry', () => {
describe('returns content nodes', () => {
it('when allNodes() is called', async () => {
const nodes = ['content0', 'content1'];
const registry = new StaticNodeRegistry(nodes);
expect(await registry.allNodes(false)).toEqual(nodes);
});

it('randomizes the returned nodes when specified', async () => {
const nodesArr = [...Array(20).keys()].map(i => `content${i}`);
const nodesSet = new Set(nodesArr);
const registry = new StaticNodeRegistry(nodesArr);

let foundShuffled = false;
for (let i = 0; i < 100 && !foundShuffled; i++) {
const got = await registry.allNodes(true);
expect(got.length).toEqual(nodesArr.length);
expect(new Set(got)).toEqual(nodesSet);
if (!arraysMatch(got, nodesArr)) {
foundShuffled = true;
}
}
expect(foundShuffled).toBeTrue();
});
});
});

describe('DynamicNodeRegistry', () => {
describe('populates content nodes', () => {
it('when allNodes() is called', async () => {
const nodes = ['content0', 'content1'];
const apiClient = new StaticAPIClient(nodes);
const registry = new NodeRegistryImpl(apiClient, ['topology'], 10000);
const registry = new DynamicNodeRegistry(apiClient, ['topology'], 10000);

expect(await registry.allNodes(false)).toEqual(nodes);
});
Expand Down Expand Up @@ -93,7 +120,7 @@ describe('NodeRegistryImpl', () => {
for (let tc of cases) {
it(tc.name, async () => {
const apiClient = new MultiHostStaticAPIClient(tc.topologyData);
const registry = new NodeRegistryImpl(apiClient, Object.keys(tc.topologyData), 10000);
const registry = new DynamicNodeRegistry(apiClient, Object.keys(tc.topologyData), 10000);
expect(await registry.allNodes(false)).toEqual(tc.want);
});
}
Expand Down Expand Up @@ -136,20 +163,20 @@ describe('NodeRegistryImpl', () => {
for (let tc of cases) {
it(tc.name, async () => {
const apiClient = new MultiHostStaticAPIClient(tc.topologyData);
const registry = new NodeRegistryImpl(apiClient, Object.keys(tc.topologyData), 10000);
const registry = new DynamicNodeRegistry(apiClient, Object.keys(tc.topologyData), 10000);
await expectAsync(registry.allNodes(false)).toBeRejected();
});
}
});

describe('content node cache', () => {
let apiClient: StaticAPIClient;
let registry: NodeRegistryImpl;
let registry: DynamicNodeRegistry;
const nodes = ['content0', 'content1'];

beforeEach(async () => {
apiClient = new StaticAPIClient(nodes);
registry = new NodeRegistryImpl(apiClient, ['topology'], 10000);
registry = new DynamicNodeRegistry(apiClient, ['topology'], 10000);
});

it('will hit once populated', async () => {
Expand Down Expand Up @@ -188,7 +215,7 @@ describe('NodeRegistryImpl', () => {
return {hosts: ['content0', 'content1']};
},
};
const registry = new NodeRegistryImpl(waitingAPIClient, ['topology0'], 10000);
const registry = new DynamicNodeRegistry(waitingAPIClient, ['topology0'], 10000);

const fetch1 = registry.allNodes(false);
const fetch2 = registry.allNodes(false);
Expand All @@ -205,7 +232,7 @@ describe('NodeRegistryImpl', () => {
const nodesArr = [...Array(20).keys()].map(i => `content${i}`);
const nodesSet = new Set(nodesArr);
const apiClient = new StaticAPIClient(nodesArr);
const registry = new NodeRegistryImpl(apiClient, ['topology'], 10000);
const registry = new DynamicNodeRegistry(apiClient, ['topology'], 10000);

let foundShuffled = false;
for (let i = 0; i < 100 && !foundShuffled; i++) {
Expand Down

0 comments on commit 484cc01

Please sign in to comment.