Skip to content

Commit

Permalink
fix(platform): connect to the host periodically when starting the pla…
Browse files Browse the repository at this point in the history
…tform

When connecting to the host and if the client does not receive connect acknowledgement in time, the client will resend the connect request.

A single connect request may not be sufficient if, for example, the microfrontends are to be integrated into a rich client. For example, an integrator may want to bridge messages to a remote host. If the integrator cannot hook into the message bus in time, the client's connection request may be lost. Therefore, the gateway initiates the connection request at regular intervals until it receives an acknowledgement.
  • Loading branch information
danielwiehl authored and Marcarrian committed Apr 1, 2022
1 parent bb044f1 commit 788a444
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2018-2020 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

import {MicrofrontendPlatform} from '../microfrontend-platform';
import {Beans} from '@scion/toolkit/bean-manager';
import {Observer} from 'rxjs';
import {ɵBrokerGateway} from './messaging/broker-gateway';

export async function connectToHost({symbolicName, brokerDiscoverTimeout, connectCount}, observer: Observer<string>): Promise<void> { // eslint-disable-line @typescript-eslint/typedef
await MicrofrontendPlatform.connectToHost(symbolicName, {brokerDiscoverTimeout});
observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId);

for (let i = 1; i < connectCount; i++) {
const {clientId} = await Beans.get(ɵBrokerGateway).connectToBroker();
observer.next(clientId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2018-2020 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
import {MicrofrontendPlatform} from '../microfrontend-platform';
import {MicrofrontendFixture} from '../testing/microfrontend-fixture/microfrontend-fixture';
import {firstValueFrom, timer} from 'rxjs';
import {ManifestFixture} from '../testing/manifest-fixture/manifest-fixture';
import {Beans} from '@scion/toolkit/bean-manager';
import {ClientRegistry} from '../host/client-registry/client.registry';
import {ObserveCaptor} from '@scion/toolkit/testing';

describe('MicrofrontendPlatform', () => {

const disposables = new Set<Disposable>();

beforeEach(async () => {
await MicrofrontendPlatform.destroy();
});

afterEach(async () => {
await MicrofrontendPlatform.destroy();
disposables.forEach(disposable => disposable());
});

it('should throw if not connected to the host within the configured timeout', async () => {
const microfrontendFixture = registerFixture(new MicrofrontendFixture()).insertIframe();
const connectPromise = microfrontendFixture.loadScript('./lib/client/client-connect.script.ts', 'connectToHost', {symbolicName: 'client', brokerDiscoverTimeout: 250});
await expectAsync(connectPromise).toBeRejectedWithError(/\[GatewayError] Message broker not discovered within 250ms/);
});

it('should retry to connect to the host', async () => {
// Mount the client before starting the host, which means that the first connect request(s) are never answered.
const microfrontendFixture = registerFixture(new MicrofrontendFixture()).insertIframe();
const connectPromise = microfrontendFixture.loadScript('./lib/client/client-connect.script.ts', 'connectToHost', {symbolicName: 'client', brokerDiscoverTimeout: 2000});
await firstValueFrom(timer(1000));

// Start the host a little bit later.
await MicrofrontendPlatform.startHost({
applications: [
{
symbolicName: 'client',
manifestUrl: new ManifestFixture({name: 'Client'}).serve(),
},
],
});

// Expect client to be connected
await expectAsync(connectPromise).toBeResolved();
const clientId = await firstValueFrom(microfrontendFixture.message$);
expect(Beans.get(ClientRegistry).getByClientId(clientId)).withContext('expected "client" to be CONNECTED').toBeDefined();
});

it('should ignore duplicate connect request of the same client', async () => {
await MicrofrontendPlatform.startHost({
applications: [
{
symbolicName: 'client',
manifestUrl: new ManifestFixture({name: 'Client'}).serve(),
},
],
});

const microfrontendFixture = registerFixture(new MicrofrontendFixture()).insertIframe();
await microfrontendFixture.loadScript('./lib/client/client-connect.script.ts', 'connectToHost', {symbolicName: 'client', connectCount: 3});

const clients = Beans.get(ClientRegistry).getByApplication('client');
expect(clients.length).toBe(1);
const clientId = clients[0].id;

const clientIdCaptor = new ObserveCaptor();
microfrontendFixture.message$.subscribe(clientIdCaptor);

expect(clientIdCaptor.getValues()).toEqual([clientId, clientId, clientId]);
});

/**
* Registers the fixture for destruction after test execution.
*/
function registerFixture(fixture: MicrofrontendFixture): MicrofrontendFixture {
disposables.add(() => fixture.removeIframe());
return fixture;
}
});

type Disposable = () => void;
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@

import {MicrofrontendPlatform} from '../microfrontend-platform';
import {Beans} from '@scion/toolkit/bean-manager';
import {MessageClient} from './messaging/message-client';
import {firstValueFrom, Observer} from 'rxjs';
import {UUID} from '@scion/toolkit/uuid';
import {map} from 'rxjs/operators';
import {MessageHeaders} from '../messaging.model';
import {Observer} from 'rxjs';
import {MicrofrontendPlatformStopper} from '../microfrontend-platform-stopper';
import {VERSION} from '../version';
import {ɵBrokerGateway} from './messaging/broker-gateway';

export async function connectToHost({symbolicName, disconnectOnUnloadDisabled = false, version = undefined}, observer: Observer<string>): Promise<void> { // eslint-disable-line @typescript-eslint/typedef
if (disconnectOnUnloadDisabled) {
Expand All @@ -26,28 +23,20 @@ export async function connectToHost({symbolicName, disconnectOnUnloadDisabled =
Beans.register(VERSION, {useValue: version});
}
await MicrofrontendPlatform.connectToHost(symbolicName);
await sendCurrentClientIdToFixture(observer);
observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId);
}

export async function connectToHostThenStopPlatform({symbolicName}, observer: Observer<string>): Promise<void> { // eslint-disable-line @typescript-eslint/typedef
await MicrofrontendPlatform.connectToHost(symbolicName);
await sendCurrentClientIdToFixture(observer);
observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId);
await MicrofrontendPlatform.destroy();
}

export async function connectToHostThenLocationHref({symbolicName, locationHref}, observer: Observer<string>): Promise<void> { // eslint-disable-line @typescript-eslint/typedef
await MicrofrontendPlatform.connectToHost(symbolicName);
await sendCurrentClientIdToFixture(observer);
observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId);
window.location.href = locationHref;
}

async function sendCurrentClientIdToFixture(observer: Observer<string>): Promise<void> {
const uuid = UUID.randomUUID();
const uuid$ = Beans.get(MessageClient).observe$(uuid).pipe(map(message => message.headers.get(MessageHeaders.ClientId)));
const clientId = firstValueFrom(uuid$);
await Beans.get(MessageClient).publish(uuid);
observer.next(await clientId);
}

class NullMicrofrontendPlatformStopper implements MicrofrontendPlatformStopper {
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
import {AsyncSubject, EMPTY, firstValueFrom, fromEvent, interval, lastValueFrom, merge, MonoTypeOperatorFunction, NEVER, noop, Observable, Observer, of, ReplaySubject, Subject, TeardownLogic, throwError, timeout} from 'rxjs';
import {AsyncSubject, EMPTY, firstValueFrom, fromEvent, interval, lastValueFrom, merge, MonoTypeOperatorFunction, NEVER, noop, Observable, Observer, of, ReplaySubject, Subject, TeardownLogic, throwError, timeout, timer} from 'rxjs';
import {ConnackMessage, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, TopicSubscribeCommand, TopicUnsubscribeCommand} from '../../ɵmessaging.model';
import {finalize, map, mergeMap, take, takeUntil, tap} from 'rxjs/operators';
import {filterByChannel, filterByMessageHeader, filterByOrigin, filterByTopicChannel, filterByTransport, pluckMessage} from '../../operators';
Expand Down Expand Up @@ -99,6 +99,17 @@ export class NullBrokerGateway implements BrokerGateway {
}
}

/**
* Specifies the interval for sending connect requests to parent windows. If not receiving acknowledgment within the
* specified interval, the gateway will try to connect anew.
*
* A single connect request may not be sufficient if, for example, the microfrontends are to be integrated into a rich client.
* For example, an integrator may want to bridge messages to a remote host. If the integrator cannot hook into the message bus
* in time, the client's connect request may be lost. Therefore, the gateway initiates the connect request at regular
* intervals until it receives an acknowledgement.
*/
const CONNECT_INTERVAL = 25;

/**
* @ignore
*/
Expand Down Expand Up @@ -143,6 +154,10 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
return lastValueFrom(this._brokerInfo$).then(() => true).catch(() => false);
}

