Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support static topology #1

Merged
merged 2 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default {
values: {
'process.env.BOOTSTRAP_NODES': JSON.stringify('{{.BootstrapNodes}}'),
'process.env.CONTENT_NODE_REFRESH_INTERVAL_MS': 60 * 60 * 1000, // 1 hour
'process.env.CONTENT_NODES': JSON.stringify('{{.ContentNodes}}'),
'process.env.PROJECT_ID': JSON.stringify('{{.ProjectID}}'),
},
preventAssignment: true,
Expand Down
21 changes: 18 additions & 3 deletions src/service-worker/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
import {Adapter} from './src/adapter';
import {ArmadaAPIClientImpl, HTTPProtocol} from './src/armada/api';
import {ArmadaDriver as Driver} from './src/armada/driver';
import {NodeRegistryImpl} from './src/armada/registry';
import {DynamicNodeRegistry, NodeRegistry, StaticNodeRegistry} from './src/armada/registry';
import {CacheDatabase} from './src/db-cache';

const scope = self as unknown as ServiceWorkerGlobalScope;

const envContentNodes = process.env.CONTENT_NODES as string;
const contentNodes = (envContentNodes.trim() !== '') ? envContentNodes.trim().split(',') : [];

const envBootstrapNodes = process.env.BOOTSTRAP_NODES as string;
const bootstrapNodes = envBootstrapNodes.split(',');
const bootstrapNodes = (envBootstrapNodes.trim() !== '') ? envBootstrapNodes.trim().split(',') : [];

const contentNodeRefreshIntervalMs = Number(process.env.CONTENT_NODE_REFRESH_INTERVAL_MS);

const projectId = process.env.PROJECT_ID as string;

const adapter = new Adapter(scope.registration.scope, self.caches);
Expand All @@ -26,5 +31,15 @@ const apiClient = new ArmadaAPIClientImpl(
location.protocol as HTTPProtocol,
projectId,
);
const registry = new NodeRegistryImpl(apiClient, bootstrapNodes, contentNodeRefreshIntervalMs);

let registry: NodeRegistry;
if (bootstrapNodes.length) {
registry = new DynamicNodeRegistry(apiClient, bootstrapNodes, contentNodeRefreshIntervalMs);
} else if (contentNodes.length) {
registry = new StaticNodeRegistry(contentNodes);
} else {
throw new Error(
'Can\'t initialize node registry: must set env.CONTENT_NODES or env.BOOTSTRAP_NODES');
}

new Driver(scope, adapter, new CacheDatabase(adapter), registry, apiClient, scope.crypto.subtle);
16 changes: 15 additions & 1 deletion src/service-worker/src/armada/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@ export interface NodeRegistry {
refreshNodesInterval(): void;
}

export class NodeRegistryImpl implements NodeRegistry {
export class StaticNodeRegistry implements NodeRegistry {
constructor(protected contentNodes: string[]) {}

public async allNodes(randomize: boolean): Promise<string[]> {
const nodes = this.contentNodes.slice();
if (randomize) {
shuffle(nodes);
}
return nodes;
}

public refreshNodesInterval() {}
}

export class DynamicNodeRegistry implements NodeRegistry {
private contentNodes: string[] = [];
private refreshPending: Promise<void>|null = null;
private updateTimer: ReturnType<typeof setTimeout>|null = null;
Expand Down
26 changes: 13 additions & 13 deletions src/service-worker/test/armada/happy_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {webcrypto} from 'crypto';

import {ArmadaAPIClient, ArmadaAPIClientImpl} from '../../src/armada/api';
import {ArmadaDriver, ArmadaDriver as Driver} from '../../src/armada/driver';
import {NodeRegistryImpl} from '../../src/armada/registry';
import {DynamicNodeRegistry} from '../../src/armada/registry';
import {CacheDatabase} from '../../src/db-cache';
import {DriverReadyState} from '../../src/driver';
import {AssetGroupConfig, DataGroupConfig, Manifest} from '../../src/manifest';
Expand Down Expand Up @@ -297,7 +297,7 @@ describe('Driver', () => {

scope = new SwTestHarnessBuilder().withServerState(server).build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);
});
Expand Down Expand Up @@ -557,7 +557,7 @@ describe('Driver', () => {
.withServerState(serverUpdate)
.build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);
expect(await makeRequest(scope, '/foo.txt')).toEqual('this is foo');
Expand Down Expand Up @@ -616,7 +616,7 @@ describe('Driver', () => {
.withServerState(serverUpdate)
.build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -688,7 +688,7 @@ describe('Driver', () => {
.withServerState(serverUpdate)
.build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);
expect(await makeRequest(scope, '/foo.txt')).toEqual('this is foo');
Expand Down Expand Up @@ -734,7 +734,7 @@ describe('Driver', () => {
// Simulate failing to load the stored state (and thus starting from an empty state).
scope.caches.delete('db:control');
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1193,7 +1193,7 @@ describe('Driver', () => {
const newScope =
new SwTestHarnessBuilder('http://localhost/foo/bar/').withServerState(server).build();
const apiClient = new ArmadaAPIClientImpl(newScope, newScope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
new Driver(
newScope, newScope, new CacheDatabase(newScope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1271,7 +1271,7 @@ describe('Driver', () => {
.withServerState(serverState)
.build();
const apiClient = new ArmadaAPIClientImpl(newScope, newScope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const newDriver = new Driver(
newScope, newScope, new CacheDatabase(newScope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1398,7 +1398,7 @@ describe('Driver', () => {
scope = new SwTestHarnessBuilder().withServerState(brokenServer).build();
(scope.registration as any).scope = 'http://site.com';
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1618,7 +1618,7 @@ describe('Driver', () => {
// Create initial server state and initialize the SW.
scope = new SwTestHarnessBuilder().withServerState(originalServer).build();
let apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
let registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
let registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver = new Driver(
scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand All @@ -1637,7 +1637,7 @@ describe('Driver', () => {
.withServerState(updatedServer)
.build();
apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver = new Driver(
scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1671,7 +1671,7 @@ describe('Driver', () => {
// Create initial server state and initialize the SW.
scope = new SwTestHarnessBuilder().withServerState(originalServer).build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
driver = new Driver(
scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down Expand Up @@ -1746,7 +1746,7 @@ describe('Driver', () => {
const server = serverBuilderBase.withManifest(freshnessManifest).build();
const scope = new SwTestHarnessBuilder().withServerState(server).build();
const apiClient = new ArmadaAPIClientImpl(scope, scope, 'http:', TEST_PROJECT_ID);
const registry = new NodeRegistryImpl(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const registry = new DynamicNodeRegistry(apiClient, [TEST_BOOTSTRAP_NODE], 10000);
const driver =
new Driver(scope, scope, new CacheDatabase(scope), registry, apiClient, webcrypto.subtle);

Expand Down
45 changes: 36 additions & 9 deletions src/service-worker/test/armada/registry_spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {NodesResponse} from '../../src/armada/api';
import {NodeRegistryImpl} from '../../src/armada/registry';
import {DynamicNodeRegistry, StaticNodeRegistry} from '../../src/armada/registry';

class StaticAPIClient {
public count: number;
Expand Down Expand Up @@ -35,12 +35,39 @@ class MultiHostStaticAPIClient {
}
}

describe('NodeRegistryImpl', () => {
describe('StaticNodeRegistry', () => {
describe('returns content nodes', () => {
it('when allNodes() is called', async () => {
const nodes = ['content0', 'content1'];
const registry = new StaticNodeRegistry(nodes);
expect(await registry.allNodes(false)).toEqual(nodes);
});

it('randomizes the returned nodes when specified', async () => {
const nodesArr = [...Array(20).keys()].map(i => `content${i}`);
const nodesSet = new Set(nodesArr);
const registry = new StaticNodeRegistry(nodesArr);

let foundShuffled = false;
for (let i = 0; i < 100 && !foundShuffled; i++) {
const got = await registry.allNodes(true);
expect(got.length).toEqual(nodesArr.length);
expect(new Set(got)).toEqual(nodesSet);
if (!arraysMatch(got, nodesArr)) {
foundShuffled = true;
}
}
expect(foundShuffled).toBeTrue();
});
});
});

describe('DynamicNodeRegistry', () => {
describe('populates content nodes', () => {
it('when allNodes() is called', async () => {
const nodes = ['content0', 'content1'];
const apiClient = new StaticAPIClient(nodes);
const registry = new NodeRegistryImpl(apiClient, ['topology'], 10000);
const registry = new DynamicNodeRegistry(apiClient, ['topology'], 10000);

expect(await registry.allNodes(false)).toEqual(nodes);
});
Expand Down Expand Up @@ -93,7 +120,7 @@ describe('NodeRegistryImpl', () => {
for (let tc of cases) {
it(tc.name, async () => {
const apiClient = new MultiHostStaticAPIClient(tc.topologyData);
const registry = new NodeRegistryImpl(apiClient, Object.keys(tc.topologyData), 10000);
const registry = new DynamicNodeRegistry(apiClient, Object.keys(tc.topologyData), 10000);
expect(await registry.allNodes(false)).toEqual(tc.want);
});
}
Expand Down Expand Up @@ -136,20 +163,20 @@ describe('NodeRegistryImpl', () => {
for (let tc of cases) {
it(tc.name, async () => {
const apiClient = new MultiHostStaticAPIClient(tc.topologyData);
const registry = new NodeRegistryImpl(apiClient, Object.keys(tc.topologyData), 10000);
const registry = new DynamicNodeRegistry(apiClient, Object.keys(tc.topologyData), 10000);
await expectAsync(registry.allNodes(false)).toBeRejected();
});
}
});

describe('content node cache', () => {
let apiClient: StaticAPIClient;
let registry: NodeRegistryImpl;
let registry: DynamicNodeRegistry;
const nodes = ['content0', 'content1'];

beforeEach(async () => {
apiClient = new StaticAPIClient(nodes);
registry = new NodeRegistryImpl(apiClient, ['topology'], 10000);
registry = new DynamicNodeRegistry(apiClient, ['topology'], 10000);
});

it('will hit once populated', async () => {
Expand Down Expand Up @@ -188,7 +215,7 @@ describe('NodeRegistryImpl', () => {
return {hosts: ['content0', 'content1']};
},
};
const registry = new NodeRegistryImpl(waitingAPIClient, ['topology0'], 10000);
const registry = new DynamicNodeRegistry(waitingAPIClient, ['topology0'], 10000);

const fetch1 = registry.allNodes(false);
const fetch2 = registry.allNodes(false);
Expand All @@ -205,7 +232,7 @@ describe('NodeRegistryImpl', () => {
const nodesArr = [...Array(20).keys()].map(i => `content${i}`);
const nodesSet = new Set(nodesArr);
const apiClient = new StaticAPIClient(nodesArr);
const registry = new NodeRegistryImpl(apiClient, ['topology'], 10000);
const registry = new DynamicNodeRegistry(apiClient, ['topology'], 10000);

let foundShuffled = false;
for (let i = 0; i < 100 && !foundShuffled; i++) {
Expand Down