public get brokerInfo(): BrokerInfo | null {
return this._brokerInfo;
}

public async postMessage(channel: MessagingChannel, message: Message): Promise<void> {
if (isPlatformStopped()) {
throw GatewayErrors.PLATFORM_STOPPED_ERROR;
Expand Down Expand Up @@ -315,7 +330,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
* @return A Promise that, when connected, resolves to information about the broker, or that rejects if the connect attempt
* failed, either because the broker could not be found or because the application is not allowed to connect.
*/
private connectToBroker(): Promise<BrokerInfo> {
public connectToBroker(): Promise<BrokerInfo> {
const replyTo = UUID.randomUUID();
const connectPromise = firstValueFrom(fromEvent<MessageEvent>(window, 'message')
.pipe(
Expand Down Expand Up @@ -351,7 +366,13 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
},
};

this.collectWindowHierarchy().forEach(window => window.postMessage(connectMessage, '*'));
const windowHierarchy = this.collectWindowHierarchy();
timer(0, CONNECT_INTERVAL)
.pipe(takeUntil(connectPromise.catch(() => null)))
.subscribe(() => {
windowHierarchy.forEach(window => window.postMessage(connectMessage, '*'));
});

return connectPromise;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export class MessageBroker implements Initializer, PreDestroy {
return;
}

const eventSource: Window = event.source as Window;
const envelope: MessageEnvelope<TopicMessage<void>> = event.data;
const clientAppName = envelope.message.headers.get(MessageHeaders.AppSymbolicName);
const clientMessageTarget = new MessageTarget(event);
Expand Down Expand Up @@ -162,7 +163,19 @@ export class MessageBroker implements Initializer, PreDestroy {
return;
}

const client = new ɵClient(UUID.randomUUID(), event.source as Window, application, envelope.message.headers.get(MessageHeaders.Version));
// Check if the client is already connected. If already connected, do nothing. A client can potentially initiate multiple connect requests, for example,
// when not receiving connect confirmation in time.
const currentClient = this._clientRegistry.getByWindow(eventSource);
if (currentClient && currentClient.application.origin === event.origin && currentClient.application.symbolicName === application.symbolicName) {
sendTopicMessage<ConnackMessage>(currentClient, {
topic: replyTo,
body: {returnCode: 'accepted', clientId: currentClient.id, heartbeatInterval: this._heartbeatInterval},
headers: new Map(),
});
return;
}

const client = new ɵClient(UUID.randomUUID(), eventSource, application, envelope.message.headers.get(MessageHeaders.Version));
this._clientRegistry.registerClient(client);

// Check if the client is compatible with the platform version of the host.
Expand Down

0 comments on commit 788a444

Please sign in to comment.